Real-Time CORBA Priority and Threadpool Mechanisms

by
Richard Anthony, Senior Architect
Object Computing, Inc. (OCI)


Introduction

 The Real-Time CORBA specification [1] defined several mechanisms to provide for end-to-end deterministic performance for CORBA operations. This specification was originally defined as an extension to CORBA 2.3 and the CORBA messaging specification. The RT CORBA extensions utilized and extended the quality-of-service (QoS) mechanisms defined in the CORBA messaging specification. The extensions defined by RT CORBA are used to support many distributed real-time and embedded (DRE) systems that require end-to-end support of quality-of-service characteristics, such as jitter and latency. Version 1.1 of the CORBA Real-Time specification [1] can be found at:

http://www.omg.org/cgi-bin/doc?formal/02-08-02

  In addition, the TAO Developer's Guide [2], available from OCI at http://www.theaceorb.com/product/index.html, contains a discussion of the RT CORBA features that are available in TAO. The paper by Schmidt and Kuhns [4] also provides a good overview of RT CORBA.

 The mechanisms in the Real-Time specification [1] include:

 This issue of the CORBA News Brief will cover the first six items in the list above. These include RT ORB and RT POA, CORBA priority and priority mapping, RT Current interface, threadpools, and private connections. Future editions of the CNB will cover many of the other topics separately. Examples included in this CNB are written in C++ using TAO (The ACE ORB).


Real-Time ORB

 The RT ORB is used to create and destroy objects utilized by RT CORBA, such as mutexes and real-time CORBA policies. The RT ORB is accessed via the resolve_initial_references() method call. The following code shows how the RT ORB can be located.



    #include <tao/corba.h>
    #include <tao/RTCORBA/RTCORBA.h>

    int main (int argc, char * argv[ ])
    {
      // Initialize the ORB.
      CORBA::ORB_var orb = CORBA::ORB_init( argc, argv );

      // Ready to Find RTORB
      CORBA::Object_var obj = orb->resolve_initial_references("RTORB");
      RTCORBA::RTORB_var rt_orb = RTCORBA::RTORB::_narrow(obj.in());

    ...



Real-Time POA

 The RT POA extends the PortableServer::POA to provide object-specific priority operations, such as activate_object_with_priority. To obtain an instance of the RT POA, an instance of the PortableServer::POA can be narrowed. In the example below, we create a child of the Root POA and narrow it to an RT POA. We also obtain the Root POA's POAManager, which is passed to the child POA. The creation of an RT POA is not required if the object-specific priority operations are not needed. For a complete list of these operations, refer to the RT CORBA Specification [1] or the TAO Developer's Guide [2].



    ...
    // Assume ORB initialized and the RT ORB created as above.
    //Get reference to the RootPOA.
    obj = orb->resolve_initial_references( "RootPOA" );
    PortableServer::POA_var poa = PortableServer::POA::_narrow( obj.in() );

    // Activate the POAManager.
    PortableServer::POAManager_var mgr = poa->the_POAManager();
    mgr->activate();

    // Next need to create the policy list, this will be covered later.
    ...
    // create POA
    PortableServer::POA_var  child_poa =
    poa->create_POA ("child_poa",
                     mgr.in (),
                     poa_policy_list);
    
    RTPortableServer::POA_var rt_poa =
            RTPortableServer::POA::_narrow(child_poa.in());




Native and Real-Time CORBA Priorities

 In order to provide a platform-independent approach to thread priorities, RT CORBA provides for representations of the native (platform-specific) thread priorities and RT CORBA (platform-independent) priorities. This allows clients on one platform and servers on another to invoke and process operations using a common approach for specifying priorities. Clients and servers use RT CORBA priorities when they communicate with one another. These priorities are independent of the specific platform upon which the operation is invoked or processed. Native priorities are represented by an integer from -32768 to +32767, even though only a subset of these priorities is utilized on any one platform. CORBA priority values (RTCORBA::Priority) are carried in the same 16-bit integer type, but are restricted to the range 0 to 32767.


Priority Mapping

 In order to facilitate mapping of priorities from the RT CORBA priority to the native priority, a mapping type must be defined. In TAO, this mapping can have the following values:

 There are several ways to specify the mapping. One approach is to pass the mapping type to the ORB upon initialization based on a command line parameter or based on an initialization string that is set programmatically and passed to the ORB. Provided the ORB is initialized using argc and argv as shown in the above example, then the priority mapping could be set to linear by using the following command line parameter: (Assume the executable is named serverMain.)


    % serverMain -ORBPriorityMapping linear
    %

 TAO also provides a Service Configurator mechanism to read in a configuration file upon startup. The mapping can be specified in the service configurator file using the following entry:


    static RT_ORB_Loader "-ORBPriorityMapping linear"
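
 To make the idea of a linear mapping concrete, the following stand-alone sketch scales a CORBA priority onto a native range and back. It is an illustration only and is not TAO's implementation. The helper names linear_to_native and linear_to_corba and the native range passed in by the caller are assumptions made for this example.

```cpp
#include <cassert>
#include <cstdint>

// Bounds of the RT CORBA priority range (RTCORBA::minPriority and
// RTCORBA::maxPriority in the specification).
constexpr std::int64_t kCorbaMin = 0;
constexpr std::int64_t kCorbaMax = 32767;

// Hypothetical helper, not a TAO function: scale a CORBA priority onto
// the native range [native_min, native_max] using integer arithmetic.
// Returns false when the CORBA priority is outside 0..32767.
bool linear_to_native(std::int64_t corba, std::int64_t native_min,
                      std::int64_t native_max, std::int64_t& native)
{
  assert(native_min < native_max);
  if (corba < kCorbaMin || corba > kCorbaMax)
    return false;
  native = native_min + corba * (native_max - native_min) / kCorbaMax;
  return true;
}

// Hypothetical inverse: scale a native priority back onto 0..32767.
// Returns false when the native priority is outside the given range.
bool linear_to_corba(std::int64_t native, std::int64_t native_min,
                     std::int64_t native_max, std::int64_t& corba)
{
  assert(native_min < native_max);
  if (native < native_min || native > native_max)
    return false;
  corba = (native - native_min) * kCorbaMax / (native_max - native_min);
  return true;
}
```

 With an assumed native range of 1 to 99, CORBA priority 0 maps to native priority 1 and CORBA priority 32767 maps to native priority 99. Because the native range is much smaller than the CORBA range, many CORBA priorities share one native priority, so a round trip does not always return the starting value.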


Real-Time CORBA Current Interface

 To obtain and set the priority for the current executing thread, the RT CORBA Current interface can be used. The RT Current is obtained by using resolve_initial_references(), then narrowing the result. The following code shows an example of obtaining the RT Current and setting the current priority to the highest value CORBA allows.



    obj = orb->resolve_initial_references ("RTCurrent");
    RTCORBA::Current_var current =
        RTCORBA::Current::_narrow(obj.in ());

    // Change to a priority that matches the highest level CORBA allows
    current->the_priority(RTCORBA::maxPriority);



Priority Models

 Now that RT CORBA priorities can be used to specify priorities in a platform-independent manner and the current thread priority can be set so that the client and server can match thread priorities for a specific operation, a mechanism must be defined to communicate between the client and server what priority will be used for the operation. This capability is provided by RT CORBA Priority Models. These models allow for end-to-end propagation of priorities, which facilitates predictability in the corresponding operations.

 The RT CORBA specification [1] provides two priority models. In the "client propagated" priority model, the client communicates the desired priority of the request to the server. In the "server declared" priority model, the server communicates the supported priority to the client. The following code shows one way to create a client propagated POA. This POA would be created in the server, thus allowing the client to propagate the priority to the server. As a result, a server thread would be selected using the RT CORBA priority that corresponds to the client's RT CORBA priority. In the RT Current example above, this value would be the maximum CORBA priority.



    CORBA::PolicyList poa_policy_list(1);

    poa_policy_list.length (1);

    // Create client propagated policy, set default priority to zero.
    poa_policy_list[0] = 
      rt_orb->create_priority_model_policy(RTCORBA::CLIENT_PROPAGATED, 0);

    // Create the POA;
    PortableServer::POA_var client_propagated_poa =
    poa->create_POA ("client_propagated_poa",
                     mgr.in (),
                     poa_policy_list);

 The priority model is communicated from the server to the client via the Interoperable Object Reference (IOR) the client uses to access the server. The client has the capability to verify that the IOR from the server uses the correct model. If the server-declared approach is used, then the IOR will contain both the priority model and the RT CORBA Priority specified by the server.
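
 The difference between the two models can be summarized with a small stand-alone sketch. It does not use the RT CORBA API. The types PriorityModel and IncomingRequest and the function dispatch_priority are hypothetical names introduced here. The sketch assumes that a request arriving without a propagated priority is handled at the server priority given when the policy was created.

```cpp
#include <cassert>

// Hypothetical stand-ins for the two RT CORBA priority models.
enum class PriorityModel { ClientPropagated, ServerDeclared };

// Hypothetical description of a request as seen by the server.
struct IncomingRequest {
  bool has_client_priority;  // true if the client sent a CORBA priority
  short client_priority;     // meaningful only when the flag is true
};

// Returns the CORBA priority at which the server would handle the request.
// Assumption: under the client propagated model, a request that carries no
// priority falls back to the server priority from the policy.
short dispatch_priority(PriorityModel model, short server_priority,
                        const IncomingRequest& request)
{
  assert(server_priority >= 0);
  if (model == PriorityModel::ClientPropagated && request.has_client_priority)
    return request.client_priority;
  return server_priority;
}
```

 For example, a request carrying priority 32767 would be handled at 32767 under the client propagated model, but at the server's own priority under the server declared model.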


Threadpools

 In order to easily specify a pool of threads, all available for the ORB to utilize, RT CORBA provides the capability to create and configure pools of threads. This helps to ensure that a particular invocation will have an available thread, at the appropriate priority, to process a request. A threadpool may be created with a single default priority, or its threads may be grouped into lanes (discussed in the next section). Remember that the priority of a thread can be modified dynamically using the RT Current mechanism described above.

 Threadpools are identified by a threadpool identifier (an RTCORBA::ThreadpoolId). A POA may be associated with only a single threadpool. However, a single threadpool may be associated with more than one POA. The threadpool is assigned using the threadpool policy included in the policy passed to the POA upon creation. When the threadpool is created, the number of static threads to create upon creation of the threadpool and the number of dynamic threads to add to the static threads can also be specified.

 The following code creates a threadpool, using defaults where indicated, and assigns that threadpool to the POA using the threadpool policy of the POA:



    // Creating a threadpool
    RTCORBA::ThreadpoolId threadpool_id =
      rt_orb->create_threadpool (0,  // Use Default Stack Size
                                 10,  // Number Static Threads
                                 5,  // Number Dynamic Threads
                                 500,  // Default Priority
                                 0, // Allow request buffering
                                 0,  // Max buffered requests
                                 0); // Max request buffer size

    // Create and Initialize the Policy List
    CORBA::PolicyList poa_policy_list(2);
    poa_policy_list.length (2);
    poa_policy_list[0] = 
      rt_orb->
        create_priority_model_policy(RTCORBA::CLIENT_PROPAGATED, 0);
    poa_policy_list[1] =
      rt_orb->create_threadpool_policy(threadpool_id);

    // Create POA as in previous example
    PortableServer::POA_var client_propagated_poa =
    poa->create_POA ("client_propagated_poa",
                     mgr.in (),
                     poa_policy_list);
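
 The roles of the static and dynamic thread counts can be pictured with the following stand-alone model. It is a sketch of the bookkeeping only, not of how TAO manages threads. The class PoolModel and its member names are hypothetical, and the model assumes that a dynamic thread, once created, stays in the pool.

```cpp
#include <cassert>

// Hypothetical bookkeeping model of a threadpool without lanes.
class PoolModel {
public:
  enum class Outcome { UsedIdleThread, SpawnedDynamicThread, NoThreadAvailable };

  // Static threads exist from the start; dynamic threads are added on demand.
  PoolModel(unsigned static_threads, unsigned dynamic_threads)
    : max_threads_(static_threads + dynamic_threads),
      created_(static_threads),
      busy_(0)
  {
  }

  // Called when a request arrives and needs a thread.
  Outcome begin_request()
  {
    if (busy_ < created_) {
      ++busy_;
      return Outcome::UsedIdleThread;
    }
    if (created_ < max_threads_) {
      ++created_;
      ++busy_;
      return Outcome::SpawnedDynamicThread;
    }
    // Whether the request is then buffered or refused depends on the
    // request buffering settings, which this model does not cover.
    return Outcome::NoThreadAvailable;
  }

  // Called when a request finishes and its thread becomes idle again.
  void end_request()
  {
    assert(busy_ > 0);
    --busy_;
  }

  unsigned created() const { return created_; }
  unsigned busy() const { return busy_; }

private:
  unsigned max_threads_;
  unsigned created_;
  unsigned busy_;
};
```

 With the values used in the example above (10 static and 5 dynamic threads), the first 10 concurrent requests would use existing threads, the next 5 would each cause a thread to be added, and a sixteenth concurrent request would find no thread available.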



Threadpools with Lanes

  Lanes provide threadpools the capability to have several thread priorities in the same threadpool. These threads are grouped into lanes so that all threads in the lane have the same priority. Each threadpool lane has its own configuration for static and dynamic threads. The RT CORBA specification [1] also provides a mechanism that allows a lane of higher priority to borrow a thread from a lane of lesser priority in the same threadpool, if necessary. The following code shows how to create the threadpool lanes and their associated threadpool. This code creates a threadpool with two lanes, starting at maxPriority and reducing by 10000 for the second lane. The approach for assigning the threadpool to the POA is as shown in the Threadpool example above.



    // Create the thread-pool lanes
    RTCORBA::ThreadpoolLanes lanes(2);
    lanes.length(2);
    lanes[0].static_threads = 1;
    lanes[0].dynamic_threads = 3;
    lanes[0].lane_priority = RTCORBA::maxPriority;
    lanes[1].static_threads = 1;
    lanes[1].dynamic_threads = 3;
    lanes[1].lane_priority = RTCORBA::maxPriority - 10000;

    // Create threadpool with lanes
    RTCORBA::ThreadpoolId threadpool_id =
      rt_orb->create_threadpool_with_lanes (0,  // Stack Size
                                            lanes,
                                            0,  // Allow borrowing
                                            0,  // Allow request buffering
                                            0,  // Max buffered requests
                                            0); // Max request buffer size
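
 The following stand-alone sketch models how a request might be matched to a lane and how borrowing could work. It is not TAO code. The names LaneModel, find_lane, and acquire_thread are hypothetical. The sketch assumes that a request is matched to the lane whose priority equals the request priority, and that a borrowing lane takes a thread from the highest-priority lower lane that has an idle thread. Both are simplifying assumptions made for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical view of one lane: its CORBA priority and its idle threads.
struct LaneModel {
  short priority;
  unsigned idle_threads;
};

// Returns the index of the lane whose priority equals the request
// priority, or -1 if no lane matches (exact match is an assumption).
int find_lane(const std::vector<LaneModel>& lanes, short request_priority)
{
  for (std::size_t i = 0; i < lanes.size(); ++i)
    if (lanes[i].priority == request_priority)
      return static_cast<int>(i);
  return -1;
}

// Takes one idle thread for a request bound to the given lane. If that
// lane has none and borrowing is allowed, a thread is taken from the
// highest-priority lower lane that has one. Returns the index of the
// lane that supplied the thread, or -1 if none could be supplied.
int acquire_thread(std::vector<LaneModel>& lanes, int lane, bool allow_borrowing)
{
  assert(lane >= 0 && static_cast<std::size_t>(lane) < lanes.size());
  if (lanes[lane].idle_threads > 0) {
    --lanes[lane].idle_threads;
    return lane;
  }
  if (!allow_borrowing)
    return -1;

  int donor = -1;
  for (std::size_t i = 0; i < lanes.size(); ++i) {
    const bool lower = lanes[i].priority < lanes[lane].priority;
    if (lower && lanes[i].idle_threads > 0 &&
        (donor < 0 || lanes[i].priority > lanes[donor].priority))
      donor = static_cast<int>(i);
  }
  if (donor >= 0)
    --lanes[donor].idle_threads;
  return donor;
}
```

 In the example above, borrowing is disabled (the third argument to create_threadpool_with_lanes is 0), which corresponds to calling acquire_thread with allow_borrowing set to false.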



Private Connections

 A client can specify that a private connection should be established to a server to ensure communications between the client and server are performed on a dedicated connection. Private connections help prevent priority inversions. A priority inversion occurs when a lower priority request from the client to the server blocks a higher priority request. The private connection policy can be set at the CORBA object level, the client thread level, or at the ORB level. An example of setting the private connection policy at the object level is shown below. In this example, the Interoperable Object Reference (IOR) for the Messenger object is obtained from a file (a name service could also have been used), the policy list is set, and the returned object is converted to a messenger object. Upon reassignment, the original CORBA::Object_var is released, since it is no longer referenced.


    // Read and destringify the Messenger object's IOR.
    obj = orb->string_to_object( "file://Messenger.ior" );

    // Narrow the IOR to a Messenger object reference.
    Messenger_var messenger = Messenger::_narrow( obj.in() );

    // Set the Private Connection Policy
    CORBA::PolicyList policy_list(1);
    policy_list.length (1);
    policy_list[0] = rt_orb->create_private_connection_policy();
    obj = messenger->_set_policy_overrides (policy_list,
                                            CORBA::SET_OVERRIDE);
    //Reset Messenger to new object to get policy
    messenger = Messenger::_narrow( obj.in() );
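
 The priority inversion described above can be illustrated with a toy model of a connection that sends requests strictly in arrival order. This is a sketch for intuition only. The names QueuedRequest and completion_ms are hypothetical, and the transfer times are made-up values rather than measurements.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical request waiting on a connection.
struct QueuedRequest {
  short priority;        // CORBA priority of the request
  unsigned transfer_ms;  // assumed time needed to send it
};

// Time until the request at the given position has been sent, assuming
// the connection sends requests one at a time in arrival order.
unsigned completion_ms(const std::vector<QueuedRequest>& connection,
                       std::size_t index)
{
  assert(index < connection.size());
  unsigned total = 0;
  for (std::size_t i = 0; i <= index; ++i)
    total += connection[i].transfer_ms;
  return total;
}
```

 If a low-priority request with an assumed transfer time of 50 ms is queued ahead of a high-priority request that needs 2 ms on the same connection, the high-priority request completes after 52 ms. On a connection of its own, it completes after 2 ms.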


Conclusions

 In this CNB we have attempted to show the basics of RT CORBA priority and threadpool mechanisms. These facilities can provide for CORBA communications with much improved Quality-of-Service attributes. These attributes include operations that have reduced latency, more consistent response times (less jitter), and reduced risk of priority inversion. For further information, please refer to the references below.


References

  1. The CORBA Real-Time specification, version 1.1, published by the OMG. This specification can be found at: http://www.omg.org/cgi-bin/doc?formal/02-08-02
  2. The TAO Developer's Guide is available at the OCI TAO web site: http://www.theaceorb.com/product/index.html
  3. Henning, Michi, and Steve Vinoski, Advanced CORBA Programming with C++, Addison-Wesley, 1999.
  4. Schmidt, Douglas C., and Fred Kuhns, "An Overview of the Real-Time CORBA Specification," IEEE Computer, June 2000.
  5. Pyarali, Irfan, Marina Spivak, Ron Cytron, and Douglas C. Schmidt, "Evaluating and Optimizing Thread Pool Strategies for Real-Time CORBA," Proceedings of the ACM SIGPLAN Workshop on Optimization of Middleware and Distributed Systems (OM 2001), Snowbird, Utah, June 18, 2001. Also available at http://www.cs.wustl.edu/~schmidt/corba-research-realtime.html

Appendix: Putting It All Together

 The following files provide a complete example of using all the features of RT CORBA described above. Most error processing has been omitted for clarity of the example. For proper error handling, refer to the TAO Developer's Guide [2] or Henning and Vinoski [3].


    # Filename: svc.conf
    # This allows RTCORBA::maxPriority to map correctly.

    static RT_ORB_Loader "-ORBPriorityMapping linear"


    // Filename: Messenger.idl

    interface Messenger
    {
      boolean send_message(inout string message);
    };


    // Filename: Messenger_i.h
    #ifndef MESSENGER_I_H_
    #define MESSENGER_I_H_

    #include "MessengerS.h"

    //Class Messenger_i
    class  Messenger_i : public virtual POA_Messenger
    {
    public:
      //Constructor 
      Messenger_i (void);
      
      //Destructor 
      virtual ~Messenger_i (void);
      
      virtual CORBA::Boolean send_message (
        char *& message
      )
      throw(CORBA::SystemException);
    
    };
    
    
    #endif /* MESSENGER_I_H_  */
    
    
    // Filename: Messenger_i.cpp
    #include <tao/corba.h>
    
    #include "Messenger_i.h"
    #include <ace/streams.h>
    
    // Implementation skeleton constructor
    Messenger_i::Messenger_i (void)
    {
    }
      
    // Implementation skeleton destructor
    Messenger_i::~Messenger_i (void)
    {
    }
      
    CORBA::Boolean Messenger_i::send_message (char *& message)
      throw(CORBA::SystemException)
    {
      cout << "Message:      " << message << endl;
      CORBA::string_free(message);
      message = CORBA::string_dup("Thanks for the message.");
      return 1;
    }
    
    
    // Filename: MessengerServer.cpp
    
    #include "Messenger_i.h"
    #include <ace/streams.h>
    #include <tao/RTCORBA/RTCORBA.h>
    #include <tao/RTPortableServer/RTPortableServer.h>
    
    
    int main( int argc, char *argv[] )
    {
      try {
    
        // Initialize the ORB.
        CORBA::ORB_var orb = CORBA::ORB_init( argc, argv );
    
        cout << "Ready to Find RTORB" << endl;
    
        CORBA::Object_var obj = orb->resolve_initial_references("RTORB");
        RTCORBA::RTORB_var rt_orb = RTCORBA::RTORB::_narrow(obj.in());
    
        //Get reference to the RootPOA.
        obj = orb->resolve_initial_references( "RootPOA" );
        PortableServer::POA_var poa = PortableServer::POA::_narrow( obj.in() );
    
        // Activate the POAManager.
        PortableServer::POAManager_var mgr = poa->the_POAManager();
        mgr->activate();
    
        // Create the thread-pool
        RTCORBA::ThreadpoolLanes lanes(2);
        lanes.length(2);
        lanes[0].static_threads = 1;
        lanes[0].dynamic_threads = 0;
        lanes[0].lane_priority = RTCORBA::maxPriority;
    
        lanes[1].static_threads = 1;
        lanes[1].dynamic_threads = 0;
        lanes[1].lane_priority = RTCORBA::maxPriority - 10000;
    
        RTCORBA::ThreadpoolId threadpool_id =
          rt_orb->create_threadpool_with_lanes (0,  // Stack Size
                                                lanes,
                                                0,  // Allow borrowing
                                                0,  // Allow request buffering
                                                0,  // Max buffered requests
                                                0); // Max request buffer size
        
        CORBA::PolicyList poa_policy_list(2);
    
        poa_policy_list.length (2);
    
        poa_policy_list[0] = 
          rt_orb->create_priority_model_policy(RTCORBA::CLIENT_PROPAGATED, 0);
        poa_policy_list[1] =
          rt_orb->create_threadpool_policy(threadpool_id);
    
        PortableServer::POA_var client_propagated_poa =
        poa->create_POA ("client_propagated_poa",
                         mgr.in (),
                         poa_policy_list);
    
        // Create a servant.
        Messenger_i messenger_servant;
    
        // Register the servant with the client propagated POA, obtain its
        // object reference, stringify it, and write it to a file.
        PortableServer::ObjectId_var oid = 
          client_propagated_poa->activate_object( &messenger_servant );
        CORBA::Object_var messenger_obj =
          client_propagated_poa->id_to_reference( oid.in() );
        CORBA::String_var str = orb->object_to_string( messenger_obj.in() );
        ofstream iorFile( "Messenger.ior" );
        iorFile << str.in() << endl;
        iorFile.close();
    
        // Accept requests from clients.
        orb->run();
        orb->destroy();
    
      }
      catch (CORBA::Exception& ex) {
        cerr << "CORBA exception: " << ex << endl;
        return 1;
      }
    
      return 0;
    }
    
    
    // Filename: MessengerClient.cpp
    
    #include <tao/RTCORBA/RTCORBA.h>
    
    #include "MessengerC.h"
    #include <ace/streams.h>
    
    int main( int argc, char* argv[] )
    {
      
      try {
    
        // Initialize the ORB.
        CORBA::ORB_var orb = CORBA::ORB_init( argc, argv );
    
        // Get the RTORB
        CORBA::Object_var obj = orb->resolve_initial_references("RTORB");
        RTCORBA::RTORB_var rt_orb = RTCORBA::RTORB::_narrow (obj.in());
    
        // Read and destringify the Messenger object's IOR.
        obj = orb->string_to_object( "file://Messenger.ior" );
    
        // Narrow the IOR to a Messenger object reference.
        Messenger_var messenger = Messenger::_narrow( obj.in() );
        if( CORBA::is_nil( messenger.in() ) ) {
          cerr << "IOR was not a Messenger object reference." << endl;
          return 1;
        }
    
        // Set the Private Connection Policy
        CORBA::PolicyList policy_list(1);
        policy_list.length (1);
        policy_list[0] = rt_orb->create_private_connection_policy();
        obj = messenger->_set_policy_overrides (policy_list,
                                                CORBA::SET_OVERRIDE);
        //Reset Messenger to new object to get policy
        messenger = Messenger::_narrow( obj.in() );
    
        // Get the RTCurrent.
        obj = orb->resolve_initial_references ("RTCurrent");
        RTCORBA::Current_var current =
            RTCORBA::Current::_narrow(obj.in ());
    
        // Change to a priority that matches the highest level on the server
        current->the_priority(RTCORBA::maxPriority);
    
        // Explicitly bind a connection to the server
        CORBA::PolicyList_var inconsistent_policies;
        CORBA::Boolean status =
             messenger->_validate_connection(inconsistent_policies.out());
    
        // Send a message to the Messenger object.
        CORBA::String_var message = CORBA::string_dup( "Howdy!" );
    
        messenger->send_message( message.inout() );
    
        cout << "Message String= " << message.in() << endl;
        
        return 0;
      }
      catch ( CORBA::Exception& ex ) {
        cerr << "CORBA exception: " << ex << endl;
      }
    
      return 1;
    }
    




The OCI CORBA News Brief is intended to promote CORBA and object technology in the development of distributed computing applications. Each issue of the CORBA News Brief will feature news and technical information about OCI's supported open-source ORBs (TAO and JacORB), case studies, and examples using CORBA, as well as information about OCI's educational offerings.

The OCI CORBA News Brief is published on a monthly basis. Send ideas for articles of interest to corba@ociweb.com.

To subscribe or unsubscribe from the CNB mailing list, send mail to majordomo@ociweb.com with the line "subscribe cnb" or "unsubscribe cnb" in the body of the message.