Saturday, December 20, 2014

JBoss Fuse - File Connector with Split Pattern and JDBC connector with Error Handling - Part Two

This is the second part of the demo, "JDBC connector with Error Handling". If you have not read it yet, please take a look at the previous blog post first. You can also find the finished code base here.

In the XML file, each transaction carries a VIP status. It determines the processing fee we charge per transaction: $1 for DIAMOND class VIPs, $2 for GOLD and $3 for all other accounts.

To use the JDBC connector, we must first define a datasource, either within the Camel context XML or in Fabric8. To make testing easy, we will define it in the Camel context XML.

XML
 	
    	
    	
    	
  		
  	 
  
  	
		
  	
  
	
		
		
	
Here I use an in-memory H2 database because it is easy to set up and needs no separate database installation. You can switch to whichever database you prefer: just replace the driverClassName and url, along with the ID and password used to access the database. It is also a good idea to include a transaction manager so we can set the transaction policy.

Before calling the JDBC component, jdbc:dataSourcePS, remember to set the SQL statement to execute as the message body.

<setBody>
  <simple>SELECT balance from customerdemo where customerID = '${header.CustId}';</simple>
</setBody>
<to uri="jdbc:dataSourcePS?resetAutoCommit=true"/>

We may also want to catch errors and react to them. Just as in Java programming, we can define exception policies that intercept and handle exceptions, using onException. After catching the exception, we can either process it or redeliver the message. Here is the example we use in the demo.

    
        java.lang.Exception
        
        
            true
        
        
    

In the last demo, we processed the XML files from the different bank branches and split them into individual transactions, grouped into two types: cash and transfer. Cash transactions carry no fee. Transfers are charged a processing fee based on the VIP status: $1 for DIAMOND class VIPs, $2 for GOLD and $3 for other accounts.

To update the balance in the database, we first select the current balance to make sure there is enough to deduct. If there is not, we throw a "not enough balance" exception, which we handle by logging it.
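The "redeliver, then handle" behaviour can be pictured in plain Java. This is only a sketch of the idea, not how Camel implements onException: the class RedeliverySketch, the method callWithRedelivery and the redelivery counts are all made up for illustration.

```java
import java.util.concurrent.Callable;

final class RedeliverySketch {

    /** Outcome of a call: either a value, or the error that was handled. */
    static final class Result<T> {
        final T value;
        final Exception handledError;

        Result(T value, Exception handledError) {
            this.value = value;
            this.handledError = handledError;
        }

        boolean failed() {
            return handledError != null;
        }
    }

    // Tries the task once, then redelivers up to maxRedeliveries more times.
    // If every attempt fails, the last exception is logged and returned as
    // "handled" instead of being thrown to the caller.
    static <T> Result<T> callWithRedelivery(Callable<T> task, int maxRedeliveries) {
        if (maxRedeliveries < 0) {
            throw new IllegalArgumentException("maxRedeliveries must be >= 0");
        }
        Exception last = null;
        for (int attempt = 0; attempt <= maxRedeliveries; attempt++) {
            try {
                return new Result<>(task.call(), null);
            } catch (Exception e) {
                last = e; // remember the failure and try again
            }
        }
        System.out.println("Handled after redelivery: " + last.getMessage());
        return new Result<>(null, last);
    }

    public static void main(String[] args) {
        Result<Integer> r = callWithRedelivery(() -> {
            throw new Exception("NOT ENOUGH BALANCE");
        }, 2);
        System.out.println("failed = " + r.failed());
    }
}
```

Marking the failure as handled means the caller gets a normal result back and can carry on with the next transaction, which is the effect we want for a single transfer that cannot be completed.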





Carrying on from our last demo, we are going to extend the two Camel routes.
i) processCash





    
        
        
            SELECT balance from customerdemo where customerID = '${header.CustId}';
        
        
        
        
            UPDATE customerdemo SET balance = ${body} where customerID = '${header.CustId}';
        
        
    

Within the route you will see a Java bean. It holds the actual business logic that adds money to and deducts money from each account.


package org.blogdemo.bankdemo;

import java.util.ArrayList;
import java.util.HashMap;

import org.apache.camel.Exchange;

public class BankBean {
	int countError=0;
	
	public static final int DIAMOND_FEE = 1;
	public static final int GOLD_FEE = 2;
	public static final int NORMAL_FEE = 3;
	
	public void transfer(Exchange oldExchange, Exchange newExchange)throws Exception{
		Integer amtDeduct = ((Integer)newExchange.getIn().getHeader("amt",Integer.class));
		newExchange.getIn().setHeader("amt", amtDeduct * -1);
	}
	
	public void doBalance(Exchange oldExchange, Exchange newExchange)throws Exception{
		
		@SuppressWarnings("unchecked")
		ArrayList<HashMap<String, Object>> balanceList = (ArrayList<HashMap<String, Object>>) newExchange.getIn().getBody(ArrayList.class);
		HashMap<String, Object> result = balanceList.get(0);
		Integer newBalance = ((Integer)result.get("BALANCE")) +((Integer)newExchange.getIn().getHeader("amt",Integer.class));
		
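		// The fee depends on the VIP status, not on the amount. The header name
		// "VipStatus" is an assumption: use the name the route sets from /Transaction/VipStatus.
		String vipStatus = newExchange.getIn().getHeader("VipStatus", String.class);
		if("Diamond".equalsIgnoreCase(vipStatus)){
			newBalance -= DIAMOND_FEE;
		}else if("Gold".equalsIgnoreCase(vipStatus)){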
			newBalance -= GOLD_FEE;
		}else{
			newBalance -= NORMAL_FEE;
		}
		
		if(newBalance < 0){
			throw new Exception("NOT ENOUGH BALANCE");
		}
		
		newExchange.getIn().setBody(newBalance);
		System.out.println( "--->newBalance :["+newBalance+"]" );
	} 
	
	public void doBalanceWithoutFee(Exchange oldExchange, Exchange newExchange)throws Exception{
		
		@SuppressWarnings("unchecked")
		ArrayList<HashMap<String, Object>> balanceList = (ArrayList<HashMap<String, Object>>) newExchange.getIn().getBody(ArrayList.class);
		HashMap<String, Object> result = balanceList.get(0);
		Integer newBalance = ((Integer)result.get("BALANCE")) +((Integer)newExchange.getIn().getHeader("amt",Integer.class));
		
		
		if(newBalance < 0){
			throw new Exception("NOT ENOUGH BALANCE");
		}
		
		newExchange.getIn().setBody(newBalance);
		System.out.println( "--->newBalance :["+newBalance+"] countError:["+countError+"]" );
		
	} 	
	

}
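The fee and balance rules in the bean can also be exercised without Camel. The following is a minimal sketch under that assumption: BalanceRules, feeFor and newBalance are hypothetical names, and the rules are copied from the fee table described in this post.

```java
final class BalanceRules {

    static final int DIAMOND_FEE = 1;
    static final int GOLD_FEE = 2;
    static final int NORMAL_FEE = 3;

    // Maps a VIP status to the processing fee; anything that is not
    // Diamond or Gold (including null) pays the normal fee.
    static int feeFor(String vipStatus) {
        if ("Diamond".equalsIgnoreCase(vipStatus)) {
            return DIAMOND_FEE;
        }
        if ("Gold".equalsIgnoreCase(vipStatus)) {
            return GOLD_FEE;
        }
        return NORMAL_FEE;
    }

    // amount is signed: a negative amount deducts from the balance.
    // chargeFee is true for transfers and false for cash transactions.
    static int newBalance(int balance, int amount, String vipStatus, boolean chargeFee) {
        int result = balance + amount - (chargeFee ? feeFor(vipStatus) : 0);
        if (result < 0) {
            throw new IllegalStateException("NOT ENOUGH BALANCE");
        }
        return result;
    }

    public static void main(String[] args) {
        // A Gold customer with 500 transfers 20 out and pays the Gold fee.
        System.out.println(newBalance(500, -20, "Gold", true));
    }
}
```

Keeping the arithmetic in a pure function like this makes the "not enough balance" case easy to check before wiring it into a route.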


ii) doTransfer







This route is very similar to the last one, but since a transfer involves two accounts, there are more selects and updates to do. The route again calls a Java bean, which holds the business logic that works out how much to add or deduct and which fee the bank should charge.


    
        
        
            SELECT balance from customerdemo where customerID = '${header.CustId}';
        
        
        
        
        
            UPDATE customerdemo SET balance = ${body} where customerID = '${header.CustId}';
        
        
            SELECT balance from customerdemo where customerID = '${header.receiverId}';
        
        
        
        
        
            ${header.transfersql}  UPDATE customerdemo SET balance = ${body} where customerID = '${header.receiverId}';
        
        
    

Lastly, because the tables live in memory, some preparation has to be done beforehand. Here is a very simple setup that creates the table and the accounts in the database, plus a timer route that logs all the accounts in the database.


    
        
        
            CREATE TABLE customerdemo (customerID character varying(10) NOT NULL,vipStatus character varying(10) NOT NULL ,  balance integer NOT NULL);
        
        
    
    
        
        
            INSERT INTO customerdemo (customerID,vipStatus,balance) VALUES ('A01','Diamond',1000);
            INSERT INTO customerdemo (customerID,vipStatus,balance) VALUES ('A02','Gold',500); 
        
        
    
    
        
        
            select * from customerdemo
        
        
        
    


Here is the follow-up video. Please take a look at part 1 if you have not seen it yet.


We can also deploy this example onto Fuse Fabric. Go to my code repo, find the installation script, and run init.sh.

To run the demo:

1) In a browser, go to http://localhost:8181 and log in with the ID/password admin/admin.
2) Under Runtime, you will see the list of containers. Click on the small icon on the right-hand side of the testcon container.
3) Inside the container, under the Camel tab, you will see the list of routes we have.
4) Click on Endpoint on the left-hand side, choose the file endpoint, and send the XML.
5) You will see the transfer result in the log.

Enjoy! 


Monday, December 1, 2014

JBoss Fuse - File Connector with Split Pattern and JDBC connector with Error Handling - Part One

This demo is slightly longer than the ones I have done before, so I will break it into two parts, keep each part as simple as possible, and then combine the two into a bigger demo. Along the way I hope you will see how simple it is to integrate and process data with JBoss Fuse.

Part One
=========

  • File connector
  • Split Pattern


We are going to pick up XML files from different branches in a directory. Each file contains cash deposit, cash withdrawal and transfer data. Depending on the type of transaction, we split the XML file into separate sections and send them to different services for further processing.

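The format of the XML looks like this.

<Bank>
 <Transaction type="Cash">
  <CustId>A02</CustId>
  <VipStatus>Gold</VipStatus>
  <Detail>
   <amount>-20</amount>
  </Detail>
 </Transaction>
 <Transaction type="Transfer">
  <CustId>A02</CustId>
  <VipStatus>Gold</VipStatus>
  <Detail>
   <CustId>A01</CustId>
   <amount>20</amount>
  </Detail>
 </Transaction>
</Bank>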




First, create a route with a file consumer endpoint that reads XML files from a directory. There are many options, such as readLock, which locks the file while it is being read, and recursive, which also looks for files in all the sub-directories. For more options, please read the documentation. After retrieving the XML file, we divide it into parts using the Split pattern in Camel. The Splitter from the EIP patterns allows you to split a message into a number of pieces and process them individually.


For the split, we need to specify how the message is to be separated. In our case, we use XPath, a language for addressing parts of an XML document, to indicate how the XML file is divided. Our XML file has multiple transactions under the root element Bank, so the XPath is "//Bank/Transaction". We then send each individual XML part to another route for processing.
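To see what the "//Bank/Transaction" expression selects, here is a small standalone sketch using the JDK's own XPath API rather than Camel. The class XPathSplitSketch and its split method are hypothetical, and the inline sample XML is an assumption that uses the element names from the XPath expressions in this post. Camel's splitter would send each matched element on as its own message; the sketch just returns a one-line summary per element.

```java
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

final class XPathSplitSketch {

    // Returns one "type:CustId:amount" summary per Transaction element.
    static List<String> split(String xml) {
        try {
            XPath xpath = XPathFactory.newInstance().newXPath();
            NodeList nodes = (NodeList) xpath.evaluate("//Bank/Transaction",
                    new InputSource(new StringReader(xml)), XPathConstants.NODESET);
            List<String> parts = new ArrayList<>();
            for (int i = 0; i < nodes.getLength(); i++) {
                Element tx = (Element) nodes.item(i);
                // Relative paths are evaluated against the current Transaction.
                String custId = xpath.evaluate("CustId/text()", tx);
                String amount = xpath.evaluate("Detail/amount/text()", tx);
                parts.add(tx.getAttribute("type") + ":" + custId + ":" + amount);
            }
            return parts;
        } catch (XPathExpressionException e) {
            throw new IllegalArgumentException("Could not split the XML", e);
        }
    }

    public static void main(String[] args) {
        String xml = "<Bank>"
                + "<Transaction type=\"Cash\"><CustId>A02</CustId><VipStatus>Gold</VipStatus>"
                + "<Detail><amount>-20</amount></Detail></Transaction>"
                + "<Transaction type=\"Transfer\"><CustId>A02</CustId><VipStatus>Gold</VipStatus>"
                + "<Detail><CustId>A01</CustId><amount>20</amount></Detail></Transaction>"
                + "</Bank>";
        for (String part : split(xml)) {
            System.out.println(part);
        }
    }
}
```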

    
        
        
            //Bank/Transaction
            
        
    

For each individual transaction, we extract the data using XPath and set it in headers. Depending on the transaction type, Cash or Transfer, we send it to a different route for further processing.
    
        
        
            /Transaction/CustId/text()
        
        
            /Transaction/VipStatus/text()
        
        
            /Transaction/Detail/amount/text()
        
        
            ${body}
        
        
            
                /Transaction[@type='Cash']
                
            
            
                /Transaction[@type='Transfer']
                
                    /Transaction/Detail/CustId/text()
                
                
            
        
    

Lastly, the two processing routes export each transaction into its own XML file, so we need the file component again. This time the file component acts as a producer. Producers have their own options, such as fileExist, which controls what happens when the target file already exists (for example, overriding it).



    
        
        
    
    
        
        
    

If we place the branch-xxx.xml file into the inputdir, the route processes the XML and splits the file into small pieces.

Here is the video that shows you how to create the project. 
Demo code will be out with part 2. 



Tuesday, November 25, 2014

JBoss Fuse - Connecting to Twitter and the Aggregator pattern

If you think connecting to Facebook is easy with Camel, try connecting to Twitter! It's even easier!
Before connecting to Twitter, make sure you have registered a Twitter developer account, then go to https://apps.twitter.com/ and create a new Twitter App.

There are a few mandatory parameters for authentication if you want to play with Twitter. They are:
  • consumerKey - The consumer key
  • consumerSecret - The consumer secret 
  • accessToken - The access token
  • accessTokenSecret - The access token secret

The Twitter component has a Consumer, which handles mostly polling functionality, such as searching content and streaming tweets from Twitter.

  • timeline/home - polls tweets from the user's home timeline
  • search - searches for specific keywords
  • streaming/filter - gets a stream of data from Twitter

The Producer handles tweeting and other calls to the Twitter API, which can also include searching.

  • directmessage - sends a direct message
  • search - searches for keywords
  • timeline/user - posts a status update to the user's timeline
For more details, please visit the product documentation.

Note: the Twitter API is currently at version 1.1. JBoss Fuse 6.1 should work with it without a problem, since its Twitter component is based on Twitter4J 3.x.

To play with the Camel Twitter connector, we first create a Twitter app. Make sure you have already registered a developer account for Twitter, then go to https://apps.twitter.com/


Provide the details of the app. You will then arrive at the application console. Click on "modify app permissions" so that we can both read from and publish to our account.

Click on Generate My Access Token and Token secret to get the authentication keys under Your Access Token.


Save the consumer key and secret and the access token and token secret somewhere handy, because we are going to need them later. Make sure you have installed JBDS and the integration plugin.
Go to JBDS, create a new blueprint Fuse project, and enter the Group ID and Artifact ID.


Open pom.xml and add the camel-twitter dependency.

Drag the endpoints from palette to canvas,
Endpoint with uri :

twitter://timeline/home?type=polling&amp;delay=10&amp;consumerKey={{consumerKey}}&amp;consumerSecret={{consumerSecret}}&amp;accessToken={{accessToken}}&amp;accessTokenSecret={{accessTokenSecret}}

and log: ${body.source}

Add a properties file containing the Twitter authentication settings, the four mandatory parameters mentioned above, and fill them in with the tokens you got when creating the Twitter App.
  • consumerKey - The consumer key
  • consumerSecret - The consumer secret 
  • accessToken - The access token
  • accessTokenSecret - The access token secret
Map the properties file into Camel Context by using "propertyPlaceholder"

<propertyPlaceholder location="classpath:twitter.properties" id="twitter"/>

Now you can start testing the first part of the Twitter Camel example. Related videos can be found here:


Now, back to our demo. In part 2, we poll tweets from your personal timeline every 10 seconds, analyze the source device of each tweet, summarize how many times each device was used, and tweet the result back to Twitter!

In part one we were already polling tweets from Twitter. To process the content further, add a content-based router (choice) to the route. It sets the header named "devicetype" to one of four values and then calls another route for further processing.

    
        
        
        
            
                ${body.source} == "Web Client"
                
                    Web
                
            
            
                ${body.source} == "iPhone"
                
                    iPhone
                
            
            
                ${body.source} == "Android"
                
                    Android
                
            
            
                
                    Others
                
            
        
        
    


Notice that we use "seda" to pass messages to the next route. The seda: component provides asynchronous SEDA behavior, so that messages are exchanged on a BlockingQueue and consumers are invoked in a separate thread from the producer. We do that because the Twitter connector creates one exchange per returned object instead of returning a list of tweets, and in our example the processing order does not matter.
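The hand-off through a BlockingQueue can be sketched in plain Java. This is an illustration of the idea only, not the seda: component itself; the class SedaSketch, the handOff method and the STOP marker are invented for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class SedaSketch {

    // Hypothetical end-of-stream marker so the consumer thread knows when to stop.
    private static final String STOP = "__stop__";

    // Puts every message on a queue from the calling (producer) thread, lets a
    // separate consumer thread take them off, and returns what the consumer saw.
    static List<String> handOff(List<String> messages) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> consumed = new ArrayList<>();

        Thread consumer = new Thread(() -> {
            try {
                for (String m = queue.take(); !STOP.equals(m); m = queue.take()) {
                    consumed.add(m);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        try {
            for (String m : messages) {
                queue.put(m); // the producer returns as soon as the message is queued
            }
            queue.put(STOP);
            consumer.join(); // after join, the consumer's writes are visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted during hand-off", e);
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println(handOff(Arrays.asList("tweet-1", "tweet-2", "tweet-3")));
    }
}
```

The producer never waits for the consumer to finish processing a message, which is why the polling route stays responsive while the downstream route does its work.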

With all the tweets coming in separately, we now want to summarize the device totals. That is where the EIP Aggregator comes in handy. The Aggregator from the EIP patterns allows you to combine a number of messages into a single message. We can implement our own aggregation strategy and choose when the aggregation completes: every 10 seconds, every 5 messages, after a certain size, or when some predicate is satisfied. (For more details, please read the product documentation.) In our demo, we aggregate every 10 seconds.
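As a rough picture of what an aggregator does, here is a plain-Java sketch that combines incoming device types into one summary and completes after a fixed number of messages. It uses completion by size rather than the 10-second interval from the demo, simply because that is deterministic to run. SizeAggregatorSketch and its methods are hypothetical names and not part of Camel.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class SizeAggregatorSketch {

    private final int completionSize;
    private final List<Map<String, Integer>> completed = new ArrayList<>();
    private Map<String, Integer> current = new LinkedHashMap<>();
    private int seen = 0;

    SizeAggregatorSketch(int completionSize) {
        if (completionSize < 1) {
            throw new IllegalArgumentException("completionSize must be >= 1");
        }
        this.completionSize = completionSize;
    }

    // Folds one incoming message into the running summary. When the batch
    // reaches completionSize, the summary is published and a new one starts.
    void accept(String deviceType) {
        current.merge(deviceType, 1, Integer::sum);
        seen++;
        if (seen == completionSize) {
            completed.add(current);
            current = new LinkedHashMap<>();
            seen = 0;
        }
    }

    // Summaries that have completed so far, oldest first.
    List<Map<String, Integer>> completed() {
        return completed;
    }

    public static void main(String[] args) {
        SizeAggregatorSketch aggregator = new SizeAggregatorSketch(3);
        for (String type : new String[] {"Web", "iPhone", "Web", "Android"}) {
            aggregator.accept(type);
        }
        // The fourth message belongs to a batch that has not completed yet.
        System.out.println(aggregator.completed());
    }
}
```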

Create a Java bean that holds the device data (Web, iPhone, Android and Others).
package org.blogdemo.twitterdemo;

public class SourceCounter {
 
 
 private int web = 0;
 private int android = 0;
 private int iphone = 0;
 private int others = 0;

 public void addType(String type){
  if("Web".equals(type))
   web ++;
  if("Android".equals(type))
   android++;
  if("iPhone".equals(type))
   iphone++;
  if("Others".equals(type))
   others++;
 }
 
 public void addWeb() {
  web++;
 }
 
 public void addAndroid() {
  android++;
 }
 
 public void addIphone() {
  iphone++;
 }
 
 public void addOthers() {
  others++;
 }
 
 
 
 
 
 public int getWeb() {
  return web;
 }
 
 public void setWeb(int web) {
  this.web = web;
 }
 public int getAndroid() {
  return android;
 }

 
 public void setAndroid(int android) {
  this.android = android;
 }
 public int getIphone() {
  return iphone;
 }
 public void setIphone(int iphone) {
  this.iphone = iphone;
 }

 
 public int getOthers() {
  return others;
 }

 public void setOthers(int others) {
  this.others = others;
 }
 
 

 
 public String toString() {
  return "web:["+web+"] iphone:["+iphone+"] android:["+android+"] others:["+others+"]";
 }
 
}


Then create our own aggregation strategy, which increments the counters in the Java bean based on the "devicetype" header.

package org.blogdemo.twitterdemo;

import org.apache.camel.Exchange;
import org.apache.camel.processor.aggregate.AggregationStrategy;

public class SourceAggrateStrategy implements AggregationStrategy {

 
 
 @Override
 public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
  SourceCounter counter = null;
  
  if (oldExchange == null) {
   counter = new SourceCounter();
  } else {
   counter = oldExchange.getIn().getBody(SourceCounter.class);
  }
  
  String newType = newExchange.getIn().getHeader("deviceType").toString(); 
  counter.addType(newType);
 
  
  
  newExchange.getIn().setBody(counter);
     //newExchange.getOut().setBody(counter);
     
  return newExchange;
 }
 
 
}



Add the strategy into Camel context in the blueprint.xml,


<bean id="sourceAggrateStrategy" class="org.blogdemo.twitterdemo.SourceAggrateStrategy" />

Setup the aggregator,


Finally, we publish the result back to Twitter, using the Twitter connector again.

    
        
        
            
                true
            
            
                Summarizing the device tweeting every 10 secs from my twitter using Camel, ${body}
            
            
            
        
    

Here is the part two video. It takes you step by step through building the demo.


You can also find the code and the one-click-ready setup in my GitHub account!
https://github.com/weimeilin79/twitterdemo

By following the instructions on GitHub and installing the application onto JBoss Fuse, you can also see the runtime details!


Thanks!

Wednesday, November 19, 2014

JBoss Fuse - Connecting to Facebook

Got your Camel working? Now it's time to have some fun. How about updating your Facebook status using Camel? The Facebook endpoint was added in Camel 2.12, which makes this a lot easier to implement.

So how does it work? There is an unofficial Java library for the Facebook API called "Facebook4J", and Camel uses this library to implement the integration with Facebook.

Facebook requires OAuth for all client application authentication, so please go to the Facebook developer site and register.



Then create a new application, because Facebook requires you to grant permissions for different functions per application. For instance, you might grant application A the right to publish and poll all your photos, and grant application B access to your location information. Click on the "Apps" tab at the top of the page.


Choose website, 

Give a name to your application, and choose the type of your application,

Skip or go through the quick start. After that, you will arrive at your application dashboard, which has the information required to authenticate Camel when it accesses Facebook. Make sure you store the App ID and App Secret (click on "show" to display it), because we are going to use them later.


Let's take the easy way and use the Graph API Explorer tool to generate your token. A token is the key that tells Facebook how long your session lasts and which permissions were approved. We are going to use the temporary short-term token. Further reading about tokens can be found here.


Choose the application you have created, and click on the Get Access Token,

The permissions dialog pops up. Make sure you select the permissions that you want to use. In our case, select "publish_actions" under "Extended Permission", and make sure you store the access token somewhere.


Now we are ready to create our first Camel-Facebook application. There are three mandatory parameters:
  • oAuthAppId - The APP ID from application Dashboard 
  • oAuthAppSecret - The APP Secret from application Dashboard 
  • oAuthAccessToken - The user access token we got from the tool

There are also other non-mandatory options. Check the documentation if you need them.

Like other Camel components, Facebook has both a Producer and a Consumer.

Use the Facebook producer when you want to send data to or perform actions on Facebook, such as posting a status, uploading a photo, leaving comments, getting account information or adding tags. The detailed functionality is described in the documentation.

Consumer endpoints, on the other hand, are responsible for the reading side. We will play with those in the demo too.

This is a simple demo. To avoid any privacy issues with your friends, we are going to get your own birthday from Facebook, calculate how many days are left until your next birthday, and then publish that to your Facebook status.

Make sure you have installed JBDS and the integration plugin.  
Go to JBDS, create a new blueprint Fuse project, enter Group ID and artifact ID. 


Open pom.xml and add camel-facebook dependency. 


Drag the endpoints from palette to canvas, 
Endpoint with uri : 
facebook://me?oAuthAppId={{oAuthAppId}}&oAuthAppSecret={{oAuthAppSecret}}&oAuthAccessToken={{oAuthAccessToken}}&consumer.delay=10000

and log: ${body}



Add a properties file containing the Facebook authentication settings, the three mandatory parameters mentioned above:
  • oAuthAppId - The APP ID from application Dashboard 
  • oAuthAppSecret - The APP Secret from application Dashboard 
  • oAuthAccessToken - The user access token we got from the tool


The results returned from Facebook are Facebook4J objects, so simply create a Java bean that does the actual business calculation.

BirthdayCounter.java
package org.blogdemo.fbdemo;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

import facebook4j.User;

public class BirthdayCounter {

 public int count(User user){
  if(user == null || user.getBirthday() == null){
   return -1;
  }
  
  String birthday = user.getBirthday();
  
     SimpleDateFormat sdf = new SimpleDateFormat("MM/dd/yyyy"); 
     SimpleDateFormat yf = new SimpleDateFormat("yyyy");
     Date date = new Date();
          
     int endYear = Integer.parseInt(yf.format(date));
     String birthdayTempString = birthday.substring(0, birthday.length()-4)+endYear;
     
     Date birthdayThisYear = null;
     Date birthdayNextYear = null;
    
     try {
      birthdayThisYear = sdf.parse(birthdayTempString);
  } catch (ParseException e) {
   e.printStackTrace();
  }
     
     //Determine we should calculate from this year or next year
     if(birthdayTempString.equals(sdf.format(date))){
      return 0;
     }else if(birthdayThisYear.before(date)){
      birthdayTempString = birthday.substring(0, birthday.length()-4) + (endYear+1);
      try {
    birthdayNextYear = sdf.parse(birthdayTempString);
   } catch (ParseException e) {
    e.printStackTrace();
   }
     }else{
      birthdayNextYear = birthdayThisYear;
     }
     
  
     int dayDiff = daysBetween(date,birthdayNextYear);
  return dayDiff;
 }
 
 private int daysBetween(Date d1, Date d2){
  return (int)( (d2.getTime() - d1.getTime()) / (1000 * 60 * 60 * 24));
 }
 
}
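The same countdown can also be written with the java.time API (Java 8 and later), which works on whole calendar days and so avoids dividing milliseconds. This is an alternative sketch, not the code used in the demo: BirthdayCountdownSketch and daysUntilNext are hypothetical names, the MM/dd/yyyy format is taken from the bean above, and a missing birthday is not handled here.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;

final class BirthdayCountdownSketch {

    private static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("MM/dd/yyyy");

    // Returns the number of whole days from today until the next birthday,
    // or 0 when the birthday is today.
    static long daysUntilNext(String birthday, LocalDate today) {
        LocalDate born = LocalDate.parse(birthday, FORMAT);
        LocalDate next = born.withYear(today.getYear());
        if (next.isBefore(today)) {
            // This year's birthday has already passed, so count to next year's.
            next = born.withYear(today.getYear() + 1);
        }
        return ChronoUnit.DAYS.between(today, next);
    }

    public static void main(String[] args) {
        System.out.println(daysUntilNext("12/25/1990", LocalDate.now()));
    }
}
```

Passing today's date in as a parameter keeps the calculation easy to check with fixed dates.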

Then create another bean that converts a String of text into another Facebook4J object. We are going to publish a status update to Facebook, and the endpoint needs a facebook4j.PostUpdate as the input body.

FBUpdator.java
package org.blogdemo.fbdemo;

import facebook4j.PostUpdate;

public class FBUpdator {
 
 PostUpdate postUpdate = null;
 public PostUpdate update(String test){
  postUpdate = new PostUpdate(test);
  
  return postUpdate;
 }
}

Add the two beans to the Camel context by referencing them in the blueprint XML.

 <bean id="birthdayCounter" class="org.blogdemo.fbdemo.BirthdayCounter"/>
 <bean id="fbUpdator" class="org.blogdemo.fbdemo.FBUpdator"/>

Now, complete the rest of the route. 


  
    
    
        
        
        
        
            
                ${body} < 0
                
                    I am a very mysterious person
                
            
            
                ${body} == 0
                
                    It's My BIRTHDAY!!!!
                
            
            
             
                    Counting down... ${body} more days to my birthday
                              
            
        
        
            
        
        
    


Run the Camel route, and you will see the post appear on your Facebook home!


I have broken the demo into two videos.

The first one shows you how to set up the Facebook App and poll data from Facebook.


The second one shows you how to process the data and publish it to your Facebook status.


You can also find the code and the one-click-ready setup in my GitHub account!


Wednesday, November 5, 2014

JBoss Fuse/A-MQ - Playing with Insight/Elastic Search Tech Preview of Fuse

This week in Berlin, I met awesome Red Hat SA Roel Hodzelmans, who showed me the "fun stuff" that you can play with in JBoss Fuse. Although this is still a Tech Preview feature, I think it would be great for people to play with it and test it as much as possible.

Kibana is an open source browser based analytics and search dashboard for ElasticSearch.

Here are the steps for an Insight/Elastic Search demo of Fuse. Make sure the environment has enough disk space.

1) In the root container, add the Kibana profile. This is the management console for Elastic Search.


It will add a new tab next to Runtime, Wiki, Dashboard, Health and Insight. The preparation takes a couple of minutes, depending on your resources.



2) Add the insight core profile to all containers from which you want to capture logging.




3) Create a new profile or profile version to which you want to add Insight for the Camel parts,



then add the insight-camel feature to the features of the profile.




4) Migrate a container to the new version, or create a target container with the new version, and add the newly created profile to the container in question.



Once you have it installed and your application running, you can see 5 different views:

Logs, where you get insight into the log statements.

Metric, which should show data on your JVM etc., but there is still work to be done here.

Camel Events, which shows all Camel exchanges. Note that the Exchange ID is also shown here.

Camel, which shows a text box into which you can put an Exchange ID. When successful (no whitespace, wildcards or partial strings!) it draws a sequence diagram detailing the time spent in each component. If your route has good IDs for the endpoints, it displays those names. Otherwise you get generated IDs, which makes the diagram hard to read.

ESHead, which shows you the Elastic Search component itself, in which the data is stored. Here you can
    a) see the cluster health,
    b) make your own queries, etc.


You can follow Roel on Twitter @RoelHodzelmans