Monday, April 30, 2012

ActiveMQ: How to Start/Stop Camel Routes on an ActiveMQ Slave

Do you have a business requirement in which you need ActiveMQ to deploy your Camel routes but you have come to realize that in a Master/Slave configuration the Camel Context is always started on the slave broker?

In this example I will show you how you can configure ActiveMQ to deploy Camel routes as well as how to control when these routes should be started.  In this example we have a master broker with routes that start when the broker is started.  Additionally, we will have a slave broker which will have routes that we only want to start when the slave becomes the master.

I am currently using the apache-activemq-5.5.1-fuse-04-01, which is the latest release of ActiveMQ from FuseSource at the time of this writing.  You can grab the binaries/source from the following link: apache-activemq-5.5.1-fuse-04-01

So you may be wondering how you might be able to accomplish this, right?  Well luckily, we can easily have something working with just a little code and a little configuration.

Code

The code we need to implement is rather simple.  We just need to create a class that implements the ActiveMQ Service interface.  Below is the simple example I created to demonstrate how this works:

 package com.fusesource.example;  
 import org.apache.activemq.Service;  
 import org.apache.camel.spring.SpringCamelContext;  
 import org.slf4j.Logger;  
 import org.slf4j.LoggerFactory;  
 /**  
  * Example used to start and stop the camel context using the ActiveMQ Service interface  
  *  
  */  
 public class CamelContextService implements Service  
 {  
   private final Logger LOG = LoggerFactory.getLogger(CamelContextService.class);  
   SpringCamelContext camel;  
   @Override  
   public void start() throws Exception {  
     try {  
       camel.start();  
     } catch (Exception e) {  
       LOG.error("Unable to start camel context: " + camel);  
       e.printStackTrace();  
     }  
   }  
   @Override  
   public void stop() throws Exception {  
     try {  
       camel.stop();  
     } catch (Exception e) {  
       LOG.error("Unable to stop camel context: " + camel);  
       e.printStackTrace();  
     }  
   }  
   public SpringCamelContext getCamel() {  
     return camel;  
   }  
   public void setCamel(SpringCamelContext camel) {  
     this.camel = camel;  
   }  
 }  

The magic behind all this is in the Service interface.  When this class is registered as a service with the broker, the start method will be called when the broker is fully initialized.  Remember to copy the jar file to the lib directory of the broker where you want this code to be invoked.

Configuration

First let's have a look at how we deploy Camel routes from ActiveMQ's broker configuration file.  In the installation directory of the broker you will find the conf directory which holds multiple examples of different broker configuration files and such.  One such file is called camel.xml which defines a simple route.  In our example we will import this file in our broker's activemq.xml as follows which will start the camel context and associated route.

 <import resource="camel.xml"/>  

This should be added just after the ending broker element where you will see that the configuration is already importing jetty.xml.

Now that we have added a Camel route to the master broker it can be started.  Once started, you should see that the Camel Context was picked up and one route was started:

  INFO | Apache Camel 2.8.0-fuse-04-01 (CamelContext: camel) is starting  
  INFO | JMX enabled. Using ManagedManagementStrategy.  
  INFO | Found 3 packages with 14 @Converter classes to load  
  INFO | Loaded 163 core type converters (total 163 type converters)  
  INFO | Loaded 3 @Converter classes  
  INFO | Loaded additional 4 type converters (total 167 type converters) in 0.006 seconds  
  WARN | Broker localhost not started so using broker1 instead  
  INFO | Connector vm://localhost Started  
  INFO | Route: route1 started and consuming from: Endpoint[activemq://example.A]  
  INFO | Total 1 routes, of which 1 is started.  
  INFO | Apache Camel 2.8.0-fuse-04-01 (CamelContext: camel) started in 0.532 seconds  

So now that we have the master broker up and running with a Camel route deployed we are going to do the same to the slave broker, but this time we are going to edit the camel.xml slightly as follows to set the Camel Context id to camelBackup and most importantly we are going add an attribute autoStartup and set it to false to prevent the route from starting when the Camel Context is discovered:

 <camelContext id="camelBackup" xmlns="http://camel.apache.org/schema/spring" autoStartup="false">  

One last thing we need to do to the slave broker is configure the new service we created from the above code.  Copy the following configuration to your slave broker's activemq.xml:

 <services>  
   <bean xmlns="http://www.springframework.org/schema/beans" class="com.fusesource.example.CamelContextService">  
      <property name="camel" ref="camelBackup"/>  
   </bean>  
 </services>  

From the above configuration and sample code, you can see Spring is being used to inject the camel property into our Service class.  Additionally, notice the ref has been set to camelBackup which is the id we used for the CamelContext in the slave's camel.xml file.

Additionally, the broker has been configured as a slave so the broker will only be fully initialized when the master broker fails.  If you want more information on configuring ActiveMQ Master/Slave brokers, take at look at one of my early posts on Master/Slave Broker Configuration.

If you haven't done it all ready, copy the jar that was created from packaging up the code from the example above to the slave broker's lib directory.

Note:  In a production system you might want to configure this on both the master and slave broker to keep the configurations mirrored, as the routes will be started on the master as well once it is fully initialized. In this post I am keeping the required configuration to the slave broker just to demonstrate the behavior.

Test Run

Now that we have the code and configuration done, let's give this a test run by starting up the slave broker.

  INFO | Apache Camel 2.8.0-fuse-04-01 (CamelContext: camelBackup) is starting  
  INFO | JMX enabled. Using ManagedManagementStrategy.  
  INFO | Found 3 packages with 14 @Converter classes to load  
  INFO | Loaded 163 core type converters (total 163 type converters)  
  INFO | Loaded 3 @Converter classes  
  INFO | Loaded additional 4 type converters (total 167 type converters) in 0.007 seconds  
  INFO | Total 1 routes, of which 0 is started.  
  INFO | Apache Camel 2.8.0-fuse-04-01 (CamelContext: camelBackup) started in 0.512 seconds  

Looking at the output from the slave broker you can see the CamelContext is still started, however the route is not (remember we set autoStartup="false).  Now, in the terminal where the master broker is running, issue a kill to stop the broker.

If you have a look at the slave broker's output again, you can see the connector was started, openwire in this case, and the Camel route is now started.

  ERROR | Network connection between vm://broker2#0 and tcp://localhost/127.0.0.1:61616 shutdown: null  
 java.io.EOFException  
      at java.io.DataInputStream.readInt(DataInputStream.java:375)  
      at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:275)  
      at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:222)  
      at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:214)  
      at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:197)  
      at java.lang.Thread.run(Thread.java:680)  
  WARN | Master Failed - starting all connectors  
  INFO | Listening for connections at: tcp://macbookpro-251a.home:62616  
  INFO | Connector openwire Started  
  INFO | Apache Camel 2.8.0-fuse-04-01 (CamelContext: camelBackup) is starting  
  WARN | Broker localhost not started so using broker2 instead  
  INFO | Connector vm://localhost Started  
  INFO | Route: route1 started and consuming from: Endpoint[activemq://example.A]  
  INFO | Total 1 routes, of which 1 is started.  
  INFO | Apache Camel 2.8.0-fuse-04-01 (CamelContext: camelBackup) started in 0.036 seconds  

Conclusion

That's all there is to it, and I think you would agree starting a route from ActiveMQ slave is rather simple and easy to implement.  I'd also like to thank Hiram Chirino, an Apache ActiveMQ Founder, for pointing me in the direction of using the ActiveMQ Service Interface.

Friday, April 27, 2012

Camel: Working with Email Attachments

If your using the Camel-Mail component to handle some business logic that involves receiving email that contains attachments, then you might be interested in how these email attachments can be split into separate messages so they can be processed individually.  This post will demonstrate how this can be done using a Camel Expression and a JUnit test that demonstrates this behavior.

Recently, Claus Isben, an Apache Camel committer added some new documentation on the Apache Camel Mail component page that creates an Expression to split each attachment in an exchange into a separate message.  In addition he has included this code in Camel 2.10 and it is available as org.apache.camel.component.mail.SplitAttachmentExpression.  This class is using the ExpressionAdapter class which in Camel 2.9 is available as org.apache.camel.support.ExpressionAdpater and for Camel 2.8 and earlier is available as org.apache.camel.impl.ExpressionAdapter.

Let's have a look at the SplitAttachmentExpression:
 /**  
  * Licensed to the Apache Software Foundation (ASF) under one or more  
  * contributor license agreements. See the NOTICE file distributed with  
  * this work for additional information regarding copyright ownership.  
  * The ASF licenses this file to You under the Apache License, Version 2.0  
  * (the "License"); you may not use this file except in compliance with  
  * the License. You may obtain a copy of the License at  
  *  
  *   http://www.apache.org/licenses/LICENSE-2.0  
  *  
  * Unless required by applicable law or agreed to in writing, software  
  * distributed under the License is distributed on an "AS IS" BASIS,  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
  * See the License for the specific language governing permissions and  
  * limitations under the License.  
  */  
 package org.apache.camel.component.mail;  
 import java.util.ArrayList;  
 import java.util.List;  
 import java.util.Map;  
 import javax.activation.DataHandler;  
 import org.apache.camel.Exchange;  
 import org.apache.camel.Message;  
 import org.apache.camel.support.ExpressionAdapter;  
 /**  
  * A {@link org.apache.camel.Expression} which can be used to split a {@link MailMessage}  
  * per attachment. For example if a mail message has 5 attachments, then this  
  * expression will return a List<Message> that contains 5 {@link Message}  
  * and each have a single attachment from the source {@link MailMessage}.  
  */  
 public class SplitAttachmentsExpression extends ExpressionAdapter {  
   @Override  
   public Object evaluate(Exchange exchange) {  
     // must use getAttachments to ensure attachments is initial populated  
     if (exchange.getIn().getAttachments().isEmpty()) {  
       return null;  
     }  
     // we want to provide a list of messages with 1 attachment per mail  
     List<Message> answer = new ArrayList<Message>();  
     for (Map.Entry<String, DataHandler> entry : exchange.getIn().getAttachments().entrySet()) {  
       final Message copy = exchange.getIn().copy();  
       copy.getAttachments().clear();  
       copy.getAttachments().put(entry.getKey(), entry.getValue());  
       answer.add(copy);  
     }  
     return answer;  
   }  
 }  

From the above code you can see the Expression splits the exchange into separate messages, each containing one attachment, stored in a List object which is then returned to the Camel runtime which can then be used to iterate through the messages.  For more information on how the splitter EIP works see the Camel Splitter EIP documentation.

Now we can test this Expression by using the following JUnit test case and verify that the attachments are indeed split into separate messages for processing:

 /**  
  * Licensed to the Apache Software Foundation (ASF) under one or more  
  * contributor license agreements. See the NOTICE file distributed with  
  * this work for additional information regarding copyright ownership.  
  * The ASF licenses this file to You under the Apache License, Version 2.0  
  * (the "License"); you may not use this file except in compliance with  
  * the License. You may obtain a copy of the License at  
  *  
  *   http://www.apache.org/licenses/LICENSE-2.0  
  *  
  * Unless required by applicable law or agreed to in writing, software  
  * distributed under the License is distributed on an "AS IS" BASIS,  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
  * See the License for the specific language governing permissions and  
  * limitations under the License.  
  */  
 package com.fusesource.example;  
 import com.fusesource.example.expression.AttachmentsExpression;  
 import com.fusesource.example.processor.MyMailProcessor;  
 import org.apache.camel.Endpoint;  
 import org.apache.camel.Exchange;  
 import org.apache.camel.Message;  
 import org.apache.camel.Producer;  
 import org.apache.camel.builder.RouteBuilder;  
 import org.apache.camel.component.mock.MockEndpoint;  
 import org.apache.camel.test.junit4.CamelTestSupport;  
 import org.junit.Test;  
 import org.jvnet.mock_javamail.Mailbox;  
 import javax.activation.DataHandler;  
 import javax.activation.FileDataSource;  
 import java.util.Map;  
 /**  
  * Unit test for Camel attachments and Mail attachments.  
  */  
 public class MailAttachmentTest extends CamelTestSupport {  
   
   private String subject = "Test Camel Mail Route";  
   
   @Test  
   public void testSendAndReceiveMailWithAttachments() throws Exception {  
     
     // clear mailbox  
     Mailbox.clearAll();  
       
     // create an exchange with a normal body and attachment to be produced as email  
     Endpoint endpoint = context.getEndpoint("smtp://james@mymailserver.com?password=secret");  
     
     // create the exchange with the mail message that is multipart with a file and a Hello World text/plain message.  
     Exchange exchange = endpoint.createExchange();  
     Message in = exchange.getIn();  
     in.setBody("Hello World");  
     in.addAttachment("message1.xml", new DataHandler(new FileDataSource("src/data/message1.xml")));  
     in.addAttachment("message2.xml", new DataHandler(new FileDataSource("src/data/message2.xml")));  
     
     // create a producer that can produce the exchange (= send the mail)  
     Producer producer = endpoint.createProducer();  
     
     // start the producer  
     producer.start();  
     
     // and let it go (processes the exchange by sending the email)  
     producer.process(exchange);  
     
     // need some time for the mail to arrive on the inbox (consumed and sent to the mock)  
     Thread.sleep(5000);  
     
     // verify destination1  
     MockEndpoint destination1 = getMockEndpoint("mock:destination1");  
     destination1.expectedMessageCount(1);  
     Exchange destination1Exchange = destination1.assertExchangeReceived(0);  
     destination1.assertIsSatisfied();  
     
     // plain text  
     assertEquals("Hello World", destination1Exchange.getIn().getBody(String.class));  
     
     // attachment  
     Map destination1Attachments = destination1Exchange.getIn().getAttachments();  
     assertEquals(1, destination1Attachments.size());  
     DataHandler d1Attachment = destination1Attachments.get("message1.xml");  
     assertNotNull("The message1.xml should be there", d1Attachment);  
     assertEquals("application/octet-stream; name=message1.xml", d1Attachment.getContentType());  
     assertEquals("Handler name should be the file name", "message1.xml", d1Attachment.getName());  
     
     // verify destination2  
     MockEndpoint destination2 = getMockEndpoint("mock:destination2");  
     destination2.expectedMessageCount(1);  
     Exchange destination2Exchange = destination2.assertExchangeReceived(0);  
     destination2.assertIsSatisfied();  
     
     // plain text  
     assertEquals("Hello World", destination2Exchange.getIn().getBody(String.class));  
     
     // attachment  
     Map destination2Attachments = destination2Exchange.getIn().getAttachments();  
     assertEquals(1, destination2Attachments.size());  
     DataHandler d2Attachment = destination2Attachments.get("message2.xml");  
     assertNotNull("The message2.xml should be there", d2Attachment);  
     assertEquals("application/octet-stream; name=message2.xml", d2Attachment.getContentType());  
     assertEquals("Handler name should be the file name", "message2.xml", d2Attachment.getName());  
     
     producer.stop();  
   }  
   
   protected RouteBuilder createRouteBuilder() throws Exception {  
     return new RouteBuilder() {  
       public void configure() throws Exception {  
         context().setStreamCaching(true);  
         from("pop3://james@mymailserver.com?password=secret&consumer.delay=1000")  
             .setHeader("subject", constant(subject))  
             .split(new AttachmentsExpression())  
             .process(new MyMailProcessor())  
             .choice()  
               .when(header("attachmentName").isEqualTo("message1.xml"))  
                 .to("mock:destination1")  
               .otherwise()  
                 .to("mock:destination2")  
             .end();  
       }  
     };  
   }  
 }  

In the route you can see the spilt is using the AttachmentsExpression which was shown above.  In addition, I am using a simple processor to set the header of the exchange which contains the name of the attachment.  Then, using the CBR (content base router) the exchange will be routed to an endpoint based on the attached file.  The test case uses two mock endpoints which are used to validate the body of the message, number of attachments, attachment name, and attachment type.

The following code was used in the MyMailProcessor to set the header:

 package com.fusesource.example.processor;   
  import org.apache.camel.Exchange;   
  import org.apache.camel.Processor;   
  import org.apache.log4j.Logger;   
  import javax.activation.DataHandler;   
  import java.util.Map;   
  /**   
  * Created by IntelliJ IDEA.   
  * User: jsherman   
  * Date: 4/9/12   
  * Time: 11:39 AM   
  * To change this template use File | Settings | File Templates.   
  */   
  public class MyMailProcessor implements Processor {   
   
   private static final Logger LOG = Logger   
     .getLogger(MyMailProcessor.class.getName());  
   
   public void process(Exchange exchange) throws Exception {   
    
    LOG.debug("MyMailProcessor...");   
    String body = exchange.getIn().getBody(String.class);   
    
    Map<String, DataHandler> attachments = exchange.getIn().getAttachments();   
    if (attachments.size() > 0) {   
     for (String name : attachments.keySet()) {   
      exchange.getOut().setHeader("attachmentName", name);   
     }   
    }   
    
    // read the attachments from the in exchange putting them back on the out   
    exchange.getOut().setAttachments(attachments);   
    
    // resetting the body on out exchange   
    exchange.getOut().setBody(body);   
    LOG.debug("MyMailProcessor complete");   
   }   
  }  

Setting a up new maven project to test this out for yourself is very easy.  Simply create a new maven project using one of the available Camel maven archetypes, the camel-archetype-java should work just fine for this.  Once you have the project created you just need to copy the above classes into the project.

In addition to setting up the code you will also need to ensure the following dependencies are included in the project's pom.xml:

   <properties>  
    <camel-version>2.9.0</camel-version>  
   </properties>   
 ...   
   <dependency>  
    <groupId>org.apache.camel</groupId>  
    <artifactId>camel-mail</artifactId>  
    <version>${camel-version}</version>  
   </dependency>  
   <dependency>  
    <groupId>org.apache.camel</groupId>  
    <artifactId>camel-test</artifactId>  
    <scope>test</scope>  
    <version>${camel-version}</version>  
   </dependency>  
   <dependency>  
    <groupId>org.jvnet.mock-javamail</groupId>  
    <artifactId>mock-javamail</artifactId>  
    <version>1.7</version>  
    <exclusions>  
      <exclusion>  
        <groupId>javax.mail</groupId>  
        <artifactId>mail</artifactId>  
      </exclusion>  
    </exclusions>  
    <scope>test</scope>  
   </dependency>  

Note for the above test case I had my own class that implemented the SplitAttachmentExpression, as I was doing this before the class was added into the code base.  If you are using the latest 2.10 the SplitAttachmentExpression import should be changed to org.apache.camel.component.mail.SplitAttachmentExpression. If you are using an earlier version, simply create the class as I did using the code from above.