Novell is now a part of Micro Focus

An Asynchronous Transaction-Enabled J2EE Application: A Message Driven Bean Example

Articles and Tips: article

Vijay Rajan
Senior Software Engineer
Novell, Inc.
vrajan@novell.com

01 Feb 2003


This AppNote describes a sample application that utilizes many of the technologies of a J2EE 1.3 compatible application server. It demonstrates the use of a stateless server-side application that is managed by a transaction-aware container. The example uses a JMS Topic Publisher as a client to send asynchronous messages representing stock price updates.


Topics

J2EE, application development, asynchronous transactions

Products

Novell exteNd

Audience

developers

Level

intermediate

Prerequisite Skills

familiarity with J2EE programming

Operating System

n/a

Tools

none

Sample Code

yes

Introduction

This AppNote describes a sample application that utilizes many of the technologies of a J2EE 1.3-compatible application server. It demonstrates the use of a stateless server side application that is managed by a transaction aware container. The example uses a JMS Topic Publisher as a client to send asynchronous messages representing stock price updates. The JMS server saves each incoming message in a database and notifies the EJB container. The container begins a transaction and activates the message driven bean residing on the server. The message driven bean processes the message and sends a response message to a JMS Queue. The messages in the JMS Queue are delivered back to the client. A means of monitoring key subsystems participating in this process is also demonstrated.

The example utilizes the Novell exteNd Application Server and the Novell jBroker middleware products. Only the monitoring aspect of this example is specific to the jBroker product line. The application itself is based on J2EE standards and will work on any J2EE 1.3-compatible product. This AppNote provides you with an overview of server side J2EE technologies and hands-on experience with deploying and monitoring such an application.

Technologies

The example described in this AppNote utilizes many components of a J2EE 1.3- compatible application server. Following is a list of specific technologies along with a brief description of each.

Enterprise JavaBeans (EJB)

Enterprise JavaBeans (EJB) provides a component model that simplifies the development of server side applications by providing automatic support for services such as transactions, security, and database connectivity. Such services are provided by the EJB container which is part of the Application Server. The container manages the complete life cycle of the Enterprise JavaBean. In addition applications developed using EJBs are scalable and portable. EJB 2.0 specifies three types of beans: session, entity, and message driven beans.

In this example we use the message driven bean. A message driven bean is an asynchronous message consumer and is invoked by the EJB container on the arrival of a JMS message. A client cannot access the bean directly, but does so by sending messages to the JMS Destination (Queue or Topic) for which the message driven bean class is the MessageListener. Message driven beans have no conversational state and are anonymous. Novell exteNd Application Server 4.0 provides a complete foundation for building and deploying cross-platform, high-performance, J2EE 1.3 and Web Service-based applications. Novell exteNd Application Server was one of the first to be J2EE 1.3-certified and provides complete support for EJB 2.0.

Java Message Service (JMS)

The Java Message Service (JMS) brings enterprise messaging to the Java platform. It provides a standard API for creating portable message based applications in the Java programming language. It also enables the message driven bean (MDB), an enterprise bean, for the asynchronous consumption of messages. Novell exteNd Application Server includes jBroker TM, a 100% pure Java implementation of the JMS API. jBroker MQ is based on the high performance and scalable enterprise class IIOP support in jBroker ORB, and is fully compliant with JMS specification v1.0.2.

JDBC

Java Database Connectivity (JDBC) is an API that enables access to virtually any tabular data source from the Java programming language. It provides connectivity to a wide range of SQL database, spreadsheets, or flat files. Novell exteNd Application Server 4.0 provides a complete support for JDBC 2.0 + ext. It also provides floating JDBC connection pools with dynamic reconnect to significantly improve performance.

Java Transaction Service (JTS)

The Java Transaction Service (JTS), based on CORBA Object Transaction Service (OTS), specifies the implementation of a Transaction Manager that provides transaction service in a distributed transaction system. It supports the Java Transaction API (JTA), which specifies the interaction between the Transaction Manager and the parties involved in a distributed transaction. Novell exteNd Application Server includes jBroker MQ, a 100% pure Java implementation of the Java Transaction Service based on the Java Transaction API. jBroker TM is based on JTA v1.01 and OMG OTS v1.2 and supports the distributed transaction management of CORBA and JDBC 2.0 XA aware resources.

Common Object Request Broker Architecture (CORBA)

CORBA is the open standard for distributed computing. It provides a language and platform neutral distributed objects framework that is the foundation for J2EE technologies such as Enterprise JavaBeans, RMI over IIOP, Java IDL, and Java Transaction Service. Novell exteNd Application Server includes jBroker ORB, a fully CORBA 2.4-compliant enterprise class ORB with industry leading performance and scalability.

Server Monitoring

The monitoring features shown in this article are based on the jBroker Console product and features of the jBroker ORB. It is not based on any J2EE technology and hence is the only part of this example that is not portable. jBroker Console enables the visual monitoring of various properties of jBroker ORB, jBroker MQ, and jBroker TM. In addition custom applications can also expose properties that can be monitored using jBroker Console. The monitoring of custom applications will be demonstrated in this AppNote.

Sample Application

The application consists of a client that sends stock price updates asynchronously to a JMS topic. The messages are processed by a message driven bean that is tied to the JMS topic. The bean extracts the stock ticker symbols and prices from each message, updates a database table with the information, and sends a response message to a JMS queue. The client reads the response messages from the JMS queue (see Figure 1).

Figure 1: Diagram of the sample application.

The Message Driven Bean

A message driven bean is an asynchronous message consumer that is invoked by the EJB container on the arrival of a JMS message. The MDB is an instance of both a MessageDrivenBean and a MessageListener. We begin with a skeletal implementation of a message driven bean. A full listing of the MDB is available for download from the AppNotes Web site at http://www.novell.com/appnotes.

package stock;

import javax.ejb.MessageDrivenBean;
import javax.ejb.MessageDrivenContext;

import javax.jms.Message;
import javax.jms.MessageListener;

public class MDB implements MessageDrivenBean, MessageListener
{
   private MessageDrivenContext   _context = null;

   public void ejbCreate()
   {
   }

   public void setMessageDrivenContext(MessageDrivenContext context)
   {
   |   _context = context;
   }

   public void onMessage(Message msg)
   {
   }

   public void ejbRemove()
   {
   }
}

The ejbCreate, setMessageDrivenContent, and ejbRemove methods are used by the container to manage the lifecycle of the bean. They are used during the creation of the bean and to free resources before the bean is removed. The container calls the bean's onMessage method for each message that arrives at the destination tied to the bean. Associating the message driven bean with a JMS destination (Queue or Topic) is done when the bean is deployed in the container. Deployment information is contained in the deployment descriptor (ejb-jar.xml) of the bean and in the vendor specific deployment plan. Both the deployment descriptor and deployment plan for this example are available for download from the AppNotes Web site. Note that the deployment plan is specific to Novell exteNd Application Server 4.0 and may vary for application servers from other vendors.

Implementing the ejbCreate Method

When the container creates an instance of a message driven bean it first calls the bean class' newInstance method. Once a bean instance has been created the container then calls the instance's setMessageDrivenContext method to provide the instance with a MessageDrivenContext that gives it access to the context maintained by the container. Finally the container call the instance's ejbCreate method. Here the bean can do initialization or any tasks that need to done when an instance is created. In this example we will use the ejbCreate method to get access to the resources specified in the deployment. In addition we also create an instance of a jBroker Manageable class which will be used to expose the price of the "novl" ticker symbol to the jBroker Console. This is specific to the Novell jBroker line of products and will not work with products from other vendors.

public void ejbCreate()
{
|   try {
|   |   // get the initial context
|   |   InitialContext ctx = new InitialContext();
|   |                                                                  
|   |   // get queue connection factory

|   |   _factory = (QueueConnectionFactory) ctx.lookup("java:comp/env/XAQCF");
|   |                                                                  
|   |   // get data source
|   |   _source = (DataSource) ctx.lookup("java:comp/env/DB");
|   |                                                                  
|   |   // create a Manageable using the Monitor helper class
|   |   // and register it with the ORB so that it can be used by
|   |   // jBroker Console to monitor the exposed properties
|   |   // NOTE: This is Novell jBroker ORB specific
|   |   // comment or delete the following section if using with different vendor
|   |   com.sssw.jbroker.ORB orb = (com.sssw.jbroker.ORB) ctx.lookup("java:comp/ORB");
|   |   ManageableCollection mcoll = orb.getJBrokerManageable();
|   |   _mgble = mcoll.getManageable("MDBAdmin");
|   |   if (_mgble == null) {
|   |   |   try {
|   |   |   |   _mgble = new Monitor(orb, "MDBAdmin");
|   |   |   } catch (Error e) {
|   |   |   |   throw new EJBException("MDB:ejbCreate -- Could not create a Manageable");
|   |   |   }
|   |   }
|   } catch (Exception ex) {
|   |   ex.printStackTrace();
|   |   throw new EJBException("MDB:ejbCreate failed", ex);
|   }
}

Implementing the onMessage Method

The container calls the onMessage method of the message driven bean for each message that is received at the destination associated with the bean. If the deployment has specified a container managed transaction (as is the case in this example) then the container has already initialized a new transaction before calling the bean. The bean can access the getRollbackOnly and setRollbackOnly methods of the MessageDrivenContext to test if the transaction has been marked for rollback or to mark the current transaction for rollback. In the case of a bean managed transaction the bean can use the getUserTransaction method of the MessageDrivenContext to set or get transaction status.

In this example a MapMessage with ticker symbols and prices is delivered to the bean. The bean extracts this information from the MapMessage and performs the following three actions:

  • It updates the stock price to a table in the database. It is assumed that the table already exists in the database. If there is no record in the table for the given ticker symbol, it creates a new record.

  • If the ticker symbol is "novl", it updates the price with the Manageable object. As exaplained earlier, the Manageable class used here is specific to jBroker ORB, and allows this value to be monitored by jBroker Console.

  • It sends a response to the destination specified as the replyTo of the incoming message. In this case the reply destination is a queue and a text message of the form "updated N symbols" (where N is the number of ticker symbols in the incoming MapMessage) is sent to the queue.

public void onMessage(Message msg)
{
|   try {
|   |   // count of the number of ticker symbols
|   |   int count = 0;
|   |                                                                  
|   |   // get a database connection
|   |   Connection conn = _source.getConnection();
|   |                                                                  
|   |   // prepare a statement
|   |   PreparedStatement stmt = conn.prepareStatement(_sql);
|   |                                                                  
|   |   // get an enumeration over symbols in map message
|   |   MapMessage map = (MapMessage) msg;
|   |   Enumeration e = map.getMapNames();
|   |   while (e.hasMoreElements()) {
|   |   |   // get symbol name and new price
|   |   |   String symbol = (String) e.nextElement();
|   |   |   double price  = map.getDouble(symbol);
|   |   |                                                              
|   |   |   // update the price with the manageable so that it can be monitored
|   |   |   // comment out if not using jBroker ORB
|   |   |   if (symbol.equals("novl")) {
|   |   |   |   _mgble.setProperty("Novl", new Double(price));
|   |   |   }
|   |   |   count++;
|   |   |                                                              
|   |   |   // set values in prepared statement
|   |   |   stmt.setDouble(1, price);
|   |   |   stmt.setString(2, symbol);
|   |   |                                                              
|   |   |   // update database
|   |   |   int rows = stmt.executeUpdate();
|   |   |                                                              
|   |   |   // if record does not exist
|   |   |   if (rows == 0) {
|   |   |   |   // insert a new record
|   |   |   |   Statement ins_stmt = conn.createStatement();
|   |   |   |   String sql = "insert into quotes (symbol, price) values ('";
|   |   |   |   sql +=  symbol + "', " + price + ")";
|   |   |   |   ins_stmt.executeUpdate(sql);    
|   |   |   |   ins_stmt.close();
|   |   |   }
|   |   }
|   |                                                                  
|   |   // close statement
|   |   stmt.close();
|   |                                                                  
|   |   // close connection
|   |   conn.close();
|   |                                                                  
|   |   // respond to reply queue (if any)
|   |   Topic replyTo = (Topic) msg.getJMSReplyTo();
|   |                                                                  
|   |   if (replyTo != null) {
|   |   |   // create a queue connection to reply queue
|   |   |   QueueConnection qconn = _factory.createQueueConnection();
|   |   |                                                              
|   |   |   // create a session
|   |   |   QueueSession session = qconn.createQueueSession(false,
|   |   |       Session.AUTO_ACKNOWLEDGE);
|   |   |                                                              
|   |   |   // create a queue sender
|   |   |   Queue queue = JMQQueue.getQueue(replyTo.getTopicName());
|   |   |   QueueSender sender = session.createSender(queue);
|   |   |                                                              
|   |   |   // create and send a message
|   |   |   TextMessage text = session.createTextMessage();
|   |   |   text.setText("updated " + count + " symbols");
|   |   |   sender.send(text);
|   |   |                                                              
|   |   |   // close connection (will close session and sender)
|   |   |   qconn.close();
|   |   }
|   } catch (Throwable ex) {
|   |   ex.printStackTrace();
|   }
}

The Topic Publisher

The client program produces JMS messages that contain pairs of stock ticker symbols and prices. These messages are sent to a JMS topic to be consumed by the message driven bean. The client also listens to a JMS queue to receive responses from the message driven bean, printing the response on the screen. We begin with writing the code to listen to messages on a queue followed by the creation of a topic publisher. A full listing of the Client can be downloaded from the AppNotes Web site.

Listening to a JMS Queue

To listen to a Queue, we first need to get the QueueConnectionFactory and Queue objects. This is done using JNDI to lookup the specific resource names. When that is done we create a QueueConnection, a QueueSession, and finally a QueueReceiver for the specific Queue. In order to receive JMS messages the Client must implement javax.jms.MessageListener and have an onMessage method that accepts a javax.jms.Message as its only parameter. Once the Client class is registered as the message listener with the QueueReceiver object, the onMessage method of the Client will be called for each message that arrives at the specified Queue. Note that the resource names iiop://balrog:3506/queue/queue0 and iiop://balrog:3506/queue/connectionFactory are specific to jBroker MQ, since it uses IIOP as the transport. The hostname "balrog" and port "3506" will also need to be changed accordingly to the host and port being used by the JMS server.

package stock;

import javax.jms.*;
import javax.naming.*;

public class Client implements MessageListener
{
public static void main(String[] args) throws Exception
{
|   // get the initial context
|   InitialContext ctx = new InitialContext();
|                                                                      
|   // lookup the replyTo queue object
|   Queue queue = (Queue) ctx.lookup("iiop://balrog:3506/queue/queue0");
|                                                                      
|   // lookup the queue connection factory
|   QueueConnectionFactory qfac = (QueueConnectionFactory) ctx.
|           lookup("iiop://balrog:3506/queue/connectionFactory");
|                                                                      
|   // create a queue connection
|   QueueConnection queueConn = qfac.createQueueConnection();
|                                                                      
|   // create a queue session
|   QueueSession queueSession = queueConn.createQueueSession(false, 
|           Session.AUTO_ACKNOWLEDGE);
|                                                                      
|   // create an asynchronous queue receiver
|   QueueReceiver queueReceiver = queueSession.createReceiver(queue);
|   queueReceiver.setMessageListener(new Client());
|                                                                      
|   // start the connection
|   queueConn.start();
|                                                                      
|   // do the work of the Client
|   while (true) {
|   |   // do something
|   |   .....
|   }
}

public void onMessage(Message m)
{
|   try {
|   |   TextMessage text = (TextMessage) m;
|   |   System.out.println("reply: " + text.getText());
|   } catch (Exception ex) {
|   |   ex.printStackTrace();
|   }
}
}

Creating a JMS Topic Publisher

Many of the steps to create a Topic Publisher are similar to what we just did to create a Queue listener. We use JNDI to lookup the Topic and TopicConnectionFactory. After that we create a TopicConnection, a TopicSession, and finally a TopicPublisher for the specific Topic. The TopicPublisher object can then be used to send JMS messages to the Topic. In this example we publish a MapMessage that contains pairs of stock ticker symbols and prices. The example program is in an infinite loop where it sends a update every second until it is stopped. Note, as before that the resource names iiop://balrog:3506/topic/topic0 and iiop://balrog:3506/topic/connectionFactory are specific to jBroker MQ, since it uses IIOP as the transport. The hostname "balrog" and port "3506" will also need to be changed accordingly to the host and port being used by the JMS server.

package stock;

import java.util.HashMap;

import javax.jms.*;
import javax.naming.*;

public class Client
{
private static HashMap _prices = new HashMap();
private static final String[] _symbols = {
|   "novl", "ibm", "sunw", "msft", "beas", "orcl"
};

public static void main(String[] args) throws Exception
{
|   // get the initial context
|   InitialContext ctx = new InitialContext();
|                                                                      
|   // lookup the topic object
|   Topic topic = (Topic) ctx.lookup("iiop://balrog:3506/topic/topic0");
|                                                                      
|   // lookup the topic connection factory
|   TopicConnectionFactory tfac = (TopicConnectionFactory) ctx.
|           lookup("iiop://balrog:3506/topic/connectionFactory");
|                                                                      
|   // create a topic connection
|   TopicConnection topicConn = tfac.createTopicConnection();
|                                                                      
|   // create a topic session
|   TopicSession topicSession = topicConn.createTopicSession(false, 
|           Session.AUTO_ACKNOWLEDGE);
|                                                                      
|   // create a topic publisher
|   TopicPublisher topicPublisher = topicSession.createPublisher(topic);
|                                                                      
|   while (true) {
|   |   // calculate new price for all stocks
|   |   MapMessage message = topicSession.createMapMessage();
|   |   for (int i = 0; i < _symbols.length; i ++) {
|   |   |   Double price = (Double) _prices.get(_symbols[i]);
|   |   |   if (price == null) price = new Double(10);
|   |   |   double delta = (Math.random() - 0.5) * 5;
|   |   |   double next = price.doubleValue() + delta;
|   |   |   if (next < 0) next = -next;
|   |   |   price = new Double(next);
|   |   |   _prices.put(_symbols[i], price);
|   |   |   message.setDouble(_symbols[i], next);
|   |   }
|   |                                                                  
|   |   // set the replyTo field
|   |   message.setJMSReplyTo(queue);
|   |                                                                  
|   |   // publish the messages
|   |   topicPublisher.publish(message);
|   |                                                                  
|   |   // print what we did
|   |   System.out.println("sent stock quote updates..."); 
|   |                                                                  
|   |   // wait a second
|   |   Thread.sleep(1000);
|   }
}
}

Monitoring the Server

As mentioned earlier, in addition to monitoring properties of jBroker ORB, MQ, and TM, jBroker Console can also be used to monitor properties of custom applications. To enable this, the application needs to implement the com.sssw.jbroker.api.admin.Manageable interface and expose the necessary properties. This can be simplified by making use of the utility class com.sssw.jbroker.admin.ManageableBase. We create a new class that extends the utility class and follows the JavaBeans design pattern. Reflection is then used to determine the list of properties, their types, and whether the property is writeable.

In this example we would like to expose the price associated with the "novl" symbol so that it can be monitored by jBroker Console. So we create the following helper class that works with the message driven bean. An instance of this class is created in the ejbCreate method of the bean, and the price associated with the "novl" ticker symbol is set from the bean's onMessage method. jBroker Console can now fetch and display graphically this value. To expose the other ticker symbols we would have to create a set and get method corresponding to each symbol.

package stock;

import java.util.Hashtable;

public class Monitor extends com.sssw.jbroker.admin.ManageableBase
{
private final Hashtable _table = new Hashtable();

public Monitor(com.sssw.jbroker.ORB orb, String name) 
{
|   super(orb, name);
}

public double getNovl()
{
|   Double value = (Double) _table.get("novl");
|   return (value == null) ? 0.0 : value.doubleValue();
}

public void setNovl(double price)
{
|   _table.put("novl", new Double(price));
}
}

Deployment Considerations

Compile the source files MDB.java, Monitor.java, and Client.java. You will have to include jbroker-rt.jar and jbroker-mq.jar in your CLASSPATH in addition to the J2EE api jars. You should find these 2 jars in %EXTEND_HOME%\jre\ lib\ext\jbroker-rt.jar and %EXTEND_HOME%\jbrokerMQ\lib\jbroker-mq.jar, where %EXTEND_HOME% is the location where you installed the Novell exteNd Application Server.

Once you have successfully compiled the source files, create a jar file named MDB.jar that can be deployed to the Server. The contents of the jar file should be as follows:

stock

|

+ --- MDB.class

|

+ --- Monitor.class

META-INF

|

+ --- ejb-jar.xml

|

+ --- MANIFEST.MF

The contents of the Deployment Descriptor (ejb-jar.xml) and the Manifest (MANIFEST.MF) are available for download from the AppNotes Web site.

Create the Deployment Plan file named MDBDeplPlan.xml. The contents of the Deployment Plan is also available for download from the AppNotes Web site.

Deploy the EJB jar to the Novell exteNd Application Server using the following command:

SilverCmd DeployEJB balrog DBNAME MDB.jar -f MDBDeplPlan.xml-o -v 3

Note that the words balrog and DBNAME in the above line represent the server name and database name respectively. Replace it with the actual name of your server and database.

Configuring the Server and Database

The message driven bean in the example writes information to a database table. It gets access to the database by looking up the resource name DB, of type javax.sql.DataSource, specified in the deployment descriptor. This resource name is mapped to the actual resource /JDBC/DBPool in the deployment plan. A JDBC Connection Pool named DBPool should be created on the server in order for this example to be able to write to the database. Please refer to the exteNd application server documentation or use the Server Management Console to create the Connection Pool. If you already have an existing JDBC Connection Pool, you can make a change in the Deployment Plan to reflect the exisiting Connection Pool name.

Next create a table named quotes in the database pointed to by the Connection Pool. This table should have two columns, a column named symbol to represent the stock ticker symbol, and a numeric column named price to contain the stock price.

Running the Client

Once the client program Client.java is compiled, use the following command to run the client:

jmqrun stock.Client

Make sure that the Application Server and the JMS Server have been started before running the client. This assumes that the client is running on the same machine as the JMS server.

Refer to the jBroker MQ documentation for any issues related to configuring and running JMS.

Using jBroker Console

jBroker Console is a tool that allows you to visually monitor many of the properties of the jBroker line of products (ORB, MQ, and TM), and also custom applications that implement jBroker Manageable, as is the case in the current example. It is a highly customizable allowing you specify exactly the properties you wish to monitor and how they should be arranged. jBroker Console is an alpha product and is currently not part of Novell exteNd Application Server. It can be downloaded separately from the Novell jBroker download page.

jBroker Console uses a config file as an input to tell it what properties need to be monitored and how they should be arranged. It can also generate a sample config file that contains a list of all the monitorable properties. To do this run the following command:

jbconsole -host <host> -port <port>

Here <host> and <port> are the server and port where jBroker ORB is running. It will query the ORB for a list of products and their properties and print them out. You can redirect the output to a file and modify the file as necessary to create the config file.

The config file used in this example is available for download from the ApppNotes Web site. Put the contents of the example config file in the file named stockdemo, and then run the command:

jbconsole -config stockdemo -refresh 3

The refresh parameter specifies the update frequency in seconds. You should now see the a display similar to the one shown in Figure 2.

Figure 2: The StockDemo application running in the jBroker Console.

As specified in the config file a total of 9 properties are being monitored by jBroker Console. The first row shows the number or request received by the ORB, the number of JTS committed transactions, and the price associated with the "novl" ticker symbol.

The second row shows the number of total messages, queue messages, and topic messages received by JMS. And the third row displays the number of total messages, queue messages, and topic messages sent or delivered by JMS.

If you look at the config file, you will notice that on the line corresponding to "Novl" it says "delta=NO", and for all other properties it says "delta=YES". When delta=YES is specified, jBroker Console calculates the rate at which the value of the property is changing. Hence jBroker Console calculates the rate/second for the number of ORB requests, JTS transactions, and JMS messages. Only the price associated with Novl is displayed by its absolute value.

Based on the architecture of this application, for each topic message received by JMS from the client, it delivers that message to the EJB container. The EJB container initiates a transaction and invokes the message driven bean. The message driven bean after updating the database, sends a queue message which is received by JMS and delivered back to the client. Since the client was designed to send a message approximately every second, you will notice that all these 5 properties have the same value of 1. The JMS messages properties will be equal to the sum of the Topic and Queue messages. The price associated with Novl varies based on the random values generated by the client.

Conclusion

This AppNote, utilizing a message driven bean as an example, demonstrates the use of many of the technologies of a J2EE 1.3-compatible application server. In particular, it shows the ease of creating a transaction enabled application.

* Originally published in Novell AppNotes


Disclaimer

The origin of this information may be internal or external to Novell. While Novell makes all reasonable efforts to verify this information, Novell does not make explicit or implied claims to its validity.

© Copyright Micro Focus or one of its affiliates