Tuesday, September 7, 2010
JMSPublisher Java Code
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Properties;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicSession;
import javax.naming.Context;
import javax.naming.NamingException;
import progress.message.jclient.TopicConnectionFactory;
public class JMSPublisher
{
/** Declare all the variables */
private final String initialcontextfactory = "com.sonicsw.jndi.mfcontext.MFContextFactory";
private String commaseparatedurl = "tcp://localhost:2506";
private final String USERNAME = "Administrator";
private final String PASSWORD = "Administrator";
private String connectionTimeout = "240000";// As per requirement
private final String connectionFactory = "TCF";
private final String topicName = "Sample.T08";
private final int MESSAGE_LIFESPAN = 86400000; // As per requirement
private int NUMBERS_Of_RETRY_FOR_CONTEXT = 10; // As per requirement
private long RETRY_TIME_FOR_CONNECTION = 20000;// As per requirement
private int NUMBERS_Of_RETRY_FOR_CONNECTION = 20;// As per requirement
private progress.message.jclient.TopicConnectionFactory topicConnectionFactory = null;
private TopicConnection connection = null;
private TopicSession topicSession = null;
private Topic pubTopic = null;
private MessageProducer publisher = null;
/**
*
* @param args
*/
public static void main(String args[])
{
new JMSPublisher();
}
public JMSPublisher()
{
try
{
topicConnectionFactory = getConnectionFactory();
if (topicConnectionFactory != null)
{
connection = getConnection(topicConnectionFactory);
if (connection == null)
{
connection = retryConnection(topicConnectionFactory);
if (connection == null)
{
System.out.println("All attempts for creating Connection exhausted");
System.out.println("Exiting the application");
System.exit(-1);
}
}
topicSession = getSession(connection);
if (topicSession != null)
{
pubTopic = getTopic(topicSession);
if (pubTopic != null)
{
publisher = getPublisher(topicSession, pubTopic);
connection.start();
if (publisher != null)
{
long count = 0;
while (true)
{
try
{
System.out.println("Creating message.....");
javax.jms.TextMessage msg = topicSession.createTextMessage();
msg.setText("Testing.........");
msg.setJMSMessageID("TestID:XX1");
msg.setJMSCorrelationID("TestingBCP");
System.out.println("Message created");
publisher.send(msg, javax.jms.DeliveryMode.PERSISTENT, // Persistent
javax.jms.Message.DEFAULT_PRIORITY, // Priority
MESSAGE_LIFESPAN);
count++;
System.out.println("Message send [ Count=" + count + " ]");
}
catch (JMSException e)
{
if (topicConnectionFactory == null)
{
topicConnectionFactory = getConnectionFactory();
}
if (topicConnectionFactory != null)
{
connection = getConnection(topicConnectionFactory);
if (connection == null)
{
connection = retryConnection(topicConnectionFactory);
if (connection == null)
{
System.out.println("All attempts for creating Connection exhausted");
System.out.println("Exiting the application");
System.exit(-1);
}
}
topicSession = getSession(connection);
if (topicSession != null)
{
pubTopic = getTopic(topicSession);
if (pubTopic != null)
{
publisher = getPublisher(topicSession, pubTopic);
if (publisher != null)
{
connection.start();
System.out.println("Connection Started.");
}
else
{
System.out.println("Unable to cretae MessageProducer");
System.out.println("Exiting the application");
System.exit(-1);
}
}
else
{
System.out.println("Unable to cretae Topic");
System.out.println("Exiting the application");
System.exit(-1);
}
}
else
{
System.out.println("Unable to cretae TopicSession");
System.out.println("Exiting the application");
System.exit(-1);
}
}
}
}
}
else
{
System.out.println("Unable to cretae MessageProducer");
System.out.println("Exiting the application");
System.exit(-1);
}
}
else
{
System.out.println("Unable to cretae Topic");
System.out.println("Exiting the application");
System.exit(-1);
}
}
else
{
System.out.println("Unable to cretae TopicSession");
System.out.println("Exiting the application");
System.exit(-1);
}
}
else
{
System.out.println("All attempts to initialze context exhausted");
System.out.println("Exiting the application");
System.exit(-1);
}
}
catch (Exception ex)
{
ex.printStackTrace();
System.out.println("Exiting the application");
System.exit(-1);
}
finally
{
if(publisher != null)
{
try {
publisher.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if(topicSession != null)
{
try {
topicSession.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if(connection != null)
{
try
{
connection.stop();
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
/**
*
* @param topicConnectionFactory
* @return Connection
*/
private TopicConnection retryConnection(TopicConnectionFactory topicConnectionFactory)
{
TopicConnection con = null;
for (int i = 0; i < NUMBERS_Of_RETRY_FOR_CONNECTION; i++)
{
if (i == 0)
System.out.println("Application is in retrying mode to create connection");
System.out.println("Will retry after " + RETRY_TIME_FOR_CONNECTION / 1000 + " seconds");
try
{
Thread.sleep(RETRY_TIME_FOR_CONNECTION);
con = getConnection(topicConnectionFactory);
if (con != null)
break;
}
catch (InterruptedException e1)
{
e1.printStackTrace();
}
}
return con;
}
/**
*
* @return TopicConnectionFactory
*/
private progress.message.jclient.TopicConnectionFactory getConnectionFactory()
{
System.out.println("Creating Context");
Context ctx = null;
topicConnectionFactory = null;
for (int i = 0; i <= NUMBERS_Of_RETRY_FOR_CONTEXT; i++)
{
try
{
/** Connect to DS and save the initial context */
Properties env = new Properties();
env.setProperty(javax.naming.Context.INITIAL_CONTEXT_FACTORY,initialcontextfactory); //MFContextFactory
env.setProperty(javax.naming.Context.PROVIDER_URL, commaseparatedurl); //Comma separated url of primary and backup servers
env.setProperty(javax.naming.Context.SECURITY_PRINCIPAL, USERNAME); //Authentication
env.setProperty(javax.naming.Context.SECURITY_CREDENTIALS, PASSWORD); //Authentication
env.put("com.sonicsw.jndi.mfcontext.connectTimeout", connectionTimeout); // Appln will try to reconnect to DS for this much amount of time, else will throw exception.
ctx = new javax.naming.directory.InitialDirContext(env); // Save the initial context
System.out.println("Context Created.");
System.out.println("Creating Connection Factory.");
topicConnectionFactory = (TopicConnectionFactory) ctx.lookup(connectionFactory); // Create CONNECTIONFACTORY
System.out.println("Connection Factory Created.");
saveConnectionFactoryObj(topicConnectionFactory); // Save the factory object created successfully
if (topicConnectionFactory != null)
break;
}
catch (Exception aa)
{
ctx = null;
topicConnectionFactory = null;
System.out.println("Exception connecting to DS, using saved connection factory object to connect");
topicConnectionFactory = getConnectionFactoryObjFromDisk();
if(topicConnectionFactory!=null)
break;
}
}
return topicConnectionFactory;
}
/**
*
* @param topicConnectionFactory
*/
private void saveConnectionFactoryObj(TopicConnectionFactory topicConnectionFactory)
{
FileOutputStream fos = null;
ObjectOutputStream oos = null;
try
{
if (topicConnectionFactory != null)
{
// Write connection topicConnectionFactory object to a file for
// later use
fos = new FileOutputStream("connection_factory.sjo");
oos = new ObjectOutputStream(fos);
oos.writeObject(topicConnectionFactory);
System.out.println("Saved Connection Factory Object");
}
}
catch (Exception e)
{
e.printStackTrace();
}
finally
{
try
{
if (oos != null)
oos.close();
if (fos != null)
fos.close();
}
catch (IOException e1)
{
e1.printStackTrace();
}
}
}
/**
*
* @return TopicConnectionFactory
*/
private TopicConnectionFactory getConnectionFactoryObjFromDisk()
{
ObjectInputStream ois = null;
FileInputStream fis = null;
TopicConnectionFactory topicConnectionFactory = null;
try
{
System.out.println("Attempting to read JMS Connection topicConnectionFactory from file...");
// Try to read the object from a file
fis = new FileInputStream("connection_factory.sjo");
ois = new ObjectInputStream(fis);
topicConnectionFactory = (progress.message.jclient.TopicConnectionFactory) ois.readObject();
System.out.println("JMS Connection topicConnectionFactory read from file!");
}
catch (Exception e)
{
topicConnectionFactory = null;
e.printStackTrace();
}
finally
{
try
{
if (ois != null)
ois.close();
if (fis != null)
fis.close();
}
catch (IOException e)
{
e.printStackTrace();
}
}
return topicConnectionFactory;
}
/**
*
* @param topicConnectionFactory
* @return TopicConnection
*/
private TopicConnection getConnection(TopicConnectionFactory topicConnectionFactory)
{
TopicConnection connection = null;
try
{
System.out.println("Creating Connection");
topicConnectionFactory.setConnectID("ConnectId-Pub");
connection = topicConnectionFactory.createTopicConnection(USERNAME, PASSWORD);
System.out.println("Connection Created.");
}
catch (Exception cc)
{
System.out.println("Unable to create connection");
}
return connection;
}
/**
*
* @param connection
* @return TopicSession
*/
private TopicSession getSession(TopicConnection connection)
{
TopicSession topicSession = null;
try
{
System.out.println("Creating Session");
topicSession = connection.createTopicSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
System.out.println("Session Created.");
}
catch (Exception e)
{
e.printStackTrace();
}
return topicSession;
}
/**
*
* @param topicSession
* @return Topic
*/
private Topic getTopic(TopicSession topicSession)
{
Topic pubTopic = null;
try
{
System.out.println("Creating Topic");
pubTopic = topicSession.createTopic(topicName);
System.out.println("Publisher Topic Created.");
}
catch (Exception e)
{
e.printStackTrace();
}
return pubTopic;
}
/**
*
* @param topicSession
* @param pubTopic
* @return MessageProducer
*/
private MessageProducer getPublisher(TopicSession topicSession, Topic pubTopic)
{
MessageProducer publisher = null;
try
{
System.out.println("Creating Publisher");
publisher = topicSession.createProducer(pubTopic);
System.out.println("Publisher Created.");
}
catch (Exception e)
{
e.printStackTrace();
}
return publisher;
}
}
Subscribe to:
Post Comments (Atom)
No comments:
Post a Comment