This month's CORBA News Brief from OCI provides a high-level overview of the OMG Notification Service and illustrates using the TAO 1.2a Notification Service interoperating with a JacORB v1.4 consumer.
The Notification Service Specification was originally produced by the Telecommunications Working Group within the Object Management Group (OMG), an international consortium of over 800 companies, and was later adopted as a standard service. The goal was to extend the more basic OMG Event Service specification to support telecommunication applications yet remain backward compatible with the standard Event Service. The Notification Service preserves all the semantics specified for the OMG Event Service, allowing for interoperability between basic Event Service and Notification Service clients.
Both the Notification and Event services enable events (with an optional data payload) to be sent and received between objects in a decoupled fashion. This provides a more flexible mechanism for message transmission than if events were transmitted directly. Analogous to the standard OMG Event Service, the Notification model utilizes an event channel as the substrate for the communication of messages between client applications. Applications that provide messages are termed suppliers while applications that receive (or consume) messages are known as consumers. Suppliers and consumers are completely decoupled from one another (i.e., suppliers have no knowledge of which consumers are listening for their messages and vice versa). While the OMG Event Service supports asynchronous exchange of event messages, it has several serious limitations including no support for event filtering and no ability to be configured for varying levels of qualities of service (QoS). The Notification Service was designed to enhance the OMG Event Service by adding the following features:
Figure 1 shows the general architecture of the Notification Service. Suppliers and consumers of a notification channel are connected via their associated proxies. Administration interfaces on the supplier side and consumer side provide the ability to cluster a set of proxies based on common configurations.

Figure 1: Notification Service Architecture
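To make the decoupling concrete, here is a minimal in-process sketch of the push model in C++. It is not the CosNotification API: ToyChannel, connect_consumer(), and push() are hypothetical names, and this toy channel only forwards each event to every connected consumer callback, with no proxies, filtering, or QoS.

```cpp
#include <cstddef>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for an event channel: suppliers call push(),
// consumers register a callback. Neither side knows about the other.
class ToyChannel {
public:
  using Consumer = std::function<void(const std::string&)>;

  // Register a consumer callback; returns its index as a simple handle.
  std::size_t connect_consumer(Consumer c) {
    consumers_.push_back(std::move(c));
    return consumers_.size() - 1;
  }

  // Deliver one event to every connected consumer.
  // Returns the number of consumers that were called.
  std::size_t push(const std::string& event) const {
    for (const auto& c : consumers_) {
      c(event);
    }
    return consumers_.size();
  }

private:
  std::vector<Consumer> consumers_;
};
```

A supplier only ever holds a ToyChannel and calls push(); adding or removing consumers requires no change on the supplier side, which is the property the event channel is meant to provide.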
TAO 1.2a includes an implementation of the OMG Notification Service. Highlights of the 1.2a release include support for the following features:
We illustrate the utility and interoperability of the Notification Service by creating a simple supplier application that sends events through the TAO Notification Channel to a JacORB consumer. The TAO application (written in C++) is a push supplier that connects to the Notification Channel and pushes events. These events are simple text messages composed of a "sender", a "subject", and a "body". The JacORB consumer (written in Java) is a push consumer that connects to the channel and asynchronously receives event notifications from it. Both the supplier and the consumer use the structured event type. The example is composed of three applications, as illustrated in Figure 2 and further described below:

Figure 2: Overview of TAO/JacORB example.
MessengerClient: a simple C++ application that connects to the TAO Naming Service, looks up the MessengerServer object reference, and invokes the send_message() method on that object to send two messages, one with "TAO" as the subject and the other with "JacORB" as the subject. The MessengerClient application does not interact with the Notification Service directly, so we will not describe it in any further detail.
MessengerServer: a C++ application that plays the role of the server for the MessengerClient and the supplier for the JacORB consumer. The Notification channel is created within this application. The Messenger servant's send_message() method is invoked from the MessengerClient; it packages the incoming data (the "subject", "sender", and "body") into a structured event and invokes the push_structured_event() method on the consumer proxy object.
The MessengerServer main() function initializes the ORB (TAO), creates the Messenger servant, and registers the Messenger object with the TAO Naming Service. The code for MessengerServer.cpp is shown below:
#include <orbsvcs/CosNamingC.h>
#include "Messenger_i.h"
#include <ace/streams.h>

int
main(int argc, char * argv[])
{
  try
  {
    // Initialize orb
    CORBA::ORB_var orb = CORBA::ORB_init(argc, argv);

    // Find the Naming Service.
    CORBA::Object_var rootObj =
      orb->resolve_initial_references("NameService");
    CosNaming::NamingContext_var rootNC =
      CosNaming::NamingContext::_narrow(rootObj.in());

    // Get the Root POA.
    CORBA::Object_var obj = orb->resolve_initial_references("RootPOA");
    PortableServer::POA_var poa = PortableServer::POA::_narrow(obj.in());

    // Activate POA manager
    PortableServer::POAManager_var mgr = poa->the_POAManager();
    mgr->activate();

    // Create our Messenger servant.
    Messenger_i messenger_servant(orb.in());

    // Register it with the RootPOA.
    PortableServer::ObjectId_var oid =
      poa->activate_object( &messenger_servant );
    CORBA::Object_var messenger_obj = poa->id_to_reference( oid.in() );

    // Bind it in the Naming Service.
    CosNaming::Name name;
    name.length (1);
    name[0].id = CORBA::string_dup("MessengerService");
    rootNC->rebind(name, messenger_obj.in());

    // Accept requests
    orb->run();
    orb->destroy();
  }
  catch (CORBA::Exception& ex) {
    cerr << ex << endl;
    return 1;
  }
  return 0;
}
We now implement the structured push supplier that connects to
the Notification Service and sends structured events when its send_message()
method is invoked. The steps involved in connecting the supplier to the
event channel are illustrated in steps 1-6 in Figure 3 and further described
below. The corresponding lines of code are also identified below within the
MessengerServer example.
Figure 3: TAO C++ Messenger Server Supplier
1. Obtain an object reference to the event channel factory.
2. Obtain an event channel.
3. Obtain the SupplierAdmin object reference.
4. Obtain a structured push consumer proxy object.
5. Receive an incoming message from MessengerClient application.
6. Connect to the proxy to push the structured event.
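The chain of objects in these steps (factory, channel, admin, proxy) can be sketched with plain C++ stand-ins. This is only an illustration under stated assumptions: ToyChannelFactory, ToyNotifyChannel, ToySupplierAdmin, and ToyProxyConsumer are hypothetical types, not TAO or OMG interfaces. The sketch mirrors one detail of the real calls, namely that each creation call hands back an ID through an out-parameter, and it refuses events pushed before the supplier has connected.

```cpp
#include <memory>
#include <string>
#include <vector>

// Hypothetical in-process stand-ins; none of these are TAO or OMG types.
struct ToyProxyConsumer {
  bool connected = false;
  std::vector<std::string> received;

  void connect_supplier() { connected = true; }

  // Accept an event only after the supplier has connected.
  bool push(const std::string& event) {
    if (!connected) return false;
    received.push_back(event);
    return true;
  }
};

struct ToySupplierAdmin {
  std::vector<std::shared_ptr<ToyProxyConsumer>> proxies;

  // Create a proxy and report its ID through the out-parameter.
  std::shared_ptr<ToyProxyConsumer> obtain_push_consumer(int& proxy_id) {
    proxy_id = static_cast<int>(proxies.size());
    proxies.push_back(std::make_shared<ToyProxyConsumer>());
    return proxies.back();
  }
};

struct ToyNotifyChannel {
  std::vector<std::shared_ptr<ToySupplierAdmin>> admins;

  // Create a supplier admin and report its ID through the out-parameter.
  std::shared_ptr<ToySupplierAdmin> new_for_suppliers(int& admin_id) {
    admin_id = static_cast<int>(admins.size());
    admins.push_back(std::make_shared<ToySupplierAdmin>());
    return admins.back();
  }
};

struct ToyChannelFactory {
  std::vector<std::shared_ptr<ToyNotifyChannel>> channels;

  // Create a channel and report its ID through the out-parameter.
  std::shared_ptr<ToyNotifyChannel> create_channel(int& channel_id) {
    channel_id = static_cast<int>(channels.size());
    channels.push_back(std::make_shared<ToyNotifyChannel>());
    return channels.back();
  }
};
```

In this sketch the caller never chooses an ID; it passes a variable by reference and reads the value assigned by the creating object, which is why the ID variables in the real code can be declared without an initial value.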
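The Messenger_i class implements the Messenger interface (containing the send_message() method) and also acts as an event supplier to the TAO Notification Channel. The constructor for the Messenger servant keeps a reference to the ORB, resolves the TAO Naming Service ("NameService"), and creates the TAO Notification Service channel factory in-process by calling TAO_Notify_EventChannelFactory_i::create(). The Notification Channel obtained from that factory is then bound to the root naming context of the TAO Naming Service under the name "MyEventChannel". Finally, the constructor obtains the SupplierAdmin and the structured push consumer proxy, and connects the supplier to the proxy.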
Messenger_i::Messenger_i (CORBA::ORB_ptr orb)
  : orb_ (CORBA::ORB::_duplicate(orb))
{
  try
  {
    CORBA::Object_var poa_obj = orb->resolve_initial_references("RootPOA");
    PortableServer::POA_var poa = PortableServer::POA::_narrow(poa_obj.in());

    // Find the Naming Service.
    CORBA::Object_var naming_obj =
      orb_->resolve_initial_references ("NameService");
    if (CORBA::is_nil(naming_obj.in())) {
      cerr << "Unable to find naming service" << endl;
    }
    CosNaming::NamingContext_var naming_context =
      CosNaming::NamingContext::_narrow(naming_obj.in());

    // Create an instance of TAO's notification event channel
    CosNotifyChannelAdmin::EventChannelFactory_var notify_factory =
      TAO_Notify_EventChannelFactory_i::create(poa.in());
    CosNotifyChannelAdmin::ChannelID id;
    CosNotification::QoSProperties initial_qos;
    CosNotification::AdminProperties initial_admin;
    CosNotifyChannelAdmin::EventChannel_var ec =
      notify_factory->create_channel (initial_qos,
                                      initial_admin,
                                      id);
    if (CORBA::is_nil (ec.in())) {
      cerr << "Unable to create event channel" << endl;
    }

    // Bind the channel in the Naming Service so consumers can find it.
    CosNaming::Name name(1);
    name.length(1);
    name[0].id = CORBA::string_dup("MyEventChannel");
    naming_context->rebind(name, ec.in());

    // Obtain the SupplierAdmin.
    CosNotifyChannelAdmin::AdminID adminid;
    CosNotifyChannelAdmin::InterFilterGroupOperator ifgop =
      CosNotifyChannelAdmin::AND_OP;
    CosNotifyChannelAdmin::SupplierAdmin_var supplier_admin =
      ec->new_for_suppliers (ifgop, adminid);
    if (CORBA::is_nil (supplier_admin.in())) {
      cerr << "Unable to find supplier admin" << endl;
    }

    // Obtain the structured push consumer proxy.
    CosNotifyChannelAdmin::ProxyID supplieradmin_proxy_id;
    CosNotifyChannelAdmin::ProxyConsumer_var proxy_consumer =
      supplier_admin->obtain_notification_push_consumer(
        CosNotifyChannelAdmin::STRUCTURED_EVENT,
        supplieradmin_proxy_id);

    // Create and activate the supplier servant.
    StructuredEventSupplier_i *servant =
      new StructuredEventSupplier_i(orb_.in());
    PortableServer::ObjectId_var oid = poa->activate_object(servant);
    CORBA::Object_var supplier_obj = poa->id_to_reference(oid.in());
    CosNotifyComm::StructuredPushSupplier_var supplier =
      CosNotifyComm::StructuredPushSupplier::_narrow(supplier_obj.in());

    // Connect the supplier to the proxy.
    consumer_proxy_ =
      CosNotifyChannelAdmin::StructuredProxyPushConsumer::
        _narrow(proxy_consumer.in());
    if (CORBA::is_nil (consumer_proxy_.in())) {
      cerr << "Unable to find structured proxy push consumer" << endl;
    }
    consumer_proxy_->connect_structured_push_supplier(supplier.in());
  }
  catch (CORBA::Exception &ex) {
    cerr << ex << endl;
  }
}
The send_message() method of the Messenger Servant formats the message
and then creates a new structured event and populates it with the contents
of the message. The
push_structured_event() operation of the structured push consumer proxy
object is used to push the event to the notification channel.
CORBA::Boolean Messenger_i::send_message (const char * user_name,
                                          const char * subject,
                                          char *& message)
  throw (CORBA::SystemException)
{
  cout << "Message from: " << user_name << endl;
  cout << "Subject: " << subject << endl;
  cout << "Message: " << message << endl;

  try
  {
    // Event Definition
    CosNotification::StructuredEvent event;
    event.header.fixed_header.event_type.domain_name =
      CORBA::string_dup("OCI");
    // string
    event.header.fixed_header.event_type.type_name =
      CORBA::string_dup("examples");
    // string
    event.header.fixed_header.event_name =
      CORBA::string_dup("myevent");

    // sequence<Property>: string name, any value
    event.filterable_data.length (3);
    event.filterable_data[0].name = CORBA::string_dup("From");
    event.filterable_data[0].value <<= (const char *)user_name;
    event.filterable_data[1].name = CORBA::string_dup("Subject");
    event.filterable_data[1].value <<= (const char *)subject;
    event.filterable_data[2].name = CORBA::string_dup("Message");
    event.filterable_data[2].value <<= (const char *)message;

    consumer_proxy_->push_structured_event(event);
  }
  catch (CosNotifyComm::InvalidEventType &) {
    cerr << "Invalid Event Type Exception " << endl;
    return 1;
  }
  catch (CORBA::Exception &ex) {
    cerr << ex << endl;
    return 1;
  }
  return 0;
}
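The layout of the event built by send_message() can be sketched without any CORBA types. In the following illustration, ToyStructuredEvent and make_message_event() are hypothetical names; the real type is CosNotification::StructuredEvent, whose filterable values are CORBA Any values, not plain strings as assumed here. The sketch only shows which fields the example fills in and in what order.

```cpp
#include <string>
#include <utility>
#include <vector>

// Hypothetical plain-C++ mirror of the parts of a structured event that
// the example populates. Values are simplified to std::string.
struct ToyStructuredEvent {
  std::string domain_name;  // fixed header: event type domain
  std::string type_name;    // fixed header: event type name
  std::string event_name;   // fixed header: event name
  std::vector<std::pair<std::string, std::string>> filterable_data;
};

// Build an event with the same header values and property names that
// the send_message() example uses.
ToyStructuredEvent make_message_event(const std::string& from,
                                      const std::string& subject,
                                      const std::string& message) {
  ToyStructuredEvent event;
  event.domain_name = "OCI";
  event.type_name = "examples";
  event.event_name = "myevent";

  // Name/value pairs that a filter can inspect.
  event.filterable_data.reserve(3);
  event.filterable_data.emplace_back("From", from);
  event.filterable_data.emplace_back("Subject", subject);
  event.filterable_data.emplace_back("Message", message);
  return event;
}
```

Placing "From", "Subject", and "Message" in the filterable data, as the example does, is what lets a consumer-side filter select events by subject.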
Lastly, the supplier class StructuredEventSupplier_i, which implements the CosNotifyComm::StructuredPushSupplier IDL interface, must be implemented. It contains two operations:
disconnect_structured_push_supplier(): the supplier invokes this method when it is finished providing event data; and
subscription_change(): this method is invoked when the consumers change their subscription model.
The constructor for this class simply duplicates the ORB reference.
StructuredEventSupplier_i::StructuredEventSupplier_i(CORBA::ORB_ptr orb)
  : orb_(CORBA::ORB::_duplicate(orb))
{
}
The subscription_change() method does nothing in the current implementation. The disconnect_structured_push_supplier() method deactivates the supplier object.
void StructuredEventSupplier_i::disconnect_structured_push_supplier ()
  throw (CORBA::SystemException)
{
  // Find the POA and object ID for the current request, then deactivate.
  CORBA::Object_var obj = orb_->resolve_initial_references ("POACurrent");
  PortableServer::Current_var current =
    PortableServer::Current::_narrow (obj.in());
  PortableServer::POA_var poa = current->get_POA ();
  PortableServer::ObjectId_var objectId = current->get_object_id ();
  poa->deactivate_object (objectId.in());
}
We now implement the JacORB consumer, a structured push consumer that connects to the TAO Notification Channel and prints all the structured events it receives. The consumer uses the filtering functionality of the Notification Service to set up a constraint so that only messages whose subject is "JacORB" are received.
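The effect of such a constraint can be sketched in a few lines of C++. This is not how the Notification Service evaluates its constraint language; ToyEqualityConstraint, passes(), and filter_events() are hypothetical names, and the sketch handles only a single "name == value" test over string properties. It assumes that an event lacking the named property fails the constraint.

```cpp
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-ins: an event's filterable properties and a single
// "name == value" constraint.
using ToyProperties = std::vector<std::pair<std::string, std::string>>;

struct ToyEqualityConstraint {
  std::string name;
  std::string value;
};

// Returns true only if a property called c.name exists and equals c.value.
// A missing property fails the constraint in this sketch.
bool passes(const ToyEqualityConstraint& c, const ToyProperties& props) {
  for (const auto& p : props) {
    if (p.first == c.name) {
      return p.second == c.value;
    }
  }
  return false;
}

// Keep only the events that satisfy the constraint, preserving order.
std::vector<ToyProperties> filter_events(
    const ToyEqualityConstraint& c,
    const std::vector<ToyProperties>& events) {
  std::vector<ToyProperties> delivered;
  for (const auto& e : events) {
    if (passes(c, e)) {
      delivered.push_back(e);
    }
  }
  return delivered;
}
```

With a constraint of name "Subject" and value "JacORB", the two messages sent by the MessengerClient (subjects "TAO" and "JacORB") would reduce to one delivered event in this sketch.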

Figure 4: JacORB Consumer
1. Obtain an object reference to the event channel.
2. Obtain the ConsumerAdmin object reference.
3. Use the default filter factory to create a new filter; add the filter to the ConsumerAdmin.
4. Obtain a structured push supplier proxy object.
5. Connect to the proxy.
The JacORB consumer is implemented in the Java class Consumer (in the package JacORBConsumer) shown below:
package JacORBConsumer;

import org.omg.CosNotification.*;
import org.omg.CosNotifyComm.*;
import org.omg.CosNotifyChannelAdmin.*;
import org.omg.CosNotifyFilter.*;
import org.omg.CosNaming.*;
import org.omg.CosNaming.NamingContextPackage.*;
import org.omg.PortableServer.*;
import java.io.*;
import java.util.*;
import java.net.*;

public class Consumer extends StructuredPushConsumerPOA
{
    public static void main( String[] args )
    {
        try {
            System.out.println ("Initialize orb\n");
            org.omg.CORBA.ORB orb = org.omg.CORBA.ORB.init(args, null);

            // get naming service reference and context
            BufferedReader reader = new BufferedReader(new FileReader("ns.ior"));
            String ns = reader.readLine();
            org.omg.CORBA.Object obj = orb.string_to_object(ns);
            NamingContext rootContext = NamingContextHelper.narrow( obj );
            System.out.println("Got Root context: "+ obj);

            // find the event channel reference
            NameComponent[] name = { new NameComponent( "MyEventChannel", "" )};
            obj = rootContext.resolve(name);
            EventChannel channel = EventChannelHelper.narrow(obj);
            System.out.println ("**************MyChannel is " + channel.toString ());

            // get the admin interface and the supplier proxy
            InterFilterGroupOperator ifgop = InterFilterGroupOperator.AND_OP;
            org.omg.CORBA.IntHolder adminId = new org.omg.CORBA.IntHolder(0);
            ConsumerAdmin consumerAdmin = channel.new_for_consumers(ifgop, adminId);

            // get the default filter factory
            FilterFactory filterFactory = channel.default_filter_factory();
            Filter filter = null;
            if ( filterFactory == null ){
                System.err.println ("No default filter factory found!");
            }
            try {
                filter = filterFactory.create_filter("TCL");
            }
            catch (Exception e){
                System.err.println( "ERROR: " + e );
                e.printStackTrace( System.err );
            }

            // accept only events whose Subject property equals "JacORB"
            String expr = "Subject == 'JacORB'";
            ConstraintExp exp[] = new ConstraintExp[1];
            EventType eventType[] = new EventType [0];
            exp[0] = new ConstraintExp (eventType, expr);
            try {
                ConstraintInfo info[] = filter.add_constraints (exp);
                consumerAdmin.add_filter(filter);
            }
            catch (InvalidConstraint e) {
                System.err.println( "ERROR: " + e );
                e.printStackTrace( System.err );
            }

            // subscribe to all event types
            EventType added[] = new EventType[1];
            EventType removed[] = new EventType [0];
            added[0] = new EventType ("*", "*");
            try{
                consumerAdmin.subscription_change(added, removed);
            }
            catch (Exception e){
                System.err.println( "ERROR: " + e );
                e.printStackTrace( System.err );
            }

            POA poa = org.omg.PortableServer.POAHelper.narrow
                (orb.resolve_initial_references("RootPOA"));

            // create and implicitly activate the client
            StructuredPushConsumer structuredPushConsumer =
                (StructuredPushConsumer)new Consumer()._this(orb);

            // get the structured proxy push supplier
            ClientType clientType = ClientType.STRUCTURED_EVENT;
            org.omg.CORBA.IntHolder proxyId = new org.omg.CORBA.IntHolder (0);
            ProxySupplier proxySupplier = null;
            try{
                proxySupplier = consumerAdmin.obtain_notification_push_supplier (clientType, proxyId);
            }
            catch( AdminLimitExceeded e ){
                System.err.println ( "ERROR: " + e);
                e.printStackTrace( System.err );
            }
            StructuredProxyPushSupplier proxyPushSupplier =
                StructuredProxyPushSupplierHelper.narrow(proxySupplier);

            // connect ourselves to the event channel
            proxyPushSupplier.connect_structured_push_consumer(structuredPushConsumer);
            poa.the_POAManager().activate();
            System.out.println ("run the orb");
            orb.run();
        }
        // Catch exceptions
        catch ( Exception e ) {
            System.err.println( "ERROR: " + e );
            e.printStackTrace( System.err );
        }
        System.out.println("Normal Termination...");
    }

    public void disconnect_structured_push_consumer (){
        System.out.println ("disconnect_structured_push_consumer invoked");
    }

    public void offer_change (EventType added[], EventType removed[]){
        System.out.println ("offer_change invoked");
    }

    public void push_structured_event (StructuredEvent event){
        try {
            System.out.println ("\nevent name is: " + event.header.fixed_header.event_name);
            System.out.println ("domain name is: " + event.header.fixed_header.event_type.domain_name);
            System.out.println ("type name is: " + event.header.fixed_header.event_type.type_name);
            for (int i = 0; i < event.filterable_data.length; i++){
                System.out.println (event.filterable_data[i].name + ":\t" + event.filterable_data[i].value);
            }
        }
        catch ( Exception e ) {
            System.err.println( "ERROR: " + e );
            e.printStackTrace( System.err );
        }
    }
}
The consumer locates the TAO Notification Channel and invokes the new_for_consumers() method on the channel interface to get the ConsumerAdmin object reference. The consumer uses the factory returned by default_filter_factory() to create a filter that utilizes the TCL constraint language. A constraint ("Subject == 'JacORB'") is created and added to the filter using the add_constraints() method on the filter. The filter is then added to the ConsumerAdmin object using the add_filter() method. Invoking the obtain_notification_push_supplier() operation on the ConsumerAdmin object obtains the push supplier proxy. Finally, the consumer connects to the proxy and waits for events to be pushed. Events are printed in the push_structured_event() method. The channel may invoke the disconnect_structured_push_consumer() operation to disconnect the consumer at any point in time (care should be taken in the disconnection of suppliers and consumers). The offer_change() operation does nothing in this example, but an intelligent consumer may, for example, disconnect when the channel no longer offers the types of events that are of interest.
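One way such an "intelligent" consumer could decide when to disconnect is sketched below in C++. This is only an illustration: ToyOfferTracker, ToyEventType, and should_disconnect() are hypothetical names that do not appear in the example or in any ORB. The sketch assumes the consumer keeps its own set of offered (domain, type) pairs, updated from the added and removed lists it receives, and that it takes no action before the first offer arrives.

```cpp
#include <set>
#include <string>
#include <utility>
#include <vector>

// Hypothetical (domain name, type name) pair standing in for an event type.
using ToyEventType = std::pair<std::string, std::string>;

class ToyOfferTracker {
public:
  explicit ToyOfferTracker(std::set<ToyEventType> wanted)
      : wanted_(std::move(wanted)), seen_offer_(false) {}

  // Mirror of an offer_change(added, removed) callback: update the set
  // of event types currently offered on the channel.
  void offer_change(const std::vector<ToyEventType>& added,
                    const std::vector<ToyEventType>& removed) {
    seen_offer_ = true;
    for (const auto& t : added) offered_.insert(t);
    for (const auto& t : removed) offered_.erase(t);
  }

  // True once at least one offer has been seen and none of the wanted
  // event types is currently offered.
  bool should_disconnect() const {
    if (!seen_offer_) return false;
    for (const auto& t : wanted_) {
      if (offered_.count(t) != 0) return false;
    }
    return true;
  }

private:
  std::set<ToyEventType> wanted_;
  std::set<ToyEventType> offered_;
  bool seen_offer_;
};
```

A consumer built this way would call should_disconnect() at the end of its offer_change() handler and, when it returns true, start its own disconnection from the proxy.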
This article illustrated how to use the TAO Notification Service in conjunction with a JacORB consumer. The full source code of the example illustrated in this article can be found here. There are many resources available that provide greater detail on the OMG Notification Service, TAO, and JacORB. These include the OMG Notification Service specification and Java Programming with CORBA by Brose, Vogel, and Duddy. In-depth information on CORBA programming with TAO can be found in the recently released TAO 1.2a Developer's Guide. Other related resources can be found by visiting the links listed in the References section.
Object Computing, Inc. (OCI) has been providing educational services to clients, industries, and universities since 1993. We offer one of the most comprehensive distributed Object Oriented training curricula in the country. These curricula focus on the fundamentals of OO technology, with close to 40 workshops in OOAD, Java, XML, C++/CORBA, and Unix/Linux.
For further information regarding OCI's Educational Services programs, please visit our Educational Services section on the web or contact us at training.
The OCI CORBA News Brief is intended to promote CORBA and object technology in the development of distributed computing applications. Each issue of the CORBA News Brief will feature news and technical information about OCI's supported open-source ORBs (TAO and JacORB), case studies, and examples using CORBA, as well as information about OCI's educational offerings.
The OCI CORBA News Brief is published on a monthly basis. Send ideas for articles of interest to corba.
To subscribe or unsubscribe from the CNB mailing list, send mail to majordomo with the line "subscribe cnb" or "unsubscribe cnb" in the body of the message.