::CORBA::Any value;
+ ::CORBA::String_member str_value;
+
void operator>>= (cdrStream &) const;
struct DataItem {
string name;
any value;
+ string str_value;
};
typedef sequence<DataItem> DataItemSequence;
static CORBA::PR_structMember _0RL_structmember_plpbus__orb_mDataItem[] = {
{"name", CORBA::TypeCode::PR_string_tc(0, &_0RL_tcTrack)},
- {"value", CORBA::TypeCode::PR_any_tc()}
+ {"value", CORBA::TypeCode::PR_any_tc()},
+ {"str_value", CORBA::TypeCode::PR_string_tc(0, &_0RL_tcTrack)}
};
#ifdef _0RL_tc_plpbus__orb_mDataItem
# undef _0RL_tc_plpbus__orb_mDataItem
#endif
-static CORBA::TypeCode_ptr _0RL_tc_plpbus__orb_mDataItem = CORBA::TypeCode::PR_struct_tc("IDL:plpbus_orb/DataItem:1.0", "DataItem", _0RL_structmember_plpbus__orb_mDataItem, 2, &_0RL_tcTrack);
+static CORBA::TypeCode_ptr _0RL_tc_plpbus__orb_mDataItem = CORBA::TypeCode::PR_struct_tc("IDL:plpbus_orb/DataItem:1.0", "DataItem", _0RL_structmember_plpbus__orb_mDataItem, 3, &_0RL_tcTrack);
#if defined(HAS_Cplusplus_Namespace) && defined(_MSC_VER)
// MSVC++ does not give the constant external linkage otherwise.
{
_n.marshalString(name,0);
(const ::CORBA::Any&) value >>= _n;
+ _n.marshalString(str_value,0);
}
{
name = _n.unmarshalString(0);
(::CORBA::Any&)value <<= _n;
+ str_value = _n.unmarshalString(0);
}
return ret_val;
}
-int BusClient::send_message_and_request_response(BusMessage *msg_req_param) {
+int BusClient::send_message_and_request_response(const BusMessage *msg_req_param) {
DataItemSequence *seq;
seq = (DataItemSequence *)msg_req_param->_dataItemSeq;
return 0;
}
-int BusClient::send_message_and_wait_response(BusMessage *msg_req_param, BusMessage **msg_rsp_param) {
+int BusClient::send_message_and_wait_response(const BusMessage *msg_req_param, BusMessage **msg_rsp_param) {
DataItemSequence *seq_rsp;
DataItemSequence *seq;
virtual int add_client_listener(IClientListener *listener_param);
virtual int send_message_and_request_response(const char *msg_req_param);
virtual int send_message_and_wait_response(const char *msg_req_param, char **msg_rsp_param);
- virtual int send_message_and_request_response(BusMessage *msg_req_param);
- virtual int send_message_and_wait_response(BusMessage *msg_req_param, BusMessage **msg_rsp_param);
+ virtual int send_message_and_request_response(const BusMessage *msg_req_param);
+ virtual int send_message_and_wait_response(const BusMessage *msg_req_param, BusMessage **msg_rsp_param);
virtual void request_shutdown();
private:
PortableServer::POA_var _poa;
}
BusMessage::~BusMessage() {
+ long cnt;
+ DataItemSequence *seq;
+
+ log_debug("BusMessage DESTRUCTOR\n");
+ seq = (DataItemSequence *)_dataItemSeq;
+ if (seq != NULL) {
+ cnt = seq->length();
+/*
+ DataItem *item;
+
+ for (long ii = 0; ii < cnt; ii++) {
+ item = seq[ii];
+ log_debug("deleting BusMessage item:\n");
+ printout_dataitem(ii, item);
+ delete(item);
+ }
+*/
+ delete(seq);
+ }
+}
+
+int BusMessage::add_cstring_parameter(const char *arg_name_param, const char *value_param) {
+ long ln;
+ int ret_val;
+ DataItemSequence *seq;
+ DataItem *item;
+
+ ret_val = 0;
+ seq = (DataItemSequence *)_dataItemSeq;
+ if (arg_name_param != NULL) {
+ ln = seq->length();
+ seq->length(ln + 1);
+ item = &((*seq)[ln]);
+ item->name = arg_name_param;
+ item->str_value = value_param;
+ }
+ else {
+ ret_val = -1;
+ }
+ return ret_val;
}
int BusMessage::add_string_parameter(string arg_name_param, string value_param)
{
- long length;
+ long ln;
int ret_val;
- DataItem *item;
DataItemSequence *seq;
+ DataItem *item;
+ //str = (const char)"hooo";
ret_val = 0;
seq = (DataItemSequence *)_dataItemSeq;
if (arg_name_param.empty() == false) {
- length = seq->length();
- item = new DataItem();
- item->name = strdup(arg_name_param.c_str());
- item->value <<= value_param.c_str();
- seq->length(length + 1);
- (*seq)[length] = *item;
+ ln = seq->length();
+ seq->length(ln + 1);
+ item = &((*seq)[ln]);
+ item->name = arg_name_param.c_str();
+ item->str_value = value_param.c_str();
}
else {
ret_val = -1;
{
long length;
int ret_val;
- DataItem *item;
DataItemSequence *seq;
ret_val = 0;
seq = (DataItemSequence *)_dataItemSeq;
if (arg_name_param.empty() == false) {
length = seq->length();
- item = new DataItem();
- item->name = strdup(arg_name_param.c_str());
- item->value <<= value_param;
-
seq->length(length + 1);
- (*seq)[length] = *item;
+ (*seq)[length].name = arg_name_param.c_str();
+ (*seq)[length].value <<= value_param;
+ (*seq)[length].str_value = "";
}
else {
ret_val = -1;
{
long length;
int ret_val;
- DataItem *item;
DataItemSequence *seq;
ret_val = 0;
seq = (DataItemSequence *)_dataItemSeq;
if (arg_name_param.empty() == false) {
length = seq->length();
- item = new DataItem();
- item->name = strdup(arg_name_param.c_str());
- item->value <<= (long)value_param;
-
seq->length(length + 1);
- (*seq)[length] = *item;
+ (*seq)[length].name = arg_name_param.c_str();
+ (*seq)[length].value <<= (long)value_param;
+ (*seq)[length].str_value = "";
}
else {
ret_val = -1;
{
long length;
int ret_val;
- DataItem *item;
DataItemSequence *seq;
ret_val = 0;
seq = (DataItemSequence *)_dataItemSeq;
if (arg_name_param.empty() == false) {
length = seq->length();
- item = new DataItem();
- item->name = strdup(arg_name_param.c_str());
- item->value <<= (double)value_param;
-
seq->length(length + 1);
- (*seq)[length] = *item;
+ (*seq)[length].name = arg_name_param.c_str();
+ (*seq)[length].value <<= (double)value_param;
+ (*seq)[length].str_value = "";
}
else {
ret_val = -1;
{
long length;
int ret_val;
- DataItem *item;
DataItemSequence *seq;
ret_val = 0;
seq = (DataItemSequence *)_dataItemSeq;
if (arg_name_param.empty() == false) {
length = seq->length();
- item = new DataItem();
- item->name = strdup(arg_name_param.c_str());
- item->value <<= (float)value_param;
-
seq->length(length + 1);
- (*seq)[length] = *item;
+ (*seq)[length].name = arg_name_param.c_str();
+ (*seq)[length].value <<= (float)value_param;
+ (*seq)[length].str_value = "";
}
else {
ret_val = -1;
seq = (DataItemSequence *)_dataItemSeq;
item = get_dataitem_by_param_name(seq, arg_name_param);
if (item != NULL) {
- item->value >>= CORBA::Any::to_string(ch, 0);
+ //item->value >>= CORBA::Any::to_string(ch, 0);
+ ch = item->str_value;
ret_val = ch;
*err_flg = PLP_OK;
}
name = dataitem->name;
kind = dataitem->value.type()->kind();
switch(kind) {
- case tk_string:
- {
- const char *val;
-
- dataitem->value >>= val;
- log_debug("\t[%ld] %s: %s\n", index, name, val);
- }
- break;
case tk_long:
{
long val;
}
break;
default:
+ {
+ const char *val;
+ val = dataitem->str_value;
+ log_debug("\t[%ld] %s: %s\n", index, name, val);
+ }
+ break;
log_error("\t[%ld], name: %s, value unknown\n", index, name);
}
}
count = seq->length();
log_debug("BusMessage.printout(): parameter count: %ld\n", count);
for (long ii = 0; ii < count; ii++) {
+ printf("item[%ld]\n", ii);
item = (*seq)[ii];
printout_dataitem(ii, &item);
}
BusMessage(long type_param);
BusMessage(long type_param, const char *arg_name_param, const char *arg_string_value_param);
virtual ~BusMessage();
- int add_string_parameter(std::string arg_name_param, std::string string_value_param);
+ int add_cstring_parameter(const char *arg_name_param, const char *value_param);
+ int add_string_parameter(std::string arg_name_param, std::string value_param);
int add_long_parameter(std::string arg_name_param, long value_param);
int add_int_parameter(std::string arg_name_param, int value_param);
int add_double_parameter(std::string arg_name_param, double value_param);
* Created on: Aug 18, 2010
* Author: lamikr
*/
-
+#include <plp/log.h>
#include "BusMessageInternal.hh"
using namespace plpbus_orb;
+using namespace plpbus;
-namespace plpbus {
- BusMessageInternal::BusMessageInternal(DataItemSequence seq_param) : BusMessage() {
- DataItemSequence *seq;
+BusMessageInternal::BusMessageInternal(DataItemSequence seq_param) : BusMessage() {
+ DataItemSequence *seq;
- seq = (DataItemSequence *)_dataItemSeq;
+ seq = (DataItemSequence *)_dataItemSeq;
+ if (seq != NULL) {
delete(seq);
- _dataItemSeq = new DataItemSequence(seq_param);
}
+ _dataItemSeq = new DataItemSequence(seq_param);
+}
- BusMessageInternal::~BusMessageInternal() {
- }
+BusMessageInternal::~BusMessageInternal() {
+ log_debug("BusMessageInternal() DESTRUCTOR\n");
}
namespace plpbus {
class IServerListener {
public:
- virtual int request_received(const char *msg_in, char **msg_out) = 0;
- virtual int request_received(const BusMessage *msg_req, BusMessage **msg_rsp) = 0;
+ virtual int request_received(const char *msg_in, const char **msg_out) = 0;
+ virtual int request_received(const BusMessage *msg_req, const BusMessage *msg_rsp) = 0;
protected:
IServerListener() {};
virtual ~IServerListener() {};
void OrbServerImpl::send_message_and_request_response(OrbClient_ptr response_listener_param,
const char* msg_req)
{
- char *msg_rsp;
+ const char *msg_rsp;
msg_rsp = NULL;
if (CORBA::is_nil(response_listener_param) == false) {
}
char *OrbServerImpl::send_message_and_wait_response(const char* msg_req_param, ::CORBA::Long& err_flg) {
- char *msg_rsp;
- char *ret_val;
+ const char *msg_rsp;
+ char *ret_val;
err_flg = 0;
msg_rsp = NULL;
BusMessage *msg_req;
BusMessage *msg_rsp;
DataItemSequence *seq;
+ int err_flg;
if (CORBA::is_nil(response_listener_param) == false) {
if (_listener != NULL) {
log_debug("msg_req:\n");
msg_req->printout();
msg_rsp = NULL;
- _listener->request_received(msg_req, &msg_rsp);
+ err_flg = 0;
+ msg_rsp = new BusMessage(msg_req->get_type(&err_flg));
+ _listener->request_received(msg_req, msg_rsp);
seq = (DataItemSequence *)msg_rsp->_dataItemSeq;
log_debug("msg_rsp length: %ld\n", seq->length());
msg_rsp->printout();
response_listener_param->receive_response_dataitem_sequence(*seq);
+ delete(msg_req);
+ delete(msg_rsp);
}
else {
log_error("send_dataitem_message_and_request_response() error, server_callback == NULL\n");
BusMessage *msg_req;
BusMessage *msg_rsp;
DataItemSequence *seq;
+ int err_flg;
seq = NULL;
msg_rsp = NULL;
msg_req = new BusMessageInternal(req_seq_param);
- _listener->request_received(msg_req, &msg_rsp);
+ msg_rsp = new BusMessage(msg_req->get_type(&err_flg));
+ _listener->request_received(msg_req, msg_rsp);
if (msg_rsp != NULL) {
seq = (DataItemSequence *)msg_rsp->_dataItemSeq;
}
srvr_thread->start();
}
else {
- cerr << "Failed to add event listener, listener NULL.\n";
+ log_error("Failed to add event listener, listener NULL.\n");
}
}
{
omni_mutex_lock sync(server_thread_mutex);
if (is_shutdown_pending() == 0) {
- cout << "shutdown request received!" << endl;
+ log_debug("shutdown request received!\n");
// Tell the servers to exit, and wait for them to do so.
_shutdown_pending = 1;
while(_server_thread_count > 0) {
- server_thread_shutdown_signal.wait();
+ server_thread_shutdown_signal.timedwait(5, 0);
+ log_debug("signal received\n");
}
// Shutdown the ORB (but do not wait for completion). This also
// causes the main thread to unblock from CORBA::ORB::run().
+ log_debug("calling orb shutdown\n");
_orb->shutdown(0);
+ log_debug("orb shutdown called\n");
}
}
retVal = 0;
}
else {
- cout << "init() failed" << endl;
+ log_error("init() failed\n");
}
return retVal;
}
server_name,
CONST_CONTEXT_KIND__PLPBUS);
if (ok_flg == true) {
- cout << "Registered to naming service: " << server_name << endl;
+ log_debug("Registered to naming service: %s\n", server_name);
_orb->run();
ret_val = 0;
}
else {
- cout << "Failed to register to naming service: " << server_name << endl;
+ log_error("Failed to register to naming service: %s\n", server_name);
}
}
return ret_val;
}
}
else {
- cerr << "Failed to create RootPOA." << endl;
+ log_error("Failed to create RootPOA.\n");
}
return ret_val;
}
* service existed already for the naming context with similar description.
* Replace the existing one with a new one.
*/
- cout << "service " << service_name_param << " existed, replacing it." << endl;
+ log_warning("service %s exist, replacing it\n", service_name_param);
service_context_param->rebind(context_data, service_ref_param);
retVal = true;
}
}
catch (CosNaming::NamingContext::InvalidName&) {
- cerr << "Could not register service to name server, invalid service name." << endl;
+ log_error("Could not register service to name server, invalid service name.\n");
}
catch (CosNaming::NamingContext::NotFound&) {
- cerr << "Could not register service to name server, service object reference is invalid." << endl;
+ log_error("Could not register service to name server, service object reference is invalid.\n");
}
catch (CosNaming::NamingContext::CannotProceed&) {
// This should not happen!
- cerr << "Could not register service to name server, unknown error." << endl;
+ log_error("Could not register service to name server, unknown error.\n");
}
catch(CORBA::SystemException& ex) {
- cerr << "Could not register service to name server, unknown error." << endl;
+ log_error("Could not register service to name server, unknown error.\n");
}
return retVal;
}
int retVal;
retVal = 0;
- cout << "register_request_received_callback() started" << endl;
+ log_debug("register_request_received_callback() started\n");
_listener = listener_param;
- cout << "register_callback() done" << endl;
return retVal;
}
}
void OrbServerImpl::server_thread_closed() {
- bool send_signal = false;
+ bool send_signal;
+ log_debug("server_thread_closed() started\n");
+ send_signal = false;
server_thread_mutex.lock();
+ log_debug("server_thread_closed(), server_thread_count: %d\n", _server_thread_count);
_server_thread_count--;
if (_server_thread_count == 0) {
send_signal = true;
}
server_thread_mutex.unlock();
if (send_signal == true) {
+ log_debug("server_thread_closed(), sending signal\n");
server_thread_shutdown_signal.signal();
+ log_debug("server_thread_closed(), signal send\n");
}
}
ServerEventThread(plpbus_orb::OrbClient_ptr client,
const char *msg,
int period,
- plpbus::OrbServerImpl *server_obj);
+ OrbServerImpl *server_obj);
virtual void run(void* arg);
private:
plpbus_orb::OrbClient_var _orb_client;
CORBA::String_var _orb_msg;
int _interval;
- plpbus::OrbServerImpl *_server_obj;
+ OrbServerImpl *_server_obj;
};
}
noinst_PROGRAMS = test_bus_admin \
test_bus_client \
- test_bus_server
+ test_bus_server \
+ test_bus_msg
test_bus_admin_SOURCES = test_admin.cc
test_bus_admin_LDADD = $(SRC_LIBS) ../src/.libs/libplpbus.a
test_bus_server_SOURCES = test_server.cc
test_bus_server_LDADD = $(SRC_LIBS) ../src/.libs/libplpbus.a
+test_bus_msg_SOURCES = test_bus_msg.cc
+test_bus_msg_LDADD = $(SRC_LIBS) ../src/.libs/libplpbus.a
+
AM_CPPFLAGS = $(SRC_CFLAGS) -I../src -I../src/idl
DISTCLEANFILES = Makefile.in
\ No newline at end of file
--- /dev/null
+/*
+ * test_bus_msg.c
+ *
+ * Created on: Mar 11, 2011
+ * Author: lamikr
+ */
+#include <string.h>
+#include <plp/log.h>
+
+#include "plpbus/BusMessage.hh"
+
+using namespace std;
+using namespace plpbus;
+
+int main(int argc, char** argv)
+{
+ BusMessage *bus_msg;
+ string str1;
+ string str2;
+ const char *c1;
+ const char *c2;
+ char *c3;
+ char *c4;
+ int err_flg;
+
+ str1 = "str";
+ str2 = "jou";
+ //c1 = (const char *)"c1";
+ //c2 = (const char *)"c2";
+ c1 = strdup("c1");
+ c2 = strdup("c2");
+ c3 = strdup("c3");
+ c4 = strdup("c4");
+ bus_msg = new BusMessage(12);
+ if (bus_msg != NULL) {
+ bus_msg->add_int_parameter("int", 1);
+ bus_msg->add_long_parameter("long", 1);
+ bus_msg->add_cstring_parameter((char *)c1, (char *)c2);
+ bus_msg->add_cstring_parameter(c3, c4);
+ bus_msg->add_double_parameter("double", 1.01);
+ bus_msg->add_float_parameter("float", 1.01);
+ log_debug("params added\n");
+ bus_msg->add_string_parameter(str1, str2);
+ bus_msg->printout();
+ bus_msg->get_long_parameter("long", &err_flg);
+ bus_msg->get_string_parameter(str1, &err_flg);
+ delete(bus_msg);
+ }
+}
+
}
int ClientListenerImpl::event_received(const char *event_param) {
- cout << "event_received(char *" << event_param << ") " << endl;
+ log_debug("event_received(%s)\n", event_param);
return 0;
}
int ClientListenerImpl::event_received(const BusMessage *event_param) {
- cout << "event_received(BusMessage *)" << endl;
+ log_debug("event_received(BusMessage *)\n");
return 0;
}
BusClient *client;
ClientListenerImpl *lstnr;
int err_flg;
- BusMessage *busmsg;
+ BusMessage *bus_msg;
if (argc >= 3) {
client = new BusClient();
lstnr = new ClientListenerImpl();
err_flg = client->init(argv[1]);
if (err_flg == 0) {
- cout << "init success" << endl;
+ log_debug("init success\n");
err_flg = client->add_client_listener(lstnr);
if (err_flg == 0) {
int err_flg;
char *rsp;
- busmsg = new BusMessage(MSG_ID_HELLO);
- busmsg->add_string_parameter("a", "hello");
- busmsg->add_string_parameter("b", "world");
- client->send_message_and_request_response(busmsg);
+ bus_msg = new BusMessage(MSG_ID_HELLO);
+ if (bus_msg != NULL) {
+ bus_msg->add_string_parameter("a", "hello");
+ bus_msg->add_string_parameter("b", "world");
+ client->send_message_and_request_response(bus_msg);
- rsp = NULL;
- err_flg = client->send_message_and_wait_response(argv[2], &rsp);
- cout << "rsp: " << rsp << endl;
- err_flg = client->send_message_and_request_response(argv[2]);
- if (err_flg == 0) {
- cout << "request message send successfully" << endl;
+ rsp = NULL;
+ err_flg = client->send_message_and_wait_response(argv[2], &rsp);
+ log_debug("rsp: %s\n", rsp);
+ err_flg = client->send_message_and_request_response(argv[2]);
+ if (err_flg == 0) {
+ log_debug("request message send successfully\n");
+ }
+ sleep(5);
+ delete(bus_msg);
}
- sleep(10);
}
else {
- cout << "client failed to add response listener" << endl;
+ log_error("client failed to add response listener\n");
}
}
else {
- cout << "client failed to init" << endl;
+ log_error("client failed to init\n");
}
}
else {
- cout << "usage: <server_name> <message text>" << endl;
+ log_info("usage: <server_name> <message text>\n");
}
return 0;
}
public:
inline ServerListenerImpl() {}
virtual ~ServerListenerImpl() {}
- virtual int request_received(const char *msg_req, char **msg_rsp);
- virtual int request_received(const BusMessage *msg_req, BusMessage **msg_rsp);
+ virtual int request_received(const char *msg_req, const char **msg_rsp);
+ virtual int request_received(const BusMessage *msg_req, const BusMessage *msg_rsp);
};
-int ServerListenerImpl::request_received(const char *msg_req_param, char **msg_rsp_param) {
+int ServerListenerImpl::request_received(const char *msg_req_param, const char **msg_rsp_param) {
log_debug("request_received(), request_param: %s\n", msg_req_param);
- *msg_rsp_param = strdup("jee");
+ *msg_rsp_param = "jee";
log_debug("request_received(), response param set: %s\n", *msg_rsp_param);
return 0;
}
-int ServerListenerImpl::request_received(const BusMessage *msg_req_param, BusMessage **msg_rsp_param) {
+int ServerListenerImpl::request_received(const BusMessage *msg_req_param, const BusMessage *msg_rsp_param) {
long type;
int err_flg;
type = ((BusMessage *)msg_req_param)->get_type(&err_flg);
log_debug("request_received, msg type: %ld\n", type);
- *msg_rsp_param = new BusMessage(MSG_ID_HELLO);
+ //*msg_rsp_param = new BusMessage(MSG_ID_HELLO);
double dbl = (double)1.0;
- (*msg_rsp_param)->add_double_parameter("rsp_param_double", dbl);
+ ((BusMessage *)msg_rsp_param)->add_double_parameter("rsp_param_double", dbl);
return 0;
}
if (argc >= 2) {
log_info("starting server\n");
server = new BusServer();
- server->init();
- listener = new ServerListenerImpl();
- server->add_server_listener(listener);
- server->launch(argv[1]);
+ if (server != NULL) {
+ server->init();
+ listener = new ServerListenerImpl();
+ if (listener != NULL) {
+ server->add_server_listener(listener);
+ server->launch(argv[1]);
+ }
+ delete(server);
+ delete(listener);
+ }
}
else {
log_info("usage: give server name as a parameter\n");