Riding The Camel With Groovy

Please see my project grootils on GitHub, where I collect such tools.

Apache Camel is a routing and mediation engine. I personally love the huge number of components it offers and how easy and productive it makes day-to-day work.

I’d like to show some basic things you can do with Camel using Groovy. Other readings can be found on the Articles on Apache Camel page.

To include Camel in your project, use Grape:

@Grab(group = "org.apache.camel", module = "camel-core",   version = "2.4.0")
@Grab(group = "org.apache.camel", module = "camel-groovy", version = "2.4.0")
@Grab(group = "org.apache.camel", module = "camel-xmpp",   version = "2.4.0")

Remember: when you use Grape with these libraries for the first time, startup may take a while because all required libraries are downloaded when you start your application.

I had a problem loading JiveSoftware’s Smack library automatically as a dependency and had to edit Ivy’s XML file:

$ cd /Users/rbe/.groovy/grapes
$ vi org.apache.camel/camel-xmpp/ivy-2.4.0.xml

Find the <dependencies> section and edit the lines for Smack to look like this:

<dependency org="jivesoftware" name="smack" rev="3.1.0" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)"/>
<dependency org="jivesoftware" name="smackx" rev="3.1.0" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)"/>

Or just download release 2.4.0, the current version as I write this post. The page for release 2.4.0 can be found here. The lib folder contains lots of libraries. For this post we need camel-core-2.4.0.jar, camel-groovy-2.4.0.jar and camel-xmpp-2.4.0.jar as well as the Smack library from JiveSoftware. For more information on Camel’s XMPP component go here.

We configure two routes:

  1. Watch a directory for files, filter them by extension, push them to a POGO aka bean and publish a message on Google Talk using XMPP
  2. Watch another directory for files and push them to a bean

In this example the routes are independent of one another. Imagine your application receives a file, processes it in some way and then puts it into the out folder.

To build Camel routes with Groovy, one way is to extend GroovyRouteBuilder and override its configure() method. Within that method you use a DSL to configure your routes. See Groovy Renderer User Guide.

Routes

Let’s start by defining the two routes:

class FileRouter extends org.apache.camel.language.groovy.GroovyRouteBuilder {

    /**
     * Define routes for Camel.
     */
    // configure() is public in RouteBuilder, so the override must not narrow its visibility
    void configure() {
        //
        // Route 1
        //
        // Watch directory for incoming file
        from("file:///tmp/in")
        // ...filter by its name
        .filter {
            // Ignore hidden dot-files and watch for PDFs
            !it.in.headers.CamelFileName.startsWith(".") && it.in.headers.CamelFileName.endsWith(".pdf")
        }
        // ...and push file to bean; the method must take a File argument
        .to("bean:camelProcessor?method=receive") // Should return text for XMPP
        // ...send message to Google Talk
        .to("xmpp://talk.google.com:5222/recipient@googlemail.com?serviceName=gmail.com&user=sender@googlemail.com&password=senderpassword")
        .to("bean:camelProcessor?method=afterXMPP")
        //
        // Route 2
        //
        // Process outgoing file
        from("file:///tmp/out")
        // ...and push file to bean; the method must take a File argument
        .to("bean:camelProcessor?method=send")
    }

}

The first route looks for new files in /tmp/in and filters them by name. Each file that passes the filter is handed as the argument to CamelProcessor#receive(File). This is explained below.
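The filter condition from route 1 is ordinary Groovy, so it can be tried without starting Camel. A minimal sketch, assuming a hypothetical closure named isIncomingPdf that takes the file name as a plain string instead of reading it from the CamelFileName header:

```groovy
// Hypothetical helper, not part of Camel: the same condition as in the
// filter closure of route 1, applied to a plain file name
def isIncomingPdf = { String name ->
    !name.startsWith(".") && name.endsWith(".pdf")
}

assert isIncomingPdf("invoice.pdf")
assert !isIncomingPdf(".invoice.pdf")   // hidden dot-file
assert !isIncomingPdf("invoice.txt")    // wrong extension
assert !isIncomingPdf("invoice.PDF")    // endsWith is case-sensitive
```

If upper-case extensions should pass too, comparing against name.toLowerCase() would be one option.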

Camel Headers

If you wonder which headers Camel provides while processing a file, just put

it.in.headers.each { println it }

within the filter closure and watch the output. Documentation is available at File Component; scroll down to the section called Message Headers.
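One thing to keep in mind when adding such a debug line: a Groovy closure returns the value of its last expression, so the dump should come before the filter condition. A small sketch, assuming a plain map as a stand-in for it.in.headers (the values are made up, and filterWithDump is a hypothetical name):

```groovy
// Stand-in for it.in.headers; the values are made up for illustration
def headers = [CamelFileName: "report.pdf", CamelFileNameOnly: "report.pdf"]

def filterWithDump = { Map h ->
    // Print every header as key=value first...
    h.each { println it }
    // ...and keep the condition as the last expression, because it is
    // the value the closure returns
    !h.CamelFileName.startsWith(".") && h.CamelFileName.endsWith(".pdf")
}

assert filterWithDump(headers)
```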

URIs

For URI options that control how Camel treats the files, see the documentation of the File Component, section URI options. Append them as in HTTP URLs: a question mark followed by variable=value pairs, like ?delete=true. An example that consumes files only once (and does not move them into a special .camel directory after processing) looks like:

from("file:///tmp/in?noop=true&idempotent=true")

I used this in a recent project where files had to stay where they were initially put. Detailed information about idempotency can be found in the same documentation, section Avoiding reading the same file more than once (idempotent consumer). Read it carefully and decide whether you need it or not.
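When a route needs several options, the URI can be assembled from a map instead of being typed by hand. A minimal sketch, assuming a hypothetical helper named endpointUri; it is not part of Camel and does no escaping of special characters:

```groovy
// Hypothetical helper, not part of Camel: append options to an endpoint URI
def endpointUri = { String base, Map options ->
    if (!options) {
        return base
    }
    // Turn each map entry into key=value and join the pairs with &
    base + "?" + options.collect { key, value -> "${key}=${value}" }.join("&")
}

def uri = endpointUri("file:///tmp/in", [noop: true, idempotent: true])
assert uri == "file:///tmp/in?noop=true&idempotent=true"
```

The result can be passed to from() like any hand-written URI string.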

Also read about locking; see the note in the section URI format:

Avoid reading files currently being written by another application

Beware the JDK File IO API is a bit limited in detecting whether another application is currently writing/copying a file. And the implementation can be different depending on OS platform as well. This could lead to Camel thinking the file is not locked by another process and starting to consume it. Therefore you have to do your own investigation into what suits your environment. To help with this, Camel provides different readLock options that you can use. See also the section Consuming files from folders where others drop files directly.

Beans

In the routes above I used a bean: URI, which simply routes the files to a method. The bean must be registered with Camel (see below) and the method name is given as a URI parameter: bean:camelProcessor?method=receive. These methods are called after Camel has consumed a file in that route.

You should add some processing logic there:

/**
 * Process files with Camel.
 */
@Singleton
class CamelProcessor {

    def receive(File file) {
        // Do something with the file
        println "I cannot believe it, I received a file: ${file}"
        // Return file name
        "We received ${file}"
    }

    def afterXMPP(Object obj) {
        println "afterXMPP: obj=${obj?.dump()}"
    }

    def send(File file) {
        // Do something
        println "Don't let me keep you: ${file}"
        // Return file
        file
    }

}

Take care: each method should return an object that the next step in the route expects. Imagine using more than one .to("bean:"): the return value of the first to() is passed to the second to() call. As shown here, XMPP needs a string as its argument, so receive returns text.
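To illustrate how a return value travels from one step to the next, here is a sketch without Camel. The closures receive, sendMessage and runSteps are hypothetical stand-ins. Camel itself may attempt a type conversion before giving up, so the plain closures below are stricter than a real route:

```groovy
// Hypothetical stand-ins for two steps of route 1; no Camel involved
def receive = { File file -> "We received ${file.name}".toString() }
def sendMessage = { String text -> text }   // accepts text only

// Pass the return value of each step to the next one
def runSteps = { input, List steps ->
    steps.inject(input) { body, step -> step(body) }
}

def result = runSteps(new File("/tmp/in/report.pdf"), [receive, sendMessage])
assert result == "We received report.pdf"

// Skipping receive hands a File to a step that expects text
try {
    runSteps(new File("/tmp/in/report.pdf"), [sendMessage])
    assert false : "expected a MissingMethodException"
} catch (MissingMethodException expected) {
    println "Wrong type for this step: ${expected.message}"
}
```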

Setup Camel

To start up Camel you need to create a Camel context and, in this example, register the bean used in the routes. I also registered a shutdown hook with the JVM to stop Camel when the JVM stops.

class Main {

    /**
     * Ride the Camel...
     */
    def static setupCamel() {
        // Set up a registry and register the bean used in the routes
        def registry = new org.apache.camel.impl.SimpleRegistry()
        registry.put("camelProcessor", CamelProcessor.instance)
        // Create the Camel context with that registry
        def camelCtx = new org.apache.camel.impl.DefaultCamelContext(registry)
        // Add routes and start Camel; this call doesn't block
        camelCtx.addRoutes(new FileRouter())
        camelCtx.start()
        // Stop Camel when the JVM is shut down
        Runtime.runtime.addShutdownHook({ ->
            camelCtx.stop()
        })
    }

    public static void main(String[] args) {
        // Camel
        setupCamel()
        // Now start your application
        // ...
    }

}

Add a thread

You can even start a thread to watch a received file. Just add this method to the class CamelProcessor. This example watches a file and prints a message when its last modification time changes. The thread terminates when the file no longer exists:

def watchFile(File file) {
    // Watch the file in a thread
    def r = { ->
        println "watchFileTime: start watching ${file}"
        // Remember last modified
        def lm = file.lastModified()
        // As long as the file exists...
        while (file.exists()) {
            // Check file modification time and compare to saved value
            if (file.lastModified() != lm) {
                // Remember last modified
                lm = file.lastModified()
                // Do something...
                println "${file} was changed"
            }
            // Hold on a second
            try { Thread.sleep(1 * 1000) } catch (e) {}
        }
        println "watchFileTime: stop watching ${file}"
    } as java.lang.Runnable
    // Create and start thread
    new java.lang.Thread(r).start()
    // Return file
    file
}

Do not forget to integrate this into a Camel route by calling the method watchFile when a file is received:

.to("bean:camelProcessor?method=watchFile")

HTH.
