} } }

    JMS临盆者+单线程发送-我们到底能走多远系列(29)

    添加时间:2013-7-10 点击量:

    我们到底能走多远系列(29)


    扯淡:


      然后我俩各自一端/望着大河弯弯/终于敢放胆/嘻皮笑脸/面对/人生的难”      --- 《山丘》


      迎着风/迎向远方的天空/路上也有艰苦/也有那摆脱/都走得安闲”                    --- 《与你到永远


      “赶上冷风雨休太卖力/自负满心里休理会嘲笑与质问/笑骂由人潇洒地做人/少年人潇洒地做人/持续行潇洒地做人”      ---《沉默是金》


      


     主题:


      应用JMS将共通模块分别出去,比如发模块,可以在长途的机械上跑customer,然后各个应用应用发功能是只要向长途机械发送msg即可。


      类似于下图:



      对于图中的Producer的实现都差不久不多,主如果选择什么样的Jms第三方实现。对于Customer我们不必关怀.


      比如下面的代码是HornetQ的Producer的样例代码:



    public class JmsProducer implements ExceptionListener,FailureListener{
    

    private final Logger logger = LoggerFactory.getLogger(JmsProducer.class);
    private String queueName;
    private String jmsHost;
    private int jmsPort;
    private ConnectionFactory cf;
    private Queue queue;
    private Connection queueConnection;
    private Session queueSession;
    private MessageProducer queueProducer;

    public void init() throws Exception {
    queue
    = HornetQJMSClient.createQueue(queueName);
    Map
    <String, Object> connectionParams = new HashMap<String, Object>();
    connectionParams.put(TransportConstants.PORT_PROP_NAME, jmsPort);
    connectionParams.put(TransportConstants.HOST_PROP_NAME, jmsHost);
    TransportConfiguration transportConfiguration
    = new TransportConfiguration(NettyConnectorFactory.class.getName(),
    connectionParams);
    HornetQConnectionFactory hornetQConnectionFactory
    = HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, transportConfiguration);
    hornetQConnectionFactory.setClientFailureCheckPeriod(
    60000);
    hornetQConnectionFactory.setRetryInterval(
    2000); // 2 seconds for first retry
    hornetQConnectionFactory.setRetryIntervalMultiplier(1.5); // 1.5 times loner betrween retrys
    hornetQConnectionFactory.setMaxRetryInterval(20000); // Wait max 20 secs between retrys
    hornetQConnectionFactory.setReconnectAttempts(-1); // Retry forever
    hornetQConnectionFactory.setConnectionTTL(60000); //The default value for connection ttl is 60000ms
    hornetQConnectionFactory.setClientFailureCheckPeriod(30000);//The default value for client failure check period is 30000ms
    cf = (ConnectionFactory)hornetQConnectionFactory;
    queueConnection
    = cf.createConnection();
    queueSession
    = queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    queueProducer
    = queueSession.createProducer(queue);
    queueProducer.setTimeToLive(
    6000000);//100分钟失效
    queueProducer.setDisableMessageID(true);//封闭消息id
    queueProducer.setDisableMessageTimestamp(true);//封闭消息的时候戳
    logger.info(init JmsProducer of +queueName);
    //queueConnection.start();
    }

    public void reConnect(){
    logger.info(queueName+ reConnect);
    }

    public void destroy() throws Exception {
    logger.info(
    destroy JmsProducer of +queueName);
    if(queueSession != null){
    queueSession.close();
    queueSession
    = null;
    }
    if(queueConnection != null){
    queueConnection.close();
    queueConnection
    = null;
    }
    }

    public String getQueueName() {
    return queueName;
    }

    public void setQueueName(String queueName) {
    this.queueName = queueName;
    }

    public String getJmsHost() {
    return jmsHost;
    }

    public void setJmsHost(String jmsHost) {
    this.jmsHost = jmsHost;
    }

    public int getJmsPort() {
    return jmsPort;
    }

    public void setJmsPort(int jmsPort) {
    this.jmsPort = jmsPort;
    }

    public Session getQueueSession() {
    return queueSession;
    }

    public void sendTextMessage(final TextMessage textMessage) throws JMSException {
    try {
    queueProducer.send(textMessage);
    }
    catch (Exception e) {
    // TODO: handle exception
    logger.error(on sendTextMessage Exception=+e.getMessage());
    }
    }

    public void onException(JMSException jmsex) {
    // TODO Auto-generated method stub
    logger.warn(on JmsProducer Exception=+jmsex.getMessage());
    }

    public void connectionFailed(HornetQException hqex, boolean arg1) {
    // TODO Auto-generated method stub
    logger.error(on JmsProducer connectionFailed,arg1=+arg1+,Exception=+hqex.getMessage());
    }
    }



       一般性的,我们哄骗Spring 把这个JmsProducer 注入进本身的营业类里去应用即可:


      spring的bean设备:



        <bean id=jmsCodeProducer class=com.sz.lvban.biz.bo.util.JmsProducer init-method=init>
    
    <property name=jmsHost value=¥{jms.send.code.host} />
    <property name=jmsPort value=¥{jms.send.code.port} />
    <property name=queueName value=¥{jms.send.code.queueName} />
    </bean>



        @Autowired
    
    private JmsProducer jmsCodeProducer;


      某办法直接调用:



                    textMsg = jmsCodeProducer.getQueueSession().createTextMessage();
    
    textMsg.setText(smsJson.toJString());
    jmsCodeProducer.sendTextMessage(textMsg);


      如许就实现了让办事器上的customer干活的工作了。



    然后,我们发明spring默认注入jmsCodeProducer应用了单例的模式,如许一来我们就可能推敲多线程调用冲突的题目。然而我们不克不及避免jmsCodeProducer的单例,毕竟?成果init-method=init 的init办法有点消费的。


    所以就搞了下面的规划:(上图的题项目组)



    应用一个queue做中心站,只要包管单线程从queue中取数据,就能实现一条条向长途办事器发送jms消息。


    下面是一个实现的例子:


    我们先设备一个ApnsMsgSender,有他来把握queue的行动,包含插入,取出数据,以及发送jms消息。



        <bean id=ApnsMsgSender class=com.sz.wxassistant.biz.bo.util.ApnsMsgSender init-method=sendMsg>
    
    <property name=jmsApnsProducer ref=jmsApnsProducer></property>
    </bean>


    重视init-method=sendMsg 启动时,我们就须要启动一个线程来监控queue。


    连络下代码:在这里我们应用了LinkedBlockingQueue,关于ArrayBlockingQueue和LinkedBlockingQueue之间的弃取,我没有实际测试过。



    public class ApnsMsgSender {
    

    // private ArrayBlockingQueue<TextMessage> queue = new
    // ArrayBlockingQueue<TextMessage>(1024);
    private LinkedBlockingQueue<TextMessage> jmsQueue = new LinkedBlockingQueue<TextMessage>();
    private JmsProducer jmsApnsProducer;
    private utorService pool;
    private Logger log = LoggerFactory.getLogger(ApnsMsgSender);

    /
    启动进口
    /
    public void sendMsg() {
    pool
    = utors.newCachedThreadPool(new MyThreadFactory());
    pool.submit(
    new JmsSender());
    }

    public boolean addJms(TextMessage msg) {
    return jmsQueue.offer(msg);
    }

    public TextMessage getMsg() {
    TextMessage msg
    = null;
    try {
    // 取msg 10秒超时设置
    msg = jmsQueue.poll(10, TimeUnit.SECONDS);
    }
    catch (InterruptedException interuptedE) {
    log.warn(
    poll jms error + interuptedE);
    }
    catch (Exception e) {
    log.error(
    poll jms get unknown error: , e);
    }
    return msg;
    }

    public JmsProducer getJmsApnsProducer() {
    return jmsApnsProducer;
    }

    public void setJmsApnsProducer(JmsProducer jmsApnsProducer) {
    this.jmsApnsProducer = jmsApnsProducer;
    }

    public TextMessage genTextMessage() throws JMSException {
    return jmsApnsProducer.getQueueSession().createTextMessage();
    }

    private class JmsSender implements Runnable {
    public void run() {
    whiletrue) {
    try {
    // 从queue中取msg
    TextMessage msg = getMsg();
    if (msg != null && msg instanceof TextMessage) {
    // 发送
    jmsApnsProducer.sendTextMessage(msg);
    }
    }
    catch (JMSException jmsE) {
    log.error(
    send jms error: + jmsE);
    }
    catch (Exception e) {
    log.error(
    get unknown error: , e);
    }
    }
    }
    }

    class MyThreadFactory implements ThreadFactory {
    public Thread newThread(Runnable r) {
    Thread thread
    = new Thread(r);
    // 线程设置为后台过程
    thread.setDaemon(true);
    thread.setName(
    ApnsMsgSender);
    return thread;
    }
    }
    }


    申明:


      我们的这个线程做了什么?


        1,  getMsg()


        2,  sendTextMessage(msg)


      很了然的实现....


    注:代码中还应用了ThreadFactory 来封装了一下线程。



      外界代码调用怎么搞?


        1,addJms 就可以了 


    只负责向queue里放,如许再多的线程都没有关系了。




    TextMessage msg = apnsMsgSender.genTextMessage();
    
    msg.setText(
    I love los angeles !);
    apnsMsgSender.addJms(msg);




    ok。到这里就实现了单线程发送jms消息的功能。





    让我们持续前行


    ----------------------------------------------------------------------


    尽力不必然成功,但不尽力必然不会成功。
    共勉。


    我所有的自负皆来自我的自卑,所有的英雄气概都来自于我的软弱。嘴里振振有词是因为心里满是怀疑,深情是因为痛恨自己无情。这世界没有一件事情是虚空而生的,站在光里,背后就会有阴影,这深夜里一片寂静,是因为你还没有听见声音。—— 马良《坦白书》
    分享到: