How to create a simple JMS topic, publisher and subscriber in Oracle 10g

Posted by Steve Racanovic | Posted in , | Posted on 2:48 PM

1

Continuing from my previous posts about JMS, this time I am creating a simple JMS topic with file persistence.

I am still using the same version OracleAS 10.1.3.2.0 as the JMS Provider.

1. Create the JMS destination. Run the following from $OH\j2ee\home:

java -jar admin_client.jar deployer:oc4j:opmn://sracanov-au.au.oracle.com:6007/JMS oc4jadmin welcome1 -addDestination -domain topic -name jmstopic -jndiLocation jms/MyJMSTopic -persistenceFile myjmstopic.txt

This will create a persistence file “myjmstopic.txt” to store the messages. You can see the file by navigating to

$OH\j2ee\<instance_name>\persistence\<instance_name>_<group_name>_1

[sracanov@sracanov-au D]$ java -jar admin_client.jar deployer:oc4j:opmn://sracanov-au.au.oracle.com:6007/JMS oc4jadmin welcome1 -addDestination -domain topic -name jmstopic -jndiLocation jms/MyJMSTopic -persistenceFile myjmstopic.txt
Command was successful

2. Create the JMS connection factory. Run the following from $OH\j2ee\home:

java -jar admin_client.jar deployer:oc4j:opmn://sracanov-au.au.oracle.com:6007/JMS oc4jadmin welcome1 -addJMSConnectionFactory -domain topic -jndiLocation jms/MyJMSTCF

[sracanov@sracanov-au D]$ java -jar admin_client.jar deployer:oc4j:opmn://sracanov-au.au.oracle.com:6007/JMS oc4jadmin welcome1 -addJMSConnectionFactory -domain topic -jndiLocation jms/MyJMSTCF

3. We need to create 3 java classes. Once again, I am using Jdev to create my classes. So first I will create a new application and project.

I will need to make sure the following libraries are included in the project:
· J2EE
· Apache Ant

4. Now create the publisher client:

package project1;

import java.util.Hashtable;

import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;

import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;


/**
 * Stand-alone client that publishes a small batch of JMS messages to a topic
 * hosted by an OC4J instance.
 *
 * The connection factory and the topic are resolved through JNDI. The host,
 * OPMN port, OC4J instance name and credentials are hard-coded at the top of
 * main and must be adapted to the target environment.
 */
public class MyTopicPublisher {

    /**
     * Looks up the topic connection factory and the topic, then publishes
     * NUMBER_OF_MESSAGES messages, echoing each one to standard output.
     *
     * Every message is a MapMessage whose from/to/subject/content values are
     * stored as JMS string properties (not as map body entries), so a
     * consumer has to read them back with getStringProperty.
     *
     * The process exits with status 1 when the JNDI lookup fails. JMS errors
     * are reported on standard output.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        final String topicName = "jms/MyJMSTopic";
        final String topicConnectionFactoryName = "jms/MyJMSTCF";
        final String oc4juser = "oc4jadmin";
        final String oc4juserpassword = "welcome1";
        final String urlProvider = "opmn:ormi://";
        final String jmsProviderHost = "sracanov-au.au.oracle.com";
        final String colon = ":";
        final String opmnPort = "6007";
        // Must name the OC4J instance in which the topic and the connection
        // factory were created; the admin_client commands of this example
        // target the instance "JMS".
        final String oc4jinstance = "JMS";
        final int NUMBER_OF_MESSAGES = 5;
        TopicConnection topicConnection = null;
        TopicConnectionFactory topicConnectionFactory = null;
        Context jndiContext = null;
        TopicSession topicSession = null;
        Topic topic = null;
        TopicPublisher topicPublisher = null;
        MapMessage message = null;

        /*
         * Set the environment for a connection to the OC4J instance.
         * The provider URL has the form
         * opmn:ormi://<host>:<opmn port>:<oc4j instance>/default
         */
        Hashtable env = new Hashtable();
        env.put(Context.INITIAL_CONTEXT_FACTORY,
                "oracle.j2ee.rmi.RMIInitialContextFactory");
        env.put(Context.SECURITY_PRINCIPAL, oc4juser);
        env.put(Context.SECURITY_CREDENTIALS, oc4juserpassword);
        env.put(Context.PROVIDER_URL,
                urlProvider + jmsProviderHost + colon + opmnPort + colon +
                oc4jinstance + "/default");

        /*
         * Set the Context Object.
         * Lookup the Topic Connection Factory.
         * Lookup the JMS Destination.
         */
        try {
            jndiContext = new InitialContext(env);
            topicConnectionFactory =
                    (TopicConnectionFactory)jndiContext.lookup(topicConnectionFactoryName);
            topic = (Topic)jndiContext.lookup(topicName);
        } catch (NamingException e) {
            System.out.println("Lookup failed: " + e.toString());
            System.exit(1);
        }

        /*
         * Create connection.
         * Create session from connection; false means session is
         * not transacted.
         * Create publisher.
         * Create map message.
         * Send messages.
         * Close connection and JNDI context.
         */
        try {
            topicConnection = topicConnectionFactory.createTopicConnection();
            topicSession =
                    topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
            topicPublisher = topicSession.createPublisher(topic);
            // One message object is reused; its properties are overwritten
            // on every iteration before it is published again.
            message = topicSession.createMapMessage();
            for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
                message.setStringProperty("from", "from@" + jmsProviderHost);
                message.setStringProperty("to", "to@" + jmsProviderHost);
                message.setStringProperty("subject",
                        "Topic Message " + (i + 1));
                message.setStringProperty("content",
                        "Message " + (i + 1) + " in Topic: \"" +
                        topicName + "\"");
                System.out.println("Publishing message: " +
                        message.getStringProperty("from"));
                System.out.println("Publishing message: " +
                        message.getStringProperty("to"));
                System.out.println("Publishing message: " +
                        message.getStringProperty("subject"));
                System.out.println("Publishing message: " +
                        message.getStringProperty("content"));
                topicPublisher.publish(message);
            }
        } catch (JMSException e) {
            System.out.println("Exception occurred: " + e.toString());
        } finally {
            if (topicConnection != null) {
                try {
                    topicConnection.close();
                } catch (JMSException e) {
                    System.out.println("Closing error: " + e.toString());
                }
            }
            // Release the JNDI context as well; it was previously left open.
            if (jndiContext != null) {
                try {
                    jndiContext.close();
                } catch (NamingException e) {
                    System.out.println("Closing error: " + e.toString());
                }
            }
        }
    }
}

5. Now we need to create the message listener that will listen for messages of type MapMessage:

package project1;

import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;

/**
 * Message listener that prints the "from", "to", "subject" and "content"
 * string properties of every MapMessage it receives to standard output.
 * Messages of any other type are reported as "Wrong type".
 */
public class JMSTopicMapListener implements MessageListener {

    /**
     * Handles one delivered message. Nothing is rethrown: JMS errors and any
     * other Throwable are reported on standard output.
     *
     * @param message the delivered JMS message
     */
    public void onMessage(Message message) {
        try {
            // Guard: only map messages carry the properties printed below.
            if (!(message instanceof MapMessage)) {
                System.out.println("Wrong type");
                return;
            }

            MapMessage mapMessage = (MapMessage) message;
            String sender = mapMessage.getStringProperty("from");
            String recipient = mapMessage.getStringProperty("to");
            String title = mapMessage.getStringProperty("subject");
            String body = mapMessage.getStringProperty("content");

            StringBuffer report = new StringBuffer();
            report.append("READING MESSAGE \n=============== \nFrom: ").append(sender);
            report.append("\nTo: ").append(recipient);
            report.append("\nSubject: ").append(title);
            report.append("\nContent: ").append(body);
            System.out.println(report.toString());
        } catch (JMSException jmsFailure) {
            System.out.println("JMSException in onMessage(): " + jmsFailure.toString());
        } catch (Throwable unexpected) {
            System.out.println("Exception in onMessage():" + unexpected.getMessage());
        }
    }
}

6. Create the subscriber client:

package project1;

import javax.jms.*;

import javax.naming.*;

import java.util.Hashtable;

/**
 * Stand-alone client that subscribes to a JMS topic hosted by an OC4J
 * instance and hands every delivered message to a JMSTopicMapListener.
 *
 * The connection factory and the topic are resolved through JNDI. The host,
 * OPMN port, OC4J instance name and credentials are hard-coded at the top of
 * main and must be adapted to the target environment.
 */
public class MyTopicSubscriber {

    /**
     * Looks up the topic connection factory and the topic, registers the
     * listener and then keeps the process alive until the main thread is
     * interrupted or the process is stopped.
     *
     * The process exits with status 1 when the JNDI lookup fails. JMS errors
     * are reported on standard output.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        final String topicName = "jms/MyJMSTopic";
        final String topicConnectionFactoryName = "jms/MyJMSTCF";
        final String oc4juser = "oc4jadmin";
        final String oc4juserpassword = "welcome1";
        final String urlProvider = "opmn:ormi://";
        final String jmsProviderHost = "sracanov-au.au.oracle.com";
        final String colon = ":";
        final String opmnPort = "6007";
        // Must name the OC4J instance in which the topic and the connection
        // factory were created; the admin_client commands of this example
        // target the instance "JMS".
        final String oc4jinstance = "JMS";
        Context jndiContext = null;
        TopicConnectionFactory topicConnectionFactory = null;
        TopicConnection topicConnection = null;
        TopicSession topicSession = null;
        Topic topic = null;
        TopicSubscriber topicSubscriber = null;
        JMSTopicMapListener topicListener = null;

        /*
         * Set the environment for a connection to the OC4J instance.
         * The provider URL has the form
         * opmn:ormi://<host>:<opmn port>:<oc4j instance>/default
         */
        Hashtable env = new Hashtable();
        env.put(Context.INITIAL_CONTEXT_FACTORY,
                "oracle.j2ee.rmi.RMIInitialContextFactory");
        env.put(Context.SECURITY_PRINCIPAL, oc4juser);
        env.put(Context.SECURITY_CREDENTIALS, oc4juserpassword);
        env.put(Context.PROVIDER_URL,
                urlProvider + jmsProviderHost + colon + opmnPort + colon +
                oc4jinstance + "/default");

        /*
         * Set the Context Object.
         * Lookup the Topic Connection Factory.
         * Lookup the JMS Destination.
         */
        try {
            jndiContext = new InitialContext(env);
            topicConnectionFactory =
                    (TopicConnectionFactory)jndiContext.lookup(topicConnectionFactoryName);
            topic = (Topic)jndiContext.lookup(topicName);
        } catch (NamingException e) {
            System.out.println("Lookup failed: " + e.toString());
            System.exit(1);
        }

        /*
         * Create connection.
         * Create session from connection; false means session is
         * not transacted.
         * Create subscriber.
         * Register message listener (JMSTopicMapListener).
         * Receive map messages from topic until interrupted.
         * Close connection and JNDI context.
         */
        try {
            topicConnection = topicConnectionFactory.createTopicConnection();
            topicSession =
                    topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
            topicSubscriber = topicSession.createSubscriber(topic);
            topicListener = new JMSTopicMapListener();
            topicSubscriber.setMessageListener(topicListener);
            topicConnection.start();
            System.out.println("Subscripted to topic: \"" + topicName + "\"");
            // Messages are handed to the listener by the JMS provider, so
            // this thread only has to keep the process alive. Sleep instead
            // of spinning in an empty loop, which would burn a full CPU core.
            try {
                while (true) {
                    Thread.sleep(1000L);
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag and fall through to the cleanup.
                Thread.currentThread().interrupt();
            }
        } catch (JMSException e) {
            System.out.println("Exception occurred: " + e.toString());
        } finally {
            if (topicConnection != null) {
                try {
                    topicConnection.close();
                } catch (JMSException e) {
                    System.out.println("Closing error: " + e.toString());
                }
            }
            // Release the JNDI context as well; it was previously left open.
            if (jndiContext != null) {
                try {
                    jndiContext.close();
                } catch (NamingException e) {
                    System.out.println("Closing error: " + e.toString());
                }
            }
        }
    }
}

7. Ensure the following Strings in the publisher and subscriber Java classes are set according to your environment:

// JNDI locations of the topic and of the topic connection factory.
final String topicName = "jms/MyJMSTopic";
final String topicConnectionFactoryName = "jms/MyJMSTCF";
// Credentials passed to JNDI as security principal and credentials.
final String oc4juser = "oc4jadmin";
final String oc4juserpassword = "welcome1";
// The remaining values are concatenated into the JNDI provider URL:
// urlProvider + jmsProviderHost + colon + opmnPort + colon + oc4jinstance + "/default"
final String urlProvider = "opmn:ormi://";
final String jmsProviderHost = "sracanov-au.au.oracle.com";
final String colon = ":";
final String opmnPort = "6007";
final String oc4jinstance = "JMS";

8. Run the subscriber class. The subscriber waits for messages to be sent by the publisher. You will see the following message in the console.

Subscripted to topic: "jms/MyJMSTopic"


9. Run the publisher class. The following results are displayed in the publisher console:

Publishing message: from@sracanov-au.au.oracle.com
Publishing message: to@sracanov-au.au.oracle.com
Publishing message: Topic Message 1
Publishing message: Message 1 in Topic: "jms/MyJMSTopic"
Publishing message: from@sracanov-au.au.oracle.com
Publishing message: to@sracanov-au.au.oracle.com
Publishing message: Topic Message 2
Publishing message: Message 2 in Topic: "jms/MyJMSTopic"
Publishing message: from@sracanov-au.au.oracle.com
Publishing message: to@sracanov-au.au.oracle.com
Publishing message: Topic Message 3
Publishing message: Message 3 in Topic: "jms/MyJMSTopic"
Publishing message: from@sracanov-au.au.oracle.com
Publishing message: to@sracanov-au.au.oracle.com
Publishing message: Topic Message 4
Publishing message: Message 4 in Topic: "jms/MyJMSTopic"
Publishing message: from@sracanov-au.au.oracle.com
Publishing message: to@sracanov-au.au.oracle.com
Publishing message: Topic Message 5
Publishing message: Message 5 in Topic: "jms/MyJMSTopic"
Process exited with exit code 0.

10. Back in the subscriber console, the following message is displayed:

READING MESSAGE
===============
From: from@sracanov-au.au.oracle.com
To: to@sracanov-au.au.oracle.com
Subject: Topic Message 1
Content: Message 1 in Topic: "jms/MyJMSTopic"
READING MESSAGE
===============
From: from@sracanov-au.au.oracle.com
To: to@sracanov-au.au.oracle.com
Subject: Topic Message 2
Content: Message 2 in Topic: "jms/MyJMSTopic"
READING MESSAGE
===============
From: from@sracanov-au.au.oracle.com
To: to@sracanov-au.au.oracle.com
Subject: Topic Message 3
Content: Message 3 in Topic: "jms/MyJMSTopic"
READING MESSAGE
===============
From: from@sracanov-au.au.oracle.com
To: to@sracanov-au.au.oracle.com
Subject: Topic Message 4
Content: Message 4 in Topic: "jms/MyJMSTopic"
READING MESSAGE
===============
From: from@sracanov-au.au.oracle.com
To: to@sracanov-au.au.oracle.com
Subject: Topic Message 5
Content: Message 5 in Topic: "jms/MyJMSTopic"

Ensure you stop the subscriber client when done.

How to Send JMS Messages to one Oracle Application Server while reading it from another.

Posted by Steve Racanovic | Posted in , | Posted on 2:01 PM

2

The example demonstrates a queue on one server (Server A), while having an MDB on another server (Server B) that listens to the queue. It also includes a client that connects to Server B and reads the messages.

The example involves having 2 Oracle Application Server. I'm using Oracle AS 10.1.3.2.0.

The following outlines the tasks to be completed:
a) Server A is used to send the message to.
Need to create a JMS destination and queue connection factory
The queue on Server A will be called jms/MyJMSQueue.
The queue connection factory Server A will be called jms/MyJMSQCF.
The client will connect to server A and send a message to the queue.

b) Server B is used to read the messages.
Need to create a queue connection factory that points to Server A, and the same JMS destination as on Server A.
The queue on Server B is the same as Server A.
The queue connection factory Server B will be called jms/MyRemoteQCF.
The MDB will be deployed to Server B. The MDB will listen to messages on Server A.
The client will connect to server B and read messages from the queue.

1. On Server A, create a JMS Destination.

[sracanov@sracanov-au2 JMS_SR]$ cd $ORACLE_HOME\j2ee\JMS

[sracanov@sracanov-au2]$ java -jar admin_client.jar deployer:oc4j:opmn://sracanov-au2.au.oracle.com:6005/JMS_SR oc4jadmin welcome1 -addDestination -domain queue -name jmsqueue -jndiLocation jms/MyJMSQueue

2. On Server B, create a JMS Destination. Same as in step 1.

[sracanov@sracanov-au2]$ java -jar admin_client.jar deployer:oc4j:opmn://sracanov-au.au.oracle.com:6007/JMS oc4jadmin welcome1 -addDestination -domain queue -name jmsqueue -jndiLocation jms/MyJMSQueue

3. On Server A, create a Connection Factory.

[sracanov@sracanov-au2 JMS_SR]$ java -jar admin_client.jar deployer:oc4j:opmn://sracanov-au2.au.oracle.com:6005/JMS_SR oc4jadmin welcome1 -addJMSConnectionFactory -domain queue -jndiLocation jms/MyJMSQCF

4. On Server B, create a Connection Factory.

First determine the jms port for the OC4J instance you are using on Server A.

[oracle@sracanov-au2 JMS_SR]$ opmnctl status -l


OC4JGroup:default_group | OC4J:JMS_SR | 11196 | Alive | 857355163 | 253252 | 23:27:49 | jms:12604,ajp:12504,rmis:12704,rmi:12404


The instance in the example is using port 12604.

On Server B, run the following:

[sracanov@sracanov-au D]$ java -jar admin_client.jar deployer:oc4j:opmn://sracanov-au.au.oracle.com:6007/JMS oc4jadmin welcome1 -addJMSConnectionFactory -jndiLocation jms/MyRemoteQCF -domain queue -host sracanov-au2.au.oracle.com -username oc4jadmin -password welcome1 -port 12604

5. Create the QueueProducer.

Use the example code from the previous blog to create the QueueProducer class. The QueueProducer will connect to Server A and send messages to the queue. You will need to make the following changes to the class.


// JNDI locations of the queue and of the queue connection factory on Server A.
String queueName = "jms/MyJMSQueue";
String queueConnectionFactoryName = "jms/MyJMSQCF";

And

// JNDI environment for the producer: connects to the OC4J instance JMS_SR
// on Server A through OPMN port 6005.
Hashtable env = new Hashtable();
env.put(Context.INITIAL_CONTEXT_FACTORY, "oracle.j2ee.rmi.RMIInitialContextFactory");
env.put(Context.SECURITY_PRINCIPAL, "oc4jadmin");
env.put(Context.SECURITY_CREDENTIALS, "welcome1");
env.put(Context.PROVIDER_URL, "opmn:ormi://sracanov-au2.au.oracle.com:6005:JMS_SR/default");


Ensure you have set this to your Server A details, including the opmn port and the OC4J instance. Use ‘opmnctl status -port’ to find your opmn port.

6) Run the QueueProducer class to send the message to the queue.

This is the result:
Producing message: Message 1
Producing message: Message 2
Producing message: Message 3
Producing message: Message 4
Producing message: Message 5

7) From the Oracle EM, select your instance (in this example, ‘JMS_SR’) -> Administration -> JMS Destinations -> Memory Performance (on ‘jmsqueue’ in this example). Here you will see your messages in the queue.



8) Create the QueueConsumer.

Use the example code from the previous blog to create the QueueConsumer class. The QueueConsumer will connect to Server B and read messages from the queue on Server A. You will need to make the following changes to the class.


// JNDI locations used by the consumer on Server B. jms/MyRemoteQCF is the
// connection factory on Server B that points at Server A's JMS port.
// The name must not contain leading whitespace, otherwise it does not match
// the location under which the factory was registered.
String queueName = "jms/MyJMSQueue";
String queueConnectionFactoryName = "jms/MyRemoteQCF";

And

// JNDI environment for the consumer: connects to the OC4J instance JMS
// on Server B through OPMN port 6007.
Hashtable env = new Hashtable();
env.put(Context.INITIAL_CONTEXT_FACTORY, "oracle.j2ee.rmi.RMIInitialContextFactory");
env.put(Context.SECURITY_PRINCIPAL, "oc4jadmin");
env.put(Context.SECURITY_CREDENTIALS, "welcome1");
env.put(Context.PROVIDER_URL, "opmn:ormi://sracanov-au.au.oracle.com:6007:JMS/default");


Ensure you have set this to your Server B details, including opmn port, and oc4j instance.

9) Now run the QueueConsumer class that will connect to Server B and receive the messages from the queue on Server A.

This is the result:

Consuming message: Message 1
Consuming message: Message 2
Consuming message: Message 3
Consuming message: Message 4
Consuming message: Message 5

10) Create an MDB and deploy it to Server B; it will listen for and print the messages in the queue on Server A.

a. Create the MessageDrivenEJBBean as follows:


package project3;

import javax.ejb.EJBException;
import javax.ejb.MessageDrivenBean;
import javax.ejb.MessageDrivenContext;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * Message-driven bean that prints the text of every TextMessage it receives
 * to standard output. Messages of any other type are reported as
 * "Wrong Message type".
 */
public class MessageDrivenEJBBean implements MessageDrivenBean, MessageListener {

    /** Context handed over by the container; stored but not otherwise used here. */
    private MessageDrivenContext _context;

    /** Creation callback; this bean needs no initialisation. */
    public void ejbCreate() {
    }

    /**
     * Stores the message-driven context supplied by the container.
     *
     * @param context the context to remember
     */
    public void setMessageDrivenContext(MessageDrivenContext context) throws EJBException {
        this._context = context;
    }

    /** Removal callback; this bean holds nothing that needs releasing. */
    public void ejbRemove() throws EJBException {
    }

    /**
     * Handles one delivered message. Nothing is rethrown: JMS errors and any
     * other Throwable are reported on standard output.
     *
     * @param message the delivered JMS message
     */
    public void onMessage(Message message) {
        try {
            // Guard: only text messages are printed.
            if (!(message instanceof TextMessage)) {
                System.out.println("Wrong Message type");
                return;
            }
            TextMessage textMessage = (TextMessage) message;
            String text = textMessage.getText();
            System.out.println("Reading message: " + text);
        } catch (JMSException jmsFailure) {
            System.out.println("JMSException in onMessage(): " + jmsFailure.toString());
        } catch (Throwable unexpected) {
            System.out.println("Exception in onMessage():" + unexpected.getMessage());
        }
    }
}


b. Edit the ejb-jar.xml as follows:

<?xml version = '1.0' encoding = 'windows-1252'?>
<!-- EJB 2.1 deployment descriptor declaring the message-driven bean
     project3.MessageDrivenEJBBean as a listener on a javax.jms.Queue. -->
<ejb-jar xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://java.sun.com/xml/ns/j2ee
         http://java.sun.com/xml/ns/j2ee/ejb-jar_2_1.xsd" version="2.1"
         xmlns="http://java.sun.com/xml/ns/j2ee">
  <enterprise-beans>
    <message-driven>
      <description>Message Driven Bean</description>
      <display-name>MessageDrivenEJB</display-name>
      <ejb-name>MessageDrivenEJB</ejb-name>
      <ejb-class>project3.MessageDrivenEJBBean</ejb-class>
      <messaging-type>javax.jms.MessageListener</messaging-type>
      <!-- The closing tag was missing here, which left the document malformed. -->
      <transaction-type>Container</transaction-type>
      <message-destination-type>javax.jms.Queue</message-destination-type>
      <activation-config>
        <activation-config-property>
          <activation-config-property-name>destinationType</activation-config-property-name>
          <activation-config-property-value>javax.jms.Queue</activation-config-property-value>
        </activation-config-property>
        <activation-config-property>
          <activation-config-property-name>acknowledgeMode</activation-config-property-name>
          <activation-config-property-value>Auto-acknowledge</activation-config-property-value>
        </activation-config-property>
        <activation-config-property>
          <activation-config-property-name>subscriptionDurability</activation-config-property-name>
          <activation-config-property-value>NonDurable</activation-config-property-value>
        </activation-config-property>
      </activation-config>
    </message-driven>
  </enterprise-beans>
</ejb-jar>

c. Edit the orion-ejb-jar.xml as follows:

<?xml version = '1.0' encoding = 'windows-1252'?>
<!-- OC4J-specific deployment descriptor for the MessageDrivenEJB bean. -->
<orion-ejb-jar xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:noNamespaceSchemaLocation="http://xmlns.oracle.com/oracleas/schema/orion-ejb-jar-10_0.xsd"
schema-major-version="10" schema-minor-version="0">
<enterprise-beans>
<!-- Binds the bean to the connection factory jms/MyRemoteQCF (created on
     Server B with the host and JMS port of Server A) and to the queue
     jms/MyJMSQueue. -->
<message-driven-deployment name="MessageDrivenEJB"
connection-factory-location="jms/MyRemoteQCF"
destination-location="jms/MyJMSQueue"/>
</enterprise-beans>
<assembly-descriptor>
<default-method-access>
<security-role-mapping/>
</default-method-access>
</assembly-descriptor>
</orion-ejb-jar>

d. Deploy the MDB to Server B.

e. Run the QueueProducer to send some messages to the queue.

f.Open the file $ORACLE_HOME\opmn\log\default_group~~default_group~1.log

To view the result:

07/10/02 08:52:24 Reading message: Message 1
07/10/02 08:52:24 Reading message: Message 2
07/10/02 08:52:24 Reading message: Message 3
07/10/02 08:52:24 Reading message: Message 4
07/10/02 08:52:24 Reading message: Message 5
07/10/02 08:52:24 Wrong Message type