Tuesday, January 6, 2015

JBoss Fuse - Running Cron Jobs with Camel, uploading files using SFTP. Part one

Back when I was a developer at a finance company, we had hundreds of nightly jobs, plus weekly and even yearly ones. These jobs often required a lot of integration between systems and could be very hard to manage. And what happens if a job is CPU-intensive and needs to scale out?
The Quartz2 component in Camel provides scheduled delivery of messages, which can trigger business logic or interaction with other systems. Here I am going to show you how to use the Quartz2 component to run a load-balanced job that uploads a file to an FTP server.

If you have ever played with Camel, you'll know there is a component called Timer, which generates message exchanges when a timer fires. So why use Quartz instead of Timer?
This is explained in Camel's documentation:

Timer is a simple, non persistence timer using the JDK's in built timer mechanism.
Quartz uses the Quartz library which uses a database to store timer events and supports distributed timers and cron notation.

That means Timer is more of a fire-and-forget mechanism, whereas Quartz provides a more complete scheduling solution: it lets you configure clustering for load balancing and failover.
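To make the "non-persistent" point concrete, here is a minimal plain-JDK sketch (not Camel code) of the java.util.Timer mechanism the Timer component is described as using. The class and method names are my own, chosen for illustration. The schedule lives only in the memory of one JVM, so nothing survives a restart and a second node knows nothing about it.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, for illustration only.
class JdkTimerSketch {

    // Fires an in-memory timer `ticks` times, `periodMs` apart, and returns how many times it fired.
    static int runTicks(final int ticks, long periodMs) {
        final AtomicInteger fired = new AtomicInteger();
        final CountDownLatch done = new CountDownLatch(ticks);

        // Daemon timer thread: the schedule exists only inside this JVM.
        Timer timer = new Timer("sketch-timer", true);
        timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                // Count only the requested number of ticks, even if the timer fires once more
                // before it is cancelled.
                if (fired.get() < ticks) {
                    fired.incrementAndGet();
                    done.countDown();
                }
            }
        }, 0L, periodMs);

        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            // Once cancelled (or once the JVM stops), the schedule is gone for good.
            timer.cancel();
        }
        return fired.get();
    }
}
```

Calling JdkTimerSketch.runTicks(3, 10) fires three times and then forgets everything, which is exactly the gap the Quartz JDBC job store fills.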

This demo project shows how to set up a cron job using the Quartz2 component, with 2 nodes running for load balancing, that uploads a file to an FTP server through SFTP. It also uses the FTP component to poll the server and print out the content of the file. I will also add a Timer route just to compare. :)


You can find all the camel component documentation here.

First, create a Fuse Blueprint project.
You need to add 2 Camel dependencies, plus the PostgreSQL JDBC driver, to your pom.xml:

 
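<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-quartz2</artifactId>
    <version>2.12.0.redhat-610379</version>
</dependency>
<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-ftp</artifactId>
    <version>2.12.0.redhat-610379</version>
</dependency>
<dependency>
    <groupId>postgresql</groupId>
    <artifactId>postgresql</artifactId>
    <version>9.0-801.jdbc4</version>
</dependency>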
  

Now we come to the core of this demo: setting up Quartz. If you are already familiar with Quartz, it is exactly the same here. There are 2 main kinds of job store in Quartz, RAM and JDBC. The RAM job store is more like Timer and cannot be clustered, so we need to set up the JDBC job store to give all nodes a central place to get job information. We also enable clustering by setting the "org.quartz.jobStore.isClustered" property to "true". I am using a PostgreSQL database for this, but you can change it to whatever database you want.

Download the Quartz distribution and find the database setup scripts under docs/dbTables. Pick the SQL file for your database; I will use tables_postgres.sql.


Create a new database in PostgreSQL and run the script.


In your project, under src/main/resources/, create a folder org/quartz and a file named "quartz.properties".

# Main Quartz configuration
org.quartz.scheduler.skipUpdateCheck = true
org.quartz.scheduler.instanceName = DatabaseClusteredScheduler
org.quartz.scheduler.instanceId = AUTO
org.quartz.scheduler.jobFactory.class = org.quartz.simpl.SimpleJobFactory
org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.PostgreSQLDelegate
org.quartz.jobStore.dataSource = quartzDataSource
org.quartz.jobStore.tablePrefix = QRTZ_
org.quartz.jobStore.isClustered = true
org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount = 10
org.quartz.jobStore.clusterCheckinInterval = 20000


# JobStore: JDBC jobStoreTX
org.quartz.dataSource.quartzDataSource.driver = org.postgresql.Driver
org.quartz.dataSource.quartzDataSource.URL = jdbc:postgresql://localhost:5432/quartz2
org.quartz.dataSource.quartzDataSource.user = quartz2
org.quartz.dataSource.quartzDataSource.password = quartz2123
org.quartz.dataSource.quartzDataSource.maxConnections = 10

Note that the properties file sets isClustered = true, because we want our 2 nodes to load balance correctly. As in plain Quartz, there are CronTrigger and SimpleTrigger; since SimpleTrigger is similar to Timer, I will use CronTrigger.
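Since a single wrong line in quartz.properties quietly turns the cluster back into independent schedulers, a small sanity check can help before deploying. The sketch below uses only java.util.Properties; the class name and the choice of three checks (clustered flag, AUTO instance id so each node gets its own id, a JDBC job store class) are my own assumptions about what is worth verifying, not something Quartz ships.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical helper, for illustration only.
class QuartzConfigCheck {

    // Returns true when the given properties text looks like a clustered JDBC job store setup.
    static boolean isClusterReady(String propertiesText) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new IllegalArgumentException("Could not read properties text", e);
        }

        // Clustering must be switched on explicitly.
        boolean clustered = Boolean.parseBoolean(
                p.getProperty("org.quartz.jobStore.isClustered", "false").trim());

        // AUTO lets every node generate its own instance id.
        boolean autoId = "AUTO".equals(
                p.getProperty("org.quartz.scheduler.instanceId", "").trim());

        // The RAM job store cannot be shared, so expect a JDBC job store class.
        boolean jdbcStore = p.getProperty("org.quartz.jobStore.class", "").trim()
                .startsWith("org.quartz.impl.jdbcjobstore.");

        return clustered && autoId && jdbcStore;
    }
}
```

Feeding it the contents of the file above should return true; flipping isClustered to false should return false.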

The URI formats are as follows:

quartz2://timerName?options
quartz2://groupName/timerName?options
quartz2://groupName/timerName?cron=expression
quartz2://timerName?cron=expression

The cron expression documentation can be found here.

quartz2://myGroup/myTimer?cron=0+0/1+09-23+?+*+MON-SAT

My expression says: run the job every minute, from 9:00 to 23:59, Monday to Saturday.
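To read the expression more easily, here is a small plain-Java sketch that labels the six cron fields and builds the endpoint URI, writing the spaces of the cron expression as "+" the way the URI above does. The class and method names are hypothetical; only the URI layout and the field order (seconds, minutes, hours, day of month, month, day of week) come from Quartz and the Camel component.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, for illustration only.
class QuartzUriSketch {

    // Builds quartz2://group/timer?cron=... with the spaces of the expression written as '+'.
    static String cronUri(String group, String timer, String cron) {
        return "quartz2://" + group + "/" + timer + "?cron=" + cron.trim().replace(' ', '+');
    }

    // Labels the fields of a Quartz cron expression in their fixed order.
    static Map<String, String> fields(String cron) {
        String[] names = {"seconds", "minutes", "hours", "dayOfMonth", "month", "dayOfWeek"};
        String[] parts = cron.trim().split("\\s+");
        Map<String, String> labelled = new LinkedHashMap<String, String>();
        for (int i = 0; i < names.length && i < parts.length; i++) {
            labelled.put(names[i], parts[i]);
        }
        return labelled;
    }
}
```

For "0 0/1 09-23 ? * MON-SAT", fields(...) gives seconds=0, minutes=0/1 (every minute starting at 0), hours=09-23, dayOfMonth=?, month=* and dayOfWeek=MON-SAT.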
With all of that in place, the first route looks like this:


        
        
            MyJob-${date:now:yyyyMMdd HH:mm:ss}.txt
        
        
            ${date:now:yyyyMMdd HH:mm:ss} Hello THis is the content of text 
        
        
    

You can also find the FTP component documentation here; the URI format is very simple too:

ftp://[username@]hostname[:port]/directoryname[?options]
sftp://[username@]hostname[:port]/directoryname[?options]
ftps://[username@]hostname[:port]/directoryname[?options]

In most cases you just fill in the details you have. The second route is very similar: it polls the FTP server and prints out the content.

        
        

I've also added a Timer route that uploads a file every 30 seconds.



        
            MyTimer-${date:now:yyyyMMdd HH:mm:ss}.txt
        
        
            ${date:now:yyyyMMdd HH:mm:ss} Hello This is the content from Timer 
        
    

This video shows how to write this simple program:



I will deploy this project onto Fabric in part 2 and show the load balancing of the clustered job.



Monday, December 29, 2014

JBoss Fuse - Implementing WebService with Camel using CXF SOAP and Restful with CXFRS. Part Two

Continuing from the last post, this time I am going to add a RESTful web service endpoint to our insurance application and show you how to use the CXF producer. :)


Similar to the CXF web service, we need to create a resource file, but this time it is a Java class instead of an interface. Within the resource class we have to specify 2 things: the context path, and the URL path of each function of the RESTful service. In the Java file below, you will see that I have declared the "/status" context path for our status service, "/custId/{id}" for getting the status, and "/restcancel/{claimNo}" for canceling a claim through the RESTful API.


package org.blogdemo.claimdemo;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

@Path("/status")
public class StatusService {
	
	@GET
	@Path("/custId/{id}")
	@Produces(MediaType.APPLICATION_JSON)
	public ClaimStatus status(@PathParam("id") String custId){
		return null;
	}

	@GET
	@Path("/restcancel/{claimNo}")
	@Produces(MediaType.APPLICATION_JSON)
	public ClaimStatus restCancel(@PathParam("claimNo") String claimNo){
		return null;
	}
}


After creating the resource file, we need to register the RESTful endpoint in the Camel context. It's very similar to the SOAP one: set the address and the service class, which points to the resource class you created. That's it.

<cxf:rsServer id="statusEndpoint" address="http://localhost:9191/cxf/status" serviceClass="org.blogdemo.claimdemo.StatusService" />

That's all we have to do to create the RESTful endpoint. Then we add a method to the processor for the status check.

What about the REST cancel function? Remember we already have a web service that does the cancel operation. What if we want to reuse it? It's a piece of cake. The CXF producer can send data in several formats: PAYLOAD, MESSAGE and POJO. You know how much I love POJOs, so yes, I am going to use POJO for sending the message. The only difference between them is the format of the data sent through the endpoint: with POJO you pass Java objects, with PAYLOAD you pass the XML content of the SOAP body, and with MESSAGE you pass the raw message as it travels over the transport.

For the CXF producer with the POJO data format, we need to prepare:
1. the operationName header
2. the body content

The operationName header names the service operation you want to call, and with the POJO data format the parameters must be put into a List. Within the processor, you will see me placing the content into an ArrayList.
	   public ClaimStatus status(String id) throws Exception {
	       // get the id of the input
	   	//ClaimInput input = exchange.getIn().getBody(ClaimInput.class);

	   	System.out.println(id);
	   	System.out.println("ClaimInput :["+ClaimInput.class.getPackage().getName()+"]");
	   	
	       // set reply including the id
	   	ClaimStatus status = new ClaimStatus();
	   	status.setCustomerID(id);
	   	status.setPolno("A123456789");
	   	status.setClaimNo("34567789");
	   	status.setStatus("OK");
	   
	   	return status;
	   }
	   
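	   // With the POJO data format, CXF expects the operation parameters wrapped in a List
	   public List<Object> prepareList(String polno){
		   final List<Object> params = new ArrayList<Object>();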
		   params.add(polno);
		   
		   return params;
	   }

This is what my Camel route looks like. This time I use content-based routing to direct each request to the desired processing bean and method.

    
        
        
        
            
                ${header.operationName} == "status"
                
            
            
                ${header.operationName} == "restCancel"
                
                    cancel
                
                
                
            
        
        
            
        
    

Lastly, I will deploy the application onto Fuse Fabric. Don't forget to set up your feature list.
You will see the application route running here.


And the registry can also be found here.

Video demo part Two.

Code Repo : https://github.com/weimeilin79/claim-cxf-cxfrs

Hope you enjoy my CXF insurance demo.

Thursday, December 25, 2014

JBoss Fuse - Implementing WebService with Camel using CXF SOAP and Restful with CXFRS. Part One

A friend of mine in Japan asked if I could do a simple demo on Camel CXF, so here it is :).
This demo is a simple online insurance claim application with status updates. I will show you how to use Camel CXF to quickly set up a web service, and also how to set up RESTful web services in a few very easy steps.

In this demo we are going to set up a web service that takes in a customer's insurance claim application and also provides a cancel function in case they change their mind.

One of the reasons I love Camel so much is that I don't need to deal with different data formats, like XML or JSON. I mean, they are OK, but I like POJOs more: as a Java developer I find them more meaningful and much easier to play with. So now we are going to define the POJOs that we will exchange in this demo.

These two POJOs contain the application information required (ClaimInput) and the result of the application submission (ClaimOutput).
package org.blogdemo.claimdemo;

import java.util.Date;

public class ClaimInput {
	
	String customerName = "";
	String customerId="";
	Date claimDate= null;
	String contactPhone = "";
	String email = "";
	String polno = "";
	int applyItem = 0;
	int claimType = 0;
	
	public String getCustomerName() {
		return customerName;
	}
	public void setCustomerName(String customerName) {
		this.customerName = customerName;
	}
	public String getCustomerId() {
		return customerId;
	}
	public void setCustomerId(String customerId) {
		this.customerId = customerId;
	}
	public Date getClaimDate() {
		return claimDate;
	}
	public void setClaimDate(Date claimDate) {
		this.claimDate = claimDate;
	}
	public String getContactPhone() {
		return contactPhone;
	}
	public void setContactPhone(String contactPhone) {
		this.contactPhone = contactPhone;
	}
	public String getEmail() {
		return email;
	}
	public void setEmail(String email) {
		this.email = email;
	}
	public String getPolno() {
		return polno;
	}
	public void setPolno(String polno) {
		this.polno = polno;
	}
	public int getApplyItem() {
		return applyItem;
	}
	public void setApplyItem(int applyItem) {
		this.applyItem = applyItem;
	}
	public int getClaimType() {
		return claimType;
	}
	public void setClaimType(int claimType) {
		this.claimType = claimType;
	}
	
	

}


package org.blogdemo.claimdemo;

public class ClaimOutput {

	String customerName = "";
	String claimNo = "";
	String status = "";
	
	public String getCustomerName() {
		return customerName;
	}
	public void setCustomerName(String customerName) {
		this.customerName = customerName;
	}
	public String getClaimNo() {
		return claimNo;
	}
	public void setClaimNo(String claimNo) {
		this.claimNo = claimNo;
	}
	public String getStatus() {
		return status;
	}
	public void setStatus(String status) {
		this.status = status;
	}
	
	
	
}


A third POJO, ClaimStatus, holds the status returned when a cancel request is submitted.
package org.blogdemo.claimdemo;

public class ClaimStatus {
	String status = "";
	String customerID = "";
	String polno ="";
	String claimNo="";
	public String getStatus() {
		return status;
	}
	public void setStatus(String status) {
		this.status = status;
	}
	public String getCustomerID() {
		return customerID;
	}
	public void setCustomerID(String customerID) {
		this.customerID = customerID;
	}
	public String getPolno() {
		return polno;
	}
	public void setPolno(String polno) {
		this.polno = polno;
	}
	public String getClaimNo() {
		return claimNo;
	}
	public void setClaimNo(String claimNo) {
		this.claimNo = claimNo;
	}
	
	
}


Now we have the hard part done. (Trust me, defining the model is always the hardest.) We can now start creating the web service in our Camel route.

To define a web service there are 2 main steps:
1. Define an interface for the web service
2. Set up the endpoint in the Camel context

Once you have those two ready, you are pretty much done. So we will create a "ClaimService" interface that specifies the two operations in our insurance claim application, apply and cancel. Have a look at the code: the apply method takes a ClaimInput as its parameter and returns a ClaimOutput, and the cancel method takes the claim number (String) and returns a ClaimStatus.


package org.blogdemo.claimdemo;

public interface ClaimService {
	
	public ClaimOutput apply(ClaimInput input);
	public ClaimStatus cancel(String claimNo);

}

Then we go back to the Camel context XML and register our web service interface using the camel-cxf component. All you have to do is specify the address of the web service and point the endpoint to the service class, which is the interface we just created.

<cxf:cxfEndpoint id="claimEndpoint" address="http://localhost:9191/cxf/claim" serviceClass="org.blogdemo.claimdemo.ClaimService" />

Lastly, the business processing part. Here I am just going to return some constant values, but in the real world this is where you place the actual processing logic.

package org.blogdemo.claimdemo;

public class ClaimProcessor {
	public ClaimOutput process(ClaimInput input) throws Exception {
        // get the id of the input
    	//ClaimInput input = exchange.getIn().getBody(ClaimInput.class);

    	System.out.println(input);
    	
    	// set reply including the id
        ClaimOutput output = new ClaimOutput();
        output.setClaimNo("A00099484");
        output.setCustomerName(input.getCustomerName());
        output.setStatus("DONE");
        //exchange.getOut().setBody(output);
        return output;
    }
	
	 public ClaimStatus cancel(String claimNo) throws Exception {
	      
			ClaimStatus status = new ClaimStatus();
		   	status.setClaimNo(claimNo);
		   	status.setStatus("OK");
		   	
		   	return status;
	   }
}


Then we put it all together. Here is what the Camel route looks like. Notice that we use the operationName header to determine which process to pass the message on to.





  
  
  
  
    
        
        
        
            direct:${header.operationName}
        
    
    
        
        
    
    
        
        
    
 



And don't forget to update the pom.xml with all the dependencies needed for this demo.
  
    
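    <dependency>
      <groupId>org.ow2.asm</groupId>
      <artifactId>asm-all</artifactId>
      <version>4.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-core</artifactId>
      <version>2.12.0.redhat-610379</version>
    </dependency>
    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-blueprint</artifactId>
      <version>2.12.0.redhat-610379</version>
    </dependency>
    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-jaxb</artifactId>
      <version>2.12.0.redhat-610379</version>
    </dependency>
    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-jackson</artifactId>
      <version>2.12.0.redhat-610379</version>
    </dependency>
    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-http4</artifactId>
      <version>2.12.0.redhat-610379</version>
    </dependency>
    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-cxf</artifactId>
      <version>2.12.0.redhat-610379</version>
    </dependency>
    <dependency>
      <groupId>org.apache.cxf</groupId>
      <artifactId>cxf-rt-frontend-jaxrs</artifactId>
      <version>2.7.0.redhat-610379</version>
    </dependency>
    <dependency>
      <groupId>org.apache.cxf</groupId>
      <artifactId>cxf-rt-transports-http-jetty</artifactId>
      <version>2.7.0.redhat-610379</version>
    </dependency>

    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.7.5</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.5</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>jcl-over-slf4j</artifactId>
      <version>1.7.5</version>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
    </dependency>

    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-test-blueprint</artifactId>
      <version>2.12.0.redhat-610379</version>
      <scope>test</scope>
    </dependency>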
    
  

Now start the route by running camel:run in Maven. The Registry URL will be http://localhost:9191/cxf/claim for now.


To try the application, use the SoapUI tool: create a new project with the WSDL address http://localhost:9191/cxf/claim?wsdl, and then we can play with the web service we created!

Here is the video:



There is another form of web service, the RESTful web service. In the next part of the demo, I will show you how to build one and how to create a CXF producer in Camel.

Thanks! 

Saturday, December 20, 2014

JBoss Fuse - File Connector with Split Pattern and JDBC connector with Error Handling - Part Two

This is the second part of the demo, "JDBC connector with Error Handling"; please take a look at the previous blog post first. You can also find the finished code base here.

In the XML file, you'll find a VIP status. It determines the processing fee we charge per transaction: $1 for DIAMOND class VIPs, $2 for GOLD and $3 for other accounts.

To use the JDBC connector, we must first create the datasource settings, either within the Camel context XML or in Fabric8. To make testing easy, we will define the datasource directly in the Camel context XML.

XML
 	
    	
    	
    	
  		
  	 
  
  	
		
  	
  
	
		
		
	
Here you can see I use the H2 in-memory database, because it's easy to set up without installing a database. You can always change it to the database of your choice: just replace the driverClassName and url, along with the ID and password used to access the database. It's also a good idea to include a transaction manager so we can set the transaction policy.

Before calling the JDBC component, jdbc:dataSourcePS, remember to set the SQL to execute as the message body.

<setBody>
  <simple>SELECT balance from customerdemo where customerID = '${header.CustId}';</simple>
</setBody>
<to uri="jdbc:dataSourcePS?resetAutoCommit=true"/>

We might also want to catch errors and do something about them. As in Java programming, we can specify exception policies to intercept and handle exceptions, using onException. After catching the exception, we can either process it or redeliver the message. Here is the example we use in the demo.

    
        java.lang.Exception
        
        
            true
        
        
    

In the last demo, we processed the XML from different bank branches and split it into individual transactions, grouped into 2 types: cash and transfer. There are no fees when the transaction is in cash, but for a transfer the VIP status determines the processing fee we charge: $1 for DIAMOND class VIPs, $2 for GOLD and $3 for other accounts.

To update the balance in the database, we first select the current balance to make sure there is enough to deduct; otherwise we throw a "not enough balance" exception, which we handle by logging it.
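The fee rule and the balance check can be sketched in plain Java, independent of Camel and JDBC. This is only an illustration of the rule as described in the text: the class and method names are hypothetical, and the amount is assumed to be negative when money leaves the account.

```java
// Hypothetical helper, for illustration only.
class FeeSketch {
    static final int DIAMOND_FEE = 1;
    static final int GOLD_FEE = 2;
    static final int NORMAL_FEE = 3;

    // Processing fee per transfer, decided by the customer's VIP status.
    static int fee(String vipStatus) {
        if ("Diamond".equalsIgnoreCase(vipStatus)) {
            return DIAMOND_FEE;
        }
        if ("Gold".equalsIgnoreCase(vipStatus)) {
            return GOLD_FEE;
        }
        return NORMAL_FEE;
    }

    // New balance after a transfer: apply the (signed) amount, deduct the fee,
    // and refuse the operation when the account would go below zero.
    static int balanceAfterTransfer(int balance, int amount, String vipStatus) {
        int newBalance = balance + amount - fee(vipStatus);
        if (newBalance < 0) {
            throw new IllegalStateException("NOT ENOUGH BALANCE");
        }
        return newBalance;
    }

    // Cash transactions carry no fee, but the balance check still applies.
    static int balanceAfterCash(int balance, int amount) {
        int newBalance = balance + amount;
        if (newBalance < 0) {
            throw new IllegalStateException("NOT ENOUGH BALANCE");
        }
        return newBalance;
    }
}
```

For example, a Gold customer with a balance of 500 who transfers 20 ends up with 500 - 20 - 2 = 478, while the same transfer from an account holding 10 is rejected.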





Carrying on from our last demo, we are going to extend the two Camel routes.
i) processCash





    
        
        
            SELECT balance from customerdemo where customerID = '${header.CustId}';
        
        
        
        
            UPDATE customerdemo SET balance = ${body} where customerID = '${header.CustId}';
        
        
    

Within the route you will see a Java bean; it holds the actual business logic that adds and deducts money from each account.


package org.blogdemo.bankdemo;

import java.util.ArrayList;
import java.util.HashMap;

import org.apache.camel.Exchange;

public class BankBean {
	int countError=0;
	
	public static final int DIAMOND_FEE = 1;
	public static final int GOLD_FEE = 2;
	public static final int NORMAL_FEE = 3;
	
	public void transfer(Exchange oldExchange, Exchange newExchange)throws Exception{
		Integer amtDeduct = ((Integer)newExchange.getIn().getHeader("amt",Integer.class));
		newExchange.getIn().setHeader("amt", amtDeduct * -1);
	}
	
	public void doBalance(Exchange oldExchange, Exchange newExchange)throws Exception{
		
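		// The JDBC component returns the result set as a list of rows (column name -> value)
		@SuppressWarnings("unchecked")
		ArrayList<HashMap<String, Object>> balanceList = (ArrayList<HashMap<String, Object>>) newExchange.getIn().getBody(ArrayList.class);
		HashMap<String, Object> result = balanceList.get(0);
		Integer newBalance = ((Integer)result.get("BALANCE")) +((Integer)newExchange.getIn().getHeader("amt",Integer.class));
		
		// NOTE: as posted, the VIP check reads the "amt" header, so neither "Diamond" nor "Gold"
		// ever matches and NORMAL_FEE is always charged; it should read the header that carries the VIP status.
		if("Diamond".equalsIgnoreCase(newExchange.getIn().getHeader("amt",String.class))){
			newBalance -= DIAMOND_FEE;
		}else if("Gold".equalsIgnoreCase(newExchange.getIn().getHeader("amt",String.class))){
			newBalance -= GOLD_FEE;
		}else{
			newBalance -= NORMAL_FEE;
		}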
		
		if(newBalance < 0){
			throw new Exception("NOT ENOUGH BALANCE");
		}
		
		newExchange.getIn().setBody(newBalance);
		System.out.println( "--->newBalance :["+newBalance+"]" );
	} 
	
	public void doBalanceWithoutFee(Exchange oldExchange, Exchange newExchange)throws Exception{
		
		@SuppressWarnings("unchecked")
		ArrayList<HashMap<String, Object>> balanceList = (ArrayList<HashMap<String, Object>>) newExchange.getIn().getBody(ArrayList.class);
		HashMap<String, Object> result = balanceList.get(0);
		Integer newBalance = ((Integer)result.get("BALANCE")) +((Integer)newExchange.getIn().getHeader("amt",Integer.class));
		
		
		if(newBalance < 0){
			throw new Exception("NOT ENOUGH BALANCE");
		}
		
		newExchange.getIn().setBody(newBalance);
		System.out.println( "--->newBalance :["+newBalance+"] countError:["+countError+"]" );
		
	} 	
	

}


ii) doTransfer







This route is very similar to the last one, but since a transfer happens between 2 accounts, there is more select and update work to be done. Within the route you will again see a Java bean; it holds the business logic that calculates how much to add or deduct and determines the fee the bank charges.


    
        
        
            SELECT balance from customerdemo where customerID = '${header.CustId}';
        
        
        
        
        
            UPDATE customerdemo SET balance = ${body} where customerID = '${header.CustId}';
        
        
            SELECT balance from customerdemo where customerID = '${header.receiverId}';
        
        
        
        
        
            ${header.transfersql}  UPDATE customerdemo SET balance = ${body} where customerID = '${header.receiverId}';
        
        
    

Lastly, because we are using in-memory tables, some preparation needs to be done beforehand. Here is a very simple setup that creates the table and accounts in the database, plus a timer that logs all the accounts in the database.


    
        
        
            CREATE TABLE customerdemo (customerID character varying(10) NOT NULL,vipStatus character varying(10) NOT NULL ,  balance integer NOT NULL);
        
        
    
    
        
        
            INSERT INTO customerdemo (customerID,vipStatus,balance) VALUES ('A01','Diamond',1000);
            INSERT INTO customerdemo (customerID,vipStatus,balance) VALUES ('A02','Gold',500); 
        
        
    
    
        
        
            select * from customerdemo
        
        
        
    


Here is the follow-up video; please take a look at part 1 if you have not seen it yet.


We can also deploy this example onto Fuse Fabric: go to my code repo, find the installation script, and run init.sh.

To run the demo:
1. In a browser, go to http://localhost:8181 and log in with the ID/PWD admin/admin.
2. Under Runtime you will see the list of containers; click the small icon on the right-hand side of the testcon container.
3. Inside the container, under the Camel tab, you will see the list of routes we have.
4. Click Endpoint on the left-hand side, choose the file endpoint, and send the XML.
5. You will see the transfer result in the log.

Enjoy! 


Monday, December 1, 2014

JBoss Fuse - File Connector with Split Pattern and JDBC connector with Error Handling - Part One

This demo is slightly longer than the ones I have done before, so I will break it into 2 parts, keep each section as simple as possible, and then combine the 2 simple sections into a bigger demo. Along the way I hope you will find that it's very simple to integrate and process data with JBoss Fuse.

Part One
=========

  • File connector
  • Split Pattern


We are going to take in XML files from different branches from a directory. Each file contains cash deposit, cash withdrawal and transfer data. Depending on the type of transaction, we split the XML file into sections and send them to different services for further processing.

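The format of the XML looks like this.

<Bank>
 <Transaction type="Cash">
  <CustId>A02</CustId>
  <VipStatus>Gold</VipStatus>
  <Detail>
   <amount>-20</amount>
  </Detail>
 </Transaction>
 <Transaction type="Transfer">
  <CustId>A02</CustId>
  <VipStatus>Gold</VipStatus>
  <Detail>
   <CustId>A01</CustId>
   <amount>20</amount>
  </Detail>
 </Transaction>
</Bank>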




First create a route with a file consumer endpoint that reads XML files from a directory. There are many options, such as readLock (locks the file while reading it) and recursive (looks for files in all sub-directories as well); for more options please read the documentation. After retrieving the XML file, we divide it into parts using the Splitter pattern in Camel. The Splitter from the EIP patterns allows you to split a message into a number of pieces and process them individually.


For the split, we need to specify how the message is to be separated. In our case, we use XPath to indicate how the XML file is divided. (XPath is a language for addressing parts of an XML document.) In our XML file there are multiple transactions under the root element Bank, so the XPath is "//Bank/Transaction"; we then send each individual XML part to another route for processing.
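To see what the split expression selects without starting Camel, the same XPath can be evaluated with the XML APIs that ship with the JDK. The sketch below is not how Camel implements the Splitter; it only shows which nodes "//Bank/Transaction" matches. The class and method names are hypothetical, and the "type" attribute is taken from the XPath predicates used later in the route.

```java
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

// Hypothetical helper, for illustration only.
class SplitSketch {

    // Returns the "type" attribute of every element matched by the split expression,
    // one entry per piece the Splitter would produce.
    static List<String> splitTypes(String xml) {
        try {
            Document doc = DocumentBuilderFactory.newInstance()
                    .newDocumentBuilder()
                    .parse(new InputSource(new StringReader(xml)));

            XPath xpath = XPathFactory.newInstance().newXPath();
            NodeList parts = (NodeList) xpath.evaluate(
                    "//Bank/Transaction", doc, XPathConstants.NODESET);

            List<String> types = new ArrayList<String>();
            for (int i = 0; i < parts.getLength(); i++) {
                // Each match is a Transaction element; read its type attribute.
                types.add(((Element) parts.item(i)).getAttribute("type"));
            }
            return types;
        } catch (Exception e) {
            throw new IllegalArgumentException("Could not split the XML", e);
        }
    }
}
```

Passing in a Bank document with one Cash and one Transfer transaction yields two pieces, in document order.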

    
        
        
            //Bank/Transaction
            
        
    

For each individual transaction, we extract the data using XPath and set it in headers, and we send the different types of transaction, Cash and Transfer, to different routes for further processing.
    
        
        
            /Transaction/CustId/text()
        
        
            /Transaction/VipStatus/text()
        
        
            /Transaction/Detail/amount/text()
        
        
            ${body}
        
        
            
                /Transaction[@type='Cash']
                
            
            
                /Transaction[@type='Transfer']
                
                    /Transaction/Detail/CustId/text()
                
                
            
        
    

Lastly, the 2 processing routes export each transaction into a single XML file, so we need the file component again. This time the file component is a producer; producer options include fileExist, which controls what happens when the file already exists (for example, overriding it).



    
        
        
    
    
        
        
    

If we test by placing the branch-xxx.xml file into the inputdir, the route processes the XML and splits the file into small pieces.

Here is the video that shows you how to create the project. 
Demo code will be out with part 2. 



Tuesday, November 25, 2014

JBoss Fuse - Connecting to Twitter and the Aggregator pattern

If you think connecting to Facebook is easy with Camel, try connecting to Twitter! It's even easier!
Before connecting to Twitter, make sure you have registered a Twitter developer account, then go to https://apps.twitter.com/ and create a new Twitter app.

There are a few mandatory parameters for authentication if you want to play with Twitter:
  • consumerKey - The consumer key
  • consumerSecret - The consumer secret 
  • accessToken - The access token
  • accessTokenSecret - The access token secret

The Twitter component has a consumer, which mostly handles polling functionality, such as searching content and streaming tweets from Twitter:

  • timeline/home - reads or polls tweets from the user's home timeline
  • search - searches for specific keywords
  • streaming/filter - gets a stream of data from Twitter

The producer, on the other hand, handles tweeting and calling the Twitter API; it can also do searching and timeline polling:

  • directmessage - gets the message 
  • search - search for keywords
  • timeline/user - polling from users
For more details, please visit the product documentation.

Note: the Twitter API is currently at version 1.1. JBoss Fuse 6.1 should work with it without a problem, since its Twitter component is based on Twitter4J 3.x.

To play with the Camel Twitter connector, we first create a Twitter app (make sure you have already registered a developer account for Twitter) at https://apps.twitter.com/.


Provide the details of the app, and you will come to the application console. Click on "modify app permissions" so we can read from and publish to our account.

Click on Generate My Access Token and Token secret to get the authentication keys under Your Access Token.


Save the consumer key and secret and the access token and token secret somewhere handy, because we are going to need them later. Make sure you have installed JBDS and the integration plugin.
Go to JBDS, create a new Fuse blueprint project, and enter the Group ID and Artifact ID.


Open pom.xml and add the camel-twitter dependency.

Drag the endpoints from palette to canvas,
Endpoint with uri :

twitter://timeline/home?type=polling&amp;delay=10&amp;consumerKey={{consumerKey}}&amp;consumerSecret={{consumerSecret}}&amp;accessToken={{accessToken}}&amp;accessTokenSecret={{accessTokenSecret}}

and log: ${body.source}

Add a properties file containing the Twitter authentication settings, the 4 mandatory parameters mentioned above, and fill them in with the tokens you got from creating the Twitter app.
  • consumerKey - The consumer key
  • consumerSecret - The consumer secret 
  • accessToken - The access token
  • accessTokenSecret - The access token secret
Map the properties file into the Camel context using "propertyPlaceholder":

<propertyPlaceholder location="classpath:twitter.properties" id="twitter"/>

Now you can test the first part of the Twitter Camel example. Related videos can be found here:


Now, back to our demo. In part 2, we poll tweets from your personal timeline every 10 seconds, analyze the source device of each tweet, summarize how many times each device was used, and tweet the result back to Twitter!

In part one we were already polling tweets from Twitter. To process the content further, add a content switch (choice) to the route that sets one of 4 values in a header named "devicetype", and then calls another route for further processing.

    
        
        
        
            
                ${body.source} == "Web Client"
                
                    Web
                
            
            
                ${body.source} == "iPhone"
                
                    iPhone
                
            
            
                ${body.source} == "Android"
                
                    Android
                
            
            
                
                    Others
                
            
        
        
    


Notice we are using "seda" to pass messages to the next route. The seda: component provides asynchronous SEDA behavior, so that messages are exchanged on a BlockingQueue and consumers are invoked in a separate thread from the producer. We do that because the Twitter connector creates one exchange per returned object instead of returning a list of tweets, and in our example the order of processing does not matter.

With all the different tweets coming in separately, we now want to summarize the totals per device, and that is where the Aggregator EIP becomes very handy. The Aggregator from the EIP patterns allows you to combine a number of messages into a single message. With this component we can implement our own aggregation strategy and choose when the aggregation completes: every 10 seconds, every 5 messages, after a certain size, or when some predicate condition is met. (For more details, please read the product documentation.) In our demo, we are aggregating every 10 seconds.
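Here is a minimal plain-Java sketch of the aggregator idea: fold each incoming message into a running result and hand the result out once a completion condition is met. To keep it deterministic, this sketch completes by size (every N messages) rather than by the 10-second interval used in the demo, and it does not use Camel. The class name and its methods are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java sketch of an aggregator that completes by size; names are hypothetical.
final class SizeCompletedAggregator {

    private final int completionSize;
    private Map<String, Integer> counts = new LinkedHashMap<>();
    private int received = 0;

    SizeCompletedAggregator(int completionSize) {
        this.completionSize = completionSize;
    }

    // Folds one message into the running summary.
    // Returns the finished summary once completionSize messages arrived, otherwise null.
    Map<String, Integer> add(String deviceType) {
        counts.merge(deviceType, 1, Integer::sum);
        received++;
        if (received < completionSize) {
            return null;
        }
        Map<String, Integer> completed = counts;
        counts = new LinkedHashMap<>(); // start a fresh group for the next batch
        received = 0;
        return completed;
    }

    public static void main(String[] args) {
        SizeCompletedAggregator aggregator = new SizeCompletedAggregator(5);
        String[] incoming = {"Web", "iPhone", "Web", "Android", "Others", "iPhone"};
        for (String deviceType : incoming) {
            Map<String, Integer> summary = aggregator.add(deviceType);
            if (summary != null) {
                System.out.println("Completed: " + summary);
            }
        }
    }
}
```

In Camel the folding part is what the aggregation strategy does, and the completion condition is configured on the aggregator itself.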

So create a Java bean that holds the device data (Web, iPhone, Android and Others):
package org.blogdemo.twitterdemo;

public class SourceCounter {

    private int web = 0;
    private int android = 0;
    private int iphone = 0;
    private int others = 0;

    // Increments the counter matching the given device type; unknown types are ignored
    public void addType(String type) {
        if ("Web".equals(type)) {
            web++;
        } else if ("Android".equals(type)) {
            android++;
        } else if ("iPhone".equals(type)) {
            iphone++;
        } else if ("Others".equals(type)) {
            others++;
        }
    }

    public void addWeb() {
        web++;
    }

    public void addAndroid() {
        android++;
    }

    public void addIphone() {
        iphone++;
    }

    public void addOthers() {
        others++;
    }

    public int getWeb() {
        return web;
    }

    public void setWeb(int web) {
        this.web = web;
    }

    public int getAndroid() {
        return android;
    }

    public void setAndroid(int android) {
        this.android = android;
    }

    public int getIphone() {
        return iphone;
    }

    public void setIphone(int iphone) {
        this.iphone = iphone;
    }

    public int getOthers() {
        return others;
    }

    public void setOthers(int others) {
        this.others = others;
    }

    @Override
    public String toString() {
        return "web:[" + web + "] iphone:[" + iphone + "] android:[" + android + "] others:[" + others + "]";
    }
}


Then create our own aggregation strategy, which increments the matching counter in the Java bean based on the "devicetype" header.

package org.blogdemo.twitterdemo;

import org.apache.camel.Exchange;
import org.apache.camel.processor.aggregate.AggregationStrategy;

public class SourceAggrateStrategy implements AggregationStrategy {

    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        // The first message of a group arrives with no previous exchange
        SourceCounter counter;
        if (oldExchange == null) {
            counter = new SourceCounter();
        } else {
            counter = oldExchange.getIn().getBody(SourceCounter.class);
        }

        // A missing header yields null, which addType() simply ignores
        String newType = newExchange.getIn().getHeader("devicetype", String.class);
        counter.addType(newType);

        // Carry the running totals forward in the body of the returned exchange
        newExchange.getIn().setBody(counter);
        return newExchange;
    }
}



Register the strategy as a bean in blueprint.xml, so the Camel context can reference it,


<bean id="sourceAggrateStrategy" class="org.blogdemo.twitterdemo.SourceAggrateStrategy" />

Set up the aggregator,


And then at the end, we post the result back to Twitter, using the twitter connector again.

    
        
        
            
                true
            
            
                Summarizing the device tweeting every 10 secs from my twitter using Camel, ${body}
            
            
            
        
    

Here is the part two video. It takes you step by step through building the demo.


You can also find the code, along with a one-click-ready setup, in my GitHub account!
https://github.com/weimeilin79/twitterdemo

If you follow the instructions on GitHub and install the application onto JBoss Fuse, you can see the runtime details as well!


Thanks!

Wednesday, November 19, 2014

JBoss Fuse - Connecting to Facebook

Got your Camel working? Now it's time to have some fun: how about updating your Facebook status using Camel? The Facebook endpoint was added in Camel 2.12, which makes this a lot easier to implement.

So how does it work? There is an unofficial Java library for the Facebook API called "facebook4j", and Camel uses it to implement the integration with Facebook.

Facebook requires OAuth for all client application authentication, so first go to the Facebook developer site and register.



Then create a new application, because Facebook requires you to grant permissions separately for each application. For instance, you might grant application A the right to publish and poll all your photos, and grant application B access to your location information. So click on the "Apps" tab at the top of the page.


Choose website, 

Give a name to your application, and choose the type of your application,

Skip or go through the quick start. After that, you will arrive at the dashboard of your application, which has the information required to authenticate Camel when it accesses Facebook. Make sure you store the App ID and App Secret (click on "show" to display it), because we are going to use them later.


Let's take the easy way and use the Graph API Explorer tool to generate your token. A token is the key that tells Facebook how long your session lasts and which permissions were approved. We are going to use the temporary short-term token. Further reading about the token can be found here.


Choose the application you have created, and click on the Get Access Token,

The permissions dialog pops up. Make sure you tick the permissions that you want to use. For our case, please tick "publish_actions" under "Extended Permission", and make sure you store the access token somewhere.


And now we are ready to create our first Camel-Facebook application. There are 3 must have parameters, 
  • oAuthAppId - The APP ID from application Dashboard 
  • oAuthAppSecret - The APP Secret from application Dashboard 
  • oAuthAccessToken - The user access token we got from the tool

There are also other, non-mandatory options. Check the documentation if you need them.

Like other Camel components, the Facebook component has both a producer and a consumer.

Use the Facebook producer when you want to send something to Facebook or perform an action on it, such as posting a status, uploading a photo, leaving a comment, getting account information or adding tags. The detailed functionality is described in the documentation.

Consumer endpoints, on the other hand, are responsible for the reading part. We will play with those in the demo too.

This is a simple demo. To avoid any privacy issues with your friends, this time we are going to get your own birthday from Facebook, calculate how many days are left until your next birthday, and then publish that to your Facebook status.

Make sure you have installed JBDS and the integration plugin.  
Go to JBDS, create a new blueprint Fuse project, enter Group ID and artifact ID. 


Open pom.xml and add camel-facebook dependency. 


Drag the endpoints from palette to canvas, 
Endpoint with uri : 
facebook://me?oAuthAppId={{oAuthAppId}}&oAuthAppSecret={{oAuthAppSecret}}&oAuthAccessToken={{oAuthAccessToken}}&consumer.delay=10000

and log: ${body}



Add a properties file containing the Facebook authentication settings, the 3 must-have parameters mentioned above,
  • oAuthAppId - The APP ID from application Dashboard 
  • oAuthAppSecret - The APP Secret from application Dashboard 
  • oAuthAccessToken - The user access token we got from the tool
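As a minimal sketch, the properties file could look like this. The key names are taken from the {{...}} placeholders in the endpoint URI above; the file name facebook.properties is my assumption, and the values are placeholders for the ones you stored earlier.

```properties
# facebook.properties (hypothetical file name), placeholder values only
oAuthAppId=YOUR_APP_ID
oAuthAppSecret=YOUR_APP_SECRET
oAuthAccessToken=YOUR_USER_ACCESS_TOKEN
```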


The results returned from Facebook are Facebook4j objects, so simply create a Java bean that does the actual business calculation.

BirthdayCounter.java
package org.blogdemo.fbdemo;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

import facebook4j.User;

public class BirthdayCounter {

    private static final long MILLIS_PER_DAY = 1000L * 60 * 60 * 24;

    // Returns the days left until the next birthday, 0 on the birthday itself,
    // or -1 when the birthday is missing or cannot be parsed
    public int count(User user) {
        if (user == null || user.getBirthday() == null) {
            return -1;
        }

        String birthday = user.getBirthday();
        if (birthday.length() < 4) {
            return -1;
        }

        SimpleDateFormat sdf = new SimpleDateFormat("MM/dd/yyyy");

        // Truncate "now" to midnight so that only whole days are compared
        Calendar today = Calendar.getInstance();
        today.set(Calendar.HOUR_OF_DAY, 0);
        today.set(Calendar.MINUTE, 0);
        today.set(Calendar.SECOND, 0);
        today.set(Calendar.MILLISECOND, 0);

        int thisYear = today.get(Calendar.YEAR);
        // Drop the birth year, keeping the "MM/dd/" part
        String monthAndDay = birthday.substring(0, birthday.length() - 4);

        try {
            Date nextBirthday = sdf.parse(monthAndDay + thisYear);
            // The birthday already passed this year, so count towards next year's
            if (nextBirthday.before(today.getTime())) {
                nextBirthday = sdf.parse(monthAndDay + (thisYear + 1));
            }
            return daysBetween(today.getTime(), nextBirthday);
        } catch (ParseException e) {
            return -1;
        }
    }

    private int daysBetween(Date d1, Date d2) {
        // Round instead of truncating, so a daylight saving shift does not lose a day
        return (int) Math.round((d2.getTime() - d1.getTime()) / (double) MILLIS_PER_DAY);
    }
}
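If you are on Java 8 or later, the same calculation can be sketched with java.time, which works on whole dates and so avoids the time-of-day arithmetic. This is an alternative sketch, not the code from the demo project. It assumes the birthday string comes in MM/dd/yyyy form, and it takes "today" as a parameter so it is easy to try out. The class and method names are hypothetical.

```java
import java.time.LocalDate;
import java.time.MonthDay;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.time.temporal.ChronoUnit;

// Alternative sketch using java.time (Java 8+); names are hypothetical.
final class BirthdayCountdown {

    // Assumed input format: MM/dd/yyyy
    private static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("MM/dd/yyyy");

    // Returns the days until the next birthday, 0 on the day itself,
    // or -1 when the birthday is missing or not in the assumed format
    static long daysUntil(String birthday, LocalDate today) {
        if (birthday == null) {
            return -1;
        }
        MonthDay monthDay;
        try {
            monthDay = MonthDay.from(LocalDate.parse(birthday, FORMAT));
        } catch (DateTimeParseException e) {
            return -1;
        }
        // atYear() maps February 29th to February 28th in non-leap years
        LocalDate next = monthDay.atYear(today.getYear());
        if (next.isBefore(today)) {
            next = monthDay.atYear(today.getYear() + 1);
        }
        return ChronoUnit.DAYS.between(today, next);
    }

    public static void main(String[] args) {
        System.out.println(daysUntil("01/10/1990", LocalDate.now()));
    }
}
```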

Then create another bean that converts a String of text into another Facebook4j object. We are going to publish a status post to Facebook, and the producer needs a facebook4j.PostUpdate as the input body.

FBUpdator.java
package org.blogdemo.fbdemo;

import facebook4j.PostUpdate;

public class FBUpdator {

    // Wraps the status text in the PostUpdate object the Facebook producer expects
    public PostUpdate update(String text) {
        return new PostUpdate(text);
    }
}

Add the two beans to the Camel context by declaring them in the blueprint XML.

  <bean id="birthdayCounter" class="org.blogdemo.fbdemo.BirthdayCounter"/>
  <bean id="fbUpdator" class="org.blogdemo.fbdemo.FBUpdator"/>

Now, complete the rest of the route. 


  
    
    
        
        
        
        
            
                ${body} < 0
                
                    I am a very mysterious person
                
            
            
                ${body} == 0
                
                    It's My BIRTHDAY!!!!
                
            
            
             
                    Counting down... ${body} more days to my birthday
                              
            
        
        
            
        
        
    


Run the Camel route, and you will see the status posted on your Facebook home page!


I have broken the demo into 2 videos,

The first one shows you how to set up the Facebook App and poll data from Facebook.


Second one shows you how to process and publish feed to Facebook status.


You can also find the code, along with a one-click-ready setup, in my GitHub account!