Wednesday, March 26, 2014

WSO2Asia Con - Tutorial -"Advancing Integration Competency and Excellence with the WSO2 Integration Platform" Presentation Slides

Here is the presentation that Dushan and I gave for the tutorial session "Advancing Integration Competency and Excellence with the WSO2 Integration Platform".

Sunday, October 20, 2013

Custom mediator for WSO2 ESB

In some deployment scenarios, incoming messages need a custom transformation that cannot be achieved with the built-in mediators in WSO2 Enterprise Service Bus. In those scenarios, we have to write a custom mediator. In this post, I explain how to write a custom mediator for WSO2 ESB.

Writing a custom mediator for the ESB means writing a class mediator that contains the required logic in its mediate() method. To be deployable in WSO2 ESB, a class mediator has to follow one basic rule: it must extend the abstract class "org.apache.synapse.mediators.AbstractMediator". To do that, add a dependency on synapse-core to your project as follows:



    <dependencies>
        <dependency>
            <groupId>org.apache.synapse</groupId>
            <artifactId>synapse-core</artifactId>
            <version>${synapse.core.version}</version>
        </dependency>
    </dependencies>


Before digging into the code itself, let me explain what I am going to achieve with this custom mediator.

I have the following incoming message:


         <dat:addJobWorkLocation xmlns:dat="http://ws.wso2.org/dataservice">
            <dat:job_id>2037826</dat:job_id>
            <dat:preferred_city_code>,CMB,GM,KND,</dat:preferred_city_code>
            <dat:country_code>SL</dat:country_code>
         </dat:addJobWorkLocation>

 

From this incoming message, I need to generate messages of the same type by splitting the comma-separated string passed in the element with the local name "preferred_city_code". After generation, the result should look as follows:




         <dat:addJobWorkLocation xmlns:dat="http://ws.wso2.org/dataservice">
            <dat:job_id>2037826</dat:job_id>
            <dat:preferred_city_code>CMB</dat:preferred_city_code>
            <dat:country_code>SL</dat:country_code>
         </dat:addJobWorkLocation>



         <dat:addJobWorkLocation xmlns:dat="http://ws.wso2.org/dataservice">
            <dat:job_id>2037826</dat:job_id>
            <dat:preferred_city_code>GM</dat:preferred_city_code>
            <dat:country_code>SL</dat:country_code>
         </dat:addJobWorkLocation>



         <dat:addJobWorkLocation xmlns:dat="http://ws.wso2.org/dataservice">
            <dat:job_id>2037826</dat:job_id>
            <dat:preferred_city_code>KND</dat:preferred_city_code>
            <dat:country_code>SL</dat:country_code>
         </dat:addJobWorkLocation>
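
The splitting step on its own can be tried outside the ESB. The following is a minimal sketch using only the JDK; the class and method names (CityCodeSplit, splitCodes) are made up for illustration and are not part of the mediator. It shows that StringTokenizer skips the empty entries produced by the leading and trailing commas in ",CMB,GM,KND,".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

// Hypothetical helper, for illustration only.
public class CityCodeSplit {

    // Splits a comma-separated string and drops empty entries.
    public static List<String> splitCodes(String commaSeparated) {
        List<String> codes = new ArrayList<>();
        StringTokenizer tokenizer = new StringTokenizer(commaSeparated, ",");
        while (tokenizer.hasMoreTokens()) {
            String code = tokenizer.nextToken().trim();
            if (!code.isEmpty()) {
                codes.add(code);
            }
        }
        return codes;
    }

    public static void main(String[] args) {
        System.out.println(splitCodes(",CMB,GM,KND,")); // prints [CMB, GM, KND]
    }
}
```

Each of the three resulting codes becomes one addJobWorkLocation message in the expected output above.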






Since there is no built-in mediator for this task, I am writing a custom mediator to achieve it.
My custom mediator is named "SmartSplitMediator", and it extends AbstractMediator as follows.

public class SmartSplitMediator extends AbstractMediator {
...
}


Once the class extends AbstractMediator, it has to implement the mediate method, which AbstractMediator leaves unimplemented. So I implement the mediate method in this class as follows.


public class SmartSplitMediator extends AbstractMediator {
...


   public boolean mediate(MessageContext messageContext) {
   ...
  }


...
}


Then I need to write my logic inside this mediate method. To achieve my goal, I need to pass some values into this class. For that, I define them as class variables and add getters and setters for them. For this particular case, I need the following variables.

  • parentLocalName
  • parentNamespace
  • parentNamespacePrefix
  • IteratingElementLocalName
  • variableStringLocalName
  • constantStringsLocalNames


I am adding these variables to my class as follows;


public class SmartSplitMediator extends AbstractMediator {

    private String parentLocalName;
    private String parentNamespace;
    private String parentNamespacePrefix;
    private String IteratingElementLocalName;
    private String variableStringLocalName;
    private String constantStringsLocalNames;
  
    public boolean mediate(MessageContext messageContext) {
        ...
    }

    public String getParentLocalName() {
        return parentLocalName;
    }

    public void setParentLocalName(String parentLocalName) {
        this.parentLocalName = parentLocalName;
    }

    public String getParentNamespace() {
        return parentNamespace;
    }

    public void setParentNamespace(String parentNamespace) {
        this.parentNamespace = parentNamespace;
    }

    public String getParentNamespacePrefix() {
        return parentNamespacePrefix;
    }

    public void setParentNamespacePrefix(String parentNamespacePrefix) {
        this.parentNamespacePrefix = parentNamespacePrefix;
    }

    public String getIteratingElementLocalName() {
        return IteratingElementLocalName;
    }

    public void setIteratingElementLocalName(String iteratingElementLocalName) {
        IteratingElementLocalName = iteratingElementLocalName;
    }

    public String getVariableStringLocalName() {
        return variableStringLocalName;
    }

    public void setVariableStringLocalName(String variableStringLocalName) {
        this.variableStringLocalName = variableStringLocalName;
    }

    public String getConstantStringsLocalNames() {
        return constantStringsLocalNames;
    }

    public void setConstantStringsLocalNames(String constantStringsLocalNames) {
        this.constantStringsLocalNames = constantStringsLocalNames;
    }
}
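
The getters and setters matter because the values in the sequence configuration reach the mediator through its setters. The following is a rough sketch of that idea using plain reflection. It is an assumption about the naming convention ("set" plus the property name with its first letter capitalized), not Synapse's actual code, and DemoMediator and setProperty are hypothetical names.

```java
import java.lang.reflect.Method;

// Illustration only: how a name/value pair can be mapped onto a setter.
public class PropertyInjectionSketch {

    // Stand-in for a class mediator with one configurable field.
    public static class DemoMediator {
        private String parentLocalName;

        public void setParentLocalName(String parentLocalName) {
            this.parentLocalName = parentLocalName;
        }

        public String getParentLocalName() {
            return parentLocalName;
        }
    }

    // Finds the setter named "set" + capitalized property name and invokes it.
    public static void setProperty(Object target, String name, String value) {
        String setter = "set" + Character.toUpperCase(name.charAt(0)) + name.substring(1);
        try {
            Method method = target.getClass().getMethod(setter, String.class);
            method.invoke(target, value);
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("No usable setter for property " + name, e);
        }
    }

    public static void main(String[] args) {
        DemoMediator mediator = new DemoMediator();
        setProperty(mediator, "parentLocalName", "WorkAuthorization");
        System.out.println(mediator.getParentLocalName()); // prints WorkAuthorization
    }
}
```

Under this convention, a property without a matching public setter cannot be set from the configuration.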



The remaining part is to write the logic inside the mediate method. Using the variables passed in above, my mediate method has the following logic.




    public boolean mediate(MessageContext messageContext) {

        SOAPFactory fac = OMAbstractFactory.getSOAP11Factory();
        SOAPBody soapBody = messageContext.getEnvelope().getBody();
        OMElement parentElement = (OMElement) soapBody.getFirstElement();
        String variableString = parentElement.getFirstChildWithName(new QName(getParentNamespace(),getVariableStringLocalName(),getParentNamespacePrefix())).getText();
      
        // Tokenize the incoming comma-separated string.
        StringTokenizer variableTokenizer = new StringTokenizer(variableString, ",");

        OMElement newParentElement = fac.createOMElement(new QName(getParentNamespace(), getParentLocalName(), getParentNamespacePrefix()));
        while (variableTokenizer.hasMoreTokens()) {
            String variableValue = variableTokenizer.nextToken();
            if (variableValue.length() > 0) {
                OMElement secondLevelElement = fac.createOMElement(new QName(getParentNamespace(), getIteratingElementLocalName(), getParentNamespacePrefix()));
                StringTokenizer tokenizer = new StringTokenizer(getConstantStringsLocalNames(), ",");
                while (tokenizer.hasMoreTokens()) {
                    String constantLocalName = tokenizer.nextToken();
                    OMElement jobIdElement;
                    if (constantLocalName.equalsIgnoreCase(getVariableStringLocalName())) {
                        jobIdElement = fac.createOMElement(new QName(getParentNamespace(), constantLocalName, getParentNamespacePrefix()));
                        jobIdElement.setText(variableValue);
                    } else {
                        jobIdElement = fac.createOMElement(new QName(getParentNamespace(), constantLocalName, getParentNamespacePrefix()));
                        jobIdElement.setText(parentElement.getFirstChildWithName(new QName(getParentNamespace(),constantLocalName,getParentNamespacePrefix())).getText());
                    }
                    secondLevelElement.addChild(jobIdElement);
                }
                newParentElement.addChild(secondLevelElement);
            }
        }
        // The new body is ready: detach the existing payload and attach the new one.
        SOAPBody body = messageContext.getEnvelope().getBody();
        if (body.getFirstElement() != null) {
            body.getFirstElement().detach();
        }
        body.addChild(newParentElement);

        return true;
    }
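
To check the transformation logic without starting the ESB, the same steps can be reproduced with the JDK's DOM API instead of Axiom. This is only a sketch under assumptions: SmartSplitSketch, its split method, and the fixed NS and PREFIX constants are invented for this example, and it works on a plain XML string rather than a Synapse MessageContext.

```java
import java.io.StringReader;
import java.util.StringTokenizer;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

// DOM-based stand-in for the mediator logic, for testing outside the ESB.
public class SmartSplitSketch {
    public static final String NS = "http://ws.wso2.org/dataservice";
    public static final String PREFIX = "dat:";

    public static Element split(String xml, String parentLocalName, String iteratingLocalName,
                                String variableLocalName, String constantLocalNames) {
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            factory.setNamespaceAware(true);
            DocumentBuilder builder = factory.newDocumentBuilder();
            Element source = builder.parse(new InputSource(new StringReader(xml))).getDocumentElement();

            Document out = builder.newDocument();
            Element parent = out.createElementNS(NS, PREFIX + parentLocalName);
            out.appendChild(parent);

            // One iterating element per non-empty value in the comma-separated string.
            StringTokenizer values = new StringTokenizer(text(source, variableLocalName), ",");
            while (values.hasMoreTokens()) {
                String value = values.nextToken().trim();
                if (value.isEmpty()) {
                    continue;
                }
                Element item = out.createElementNS(NS, PREFIX + iteratingLocalName);
                StringTokenizer names = new StringTokenizer(constantLocalNames, ",");
                while (names.hasMoreTokens()) {
                    String name = names.nextToken().trim();
                    Element child = out.createElementNS(NS, PREFIX + name);
                    // The variable element gets the split value; the others are copied as-is.
                    child.setTextContent(name.equals(variableLocalName) ? value : text(source, name));
                    item.appendChild(child);
                }
                parent.appendChild(item);
            }
            return parent;
        } catch (Exception e) {
            throw new IllegalArgumentException("Could not transform payload", e);
        }
    }

    // Returns the text of the first matching element, or an empty string if it is missing.
    private static String text(Element parent, String localName) {
        NodeList nodes = parent.getElementsByTagNameNS(NS, localName);
        return nodes.getLength() == 0 ? "" : nodes.item(0).getTextContent();
    }
}
```

Unlike the mediator above, this sketch returns an empty string when a configured element is missing from the input instead of failing with a NullPointerException.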




After completing the logic in the above method, the implementation is done. Now we need to compile this mediator and deploy the resulting JAR in the wso2esb-4.x.x/repository/components/lib directory so that it can be used in any sequence.

Once it is deployed in the above directory, you can use it in any sequence as follows:


     <inSequence>
            <log level="full"/>
            <class name="com.js.mediator.split.SmartSplitMediator">
               <property name="variableStringLocalName" value="preferred_city_code"/>
               <property name="parentLocalName" value="WorkAuthorization"/>
               <property name="constantStringsLocalNames" value="preferred_city_code,country_code"/>

               <property name="parentNamespacePrefix" value="dat"/>
               <property name="parentNamespace" value="http://ws.wso2.org/dataservice"/>
               <property name="IteratingElementLocalName" value="addJobWorkLocation"/>
            </class>
            <log level="full"/>
            <iterate xmlns:dat="http://ws.wso2.org/dataservice"
                     id="foo"
                     expression="//dat:WorkAuthorization/dat:addJobWorkLocation"
                     sequential="true">
               <target>
                  <sequence>
                     <log level="full"/>
                     <drop/>
                  </sequence>
               </target>
            </iterate>
         </inSequence>   

   

I am attaching the following items related to this blog post.



Monday, September 23, 2013

How to detect CPU load issues with JTop, a JConsole plugin


Why do we need the JTop plugin?

With JConsole, we can detect that there is a high CPU load, but it does not tell us the possible root causes. The JTop plugin lists the threads in the process with the highest CPU usage.

How to use the JTop plugin with JConsole?

When starting JConsole, you need to give the path to the JTop plugin.

E.g., my Java home is /opt/software/jdk1.6.0_29:

$ jconsole -pluginpath /opt/software/jdk1.6.0_29/demo/management/JTop/JTop.jar

The general way to start JConsole with JTop is described in Evanthika's blog.

Why did I want to use JTop?

When I was testing my server, I experienced high CPU usage on it and wanted to isolate the issue. So I started JConsole with the JTop plugin, and my output was as in the following image.




With that, I could find which threads were consuming most of the CPU. When I then looked at those threads in the "Threads" tab in JConsole, I could identify the root cause of the problem.




So the JTop plugin helped me a lot in identifying the root cause of the CPU overload issue.
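
A similar per-thread CPU ranking can be produced in code with the standard java.lang.management API, which, as far as I know, is also what the JTop demo is built on. The sketch below is only an approximation under assumptions: TopThreads and top are hypothetical names, and it inspects the JVM it runs in rather than attaching to a remote server the way JConsole does.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.List;

// Lists the threads of the current JVM that have used the most CPU time.
public class TopThreads {

    public static List<String> top(int limit) {
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        List<String> report = new ArrayList<>();
        if (!threads.isThreadCpuTimeSupported() || !threads.isThreadCpuTimeEnabled()) {
            return report; // CPU time measurement is not available on this JVM
        }

        // Collect {threadId, cpuNanos} pairs; -1 means the thread has already died.
        List<long[]> usage = new ArrayList<>();
        for (long id : threads.getAllThreadIds()) {
            long cpuNanos = threads.getThreadCpuTime(id);
            if (cpuNanos >= 0) {
                usage.add(new long[] {id, cpuNanos});
            }
        }
        usage.sort((a, b) -> Long.compare(b[1], a[1])); // highest CPU time first

        for (long[] entry : usage) {
            if (report.size() >= limit) {
                break;
            }
            ThreadInfo info = threads.getThreadInfo(entry[0]);
            if (info != null) {
                report.add(info.getThreadName() + " cpu=" + (entry[1] / 1_000_000) + " ms");
            }
        }
        return report;
    }

    public static void main(String[] args) {
        for (String line : top(5)) {
            System.out.println(line);
        }
    }
}
```

The values are cumulative CPU time since each thread started, so a busy thread is easier to spot by comparing two snapshots taken a few seconds apart.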

Monday, July 29, 2013

WSO2 Data Services Server (DSS) not consuming messages from JMS Queues

As we know, we can integrate WSO2 Data Services Server with WSO2 Message Broker so that the Data Services Server consumes messages from the Message Broker's queues.

At a larger scale, for example when more than 100 data services consume messages from the Message Broker, the server logs the following warning and does not start consuming messages from the queues.

WARN {org.apache.axis2.transport.jms.JMSListener} -  Polling tasks on destination : xxx of type queue for service xxx have not yet started after 3 seconds ..


The reason behind this warning is a deliberate limit in the system. In Data Services Server, the number of threads that consume messages from JMS queues is limited to 100.

This is done through a configurable parameter in the server's startup script (wso2server.sh). By default, snd_t_core=100. Anyone who gets the above warning with more than 100 consumers can increase this parameter to overcome the situation, as shown in the snd_t_core and snd_t_max lines below:



    -Dcom.sun.jndi.ldap.connect.pool.authentication=simple  \
    -Dcom.sun.jndi.ldap.connect.pool.timeout=3000  \
    -Dorg.terracotta.quartz.skipUpdateCheck=true \
    -Dsnd_t_core=200 \
    -Dsnd_t_max=250 \
    -Djava.security.egd=file:/dev/./urandom \


Tuesday, July 23, 2013

How to start SimpleAxis2Server in debug mode..



In most WSO2 products, the simple Axis2 server is shipped in the samples directory. I had a situation where I wanted to debug an Axis2 service deployed in this simple Axis2 server.


Yes, it is a very simple thing to start the simple Axis2 server in debug mode. But I had to dig into the startup script to find out how to do it.

We can start the simple Axis2 server in debug mode with the following:

sh axis2server.sh  -xdebug

It will start the server in debug mode, listening on port 8000.

Tuesday, February 19, 2013

UnsupportedOperationException in Creating Topics in WSO2 Message Broker


Sometimes we get the following error when trying to create topics in WSO2 Message Broker.


Exception in thread "main" java.lang.UnsupportedOperationException: The new addressing based sytanx is not supported for AMQP 0-8/0-9 versions

at org.wso2.andes.client.AMQSession_0_8.handleAddressBasedDestination(AMQSession_0_8.java:572)
at org.wso2.andes.client.AMQSession.registerConsumer(AMQSession.java:2838)
at org.wso2.andes.client.AMQSession.access$500(AMQSession.java:117)
at org.wso2.andes.client.AMQSession$4.execute(AMQSession.java:2031)
at org.wso2.andes.client.AMQSession$4.execute(AMQSession.java:1997)
at org.wso2.andes.client.AMQConnectionDelegate_8_0.executeRetrySupport(AMQConnectionDelegate_8_0.java:305)
at org.wso2.andes.client.AMQConnection.executeRetrySupport(AMQConnection.java:621)
at org.wso2.andes.client.failover.FailoverRetrySupport.execute(FailoverRetrySupport.java:102)
at org.wso2.andes.client.AMQSession.createConsumerImpl(AMQSession.java:1995)
at org.wso2.andes.client.AMQSession.createExclusiveConsumer(AMQSession.java:976)
at org.wso2.andes.client.AMQSession.createSubscriber(AMQSession.java:1443)
at org.wso2.andes.client.AMQTopicSessionAdaptor.createSubscriber(AMQTopicSessionAdaptor.java:63)
at Subscriber.subscribe(Subscriber.java:52)





The reason for the above exception is an implementation limitation. As we know, WSO2 Message Broker supports the AMQP 0-9-1 specification, and the core of WSO2 Message Broker is Andes, which uses Apache Qpid for transport-level communication.

The Andes implementation has a limitation: it does not support dynamic queues or topics. If someone needs to use dynamic topics or queues, there is a solution for that.

Normally we create a topic and subscribe to it as follows:


session = topicConnection.createTopicSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(topicName);
TopicSubscriber subscriber = session.createSubscriber(topic);

But if we do this with WSO2 Message Broker, we get the above exception.

We can get rid of this by adding the "BURL:" prefix before the name of the topic, as follows:


session = topicConnection.createTopicSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("BURL:"+topicName);
TopicSubscriber subscriber = session.createSubscriber(topic);
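
If topic names come from several places, the prefix can be applied in one spot. The helper below is a small sketch; BurlTopicName and withBurlPrefix are hypothetical names, not part of any WSO2 or Qpid API. It only builds the string, so it has no JMS dependency.

```java
// Hypothetical helper that adds the "BURL:" prefix exactly once.
public class BurlTopicName {
    public static final String BURL_PREFIX = "BURL:";

    public static String withBurlPrefix(String topicName) {
        // Leave names that already carry the prefix unchanged.
        return topicName.startsWith(BURL_PREFIX) ? topicName : BURL_PREFIX + topicName;
    }

    public static void main(String[] args) {
        System.out.println(withBurlPrefix("myTopic"));      // prints BURL:myTopic
        System.out.println(withBurlPrefix("BURL:myTopic")); // prints BURL:myTopic
    }
}
```

With it, the topic creation above would read session.createTopic(BurlTopicName.withBurlPrefix(topicName)).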




Wednesday, December 19, 2012

How to use WSO2 ESB VFS transport to transfer large files..


We can use the VFS transport in WSO2 ESB as described in the sample [1]. Apart from that, Supun, who was a project manager of WSO2 ESB, has written an article [2] on it. These articles and samples are a great help in using the VFS transport to transfer files.

But when using those configurations to transfer large files (greater than 500 MB), I got out-of-memory exceptions [3] and some other errors [4].

When looking into the problem more deeply, I found the solution. The issue was with the message builder class I had used and a missing property. We need to use the message builder "org.apache.axis2.format.BinaryBuilder" for this. Apart from that, we need to include the property "ClientApiNonBlocking" in the proxy configuration.

As a sample, I have defined a custom content type, "chs/binary".
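
The stack trace in [3] shows where the memory goes: IOUtils.toByteArray copies the whole file into a byte array on the heap. As a rough illustration of the alternative, and not the ESB's actual code, the sketch below copies a stream in fixed-size chunks, so memory use stays at the buffer size no matter how large the file is. ChunkedCopy and copy are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;

// Copies a stream chunk by chunk instead of buffering it all in memory.
public class ChunkedCopy {

    public static long copy(InputStream in, OutputStream out) {
        byte[] buffer = new byte[8192]; // only this much payload is held at a time
        long total = 0;
        try {
            int read;
            while ((read = in.read(buffer)) != -1) {
                out.write(buffer, 0, read);
                total += read;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return total;
    }

    public static void main(String[] args) {
        byte[] data = new byte[20000];
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        System.out.println(copy(new ByteArrayInputStream(data), sink)); // prints 20000
    }
}
```

In a real transfer the input and output would be file streams; the in-memory streams in main are only there to keep the example self-contained.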

Entries in axis2.xml


<messageFormatters>
    ...
    <messageFormatter contentType="chs/binary"
                      class="org.wso2.carbon.relay.ExpandingMessageFormatter"/>
    ...
</messageFormatters>
<messageBuilders>
    ...
    <messageBuilder contentType="chs/binary"
                    class="org.apache.axis2.format.BinaryBuilder"/>
    ...
</messageBuilders>

Then we need a sample VFS proxy such as:

 <proxy xmlns="http://ws.apache.org/ns/synapse"
       name="FileProxy"
       transports="vfs"
       startOnLoad="true"
       trace="disable">
    <description/>
    <target>
       <inSequence>
          <log level="custom">
             <property name="FileProxy" value="Processing file"/>
          </log>
          <property name="OUT_ONLY" value="true"/>
          <property name="ClientApiNonBlocking"
                   value="true"
                  scope="axis2"
                  action="remove"/>
          <send>
             <endpoint name="FileEpr">
                <address uri="vfs:file:////home/shammi/file-out"/>
             </endpoint>
          </send>
       </inSequence>
    </target>
    <parameter name="transport.vfs.Streaming">true</parameter>
    <parameter name="transport.PollInterval">15</parameter>
    <parameter name="transport.vfs.ActionAfterProcess">MOVE</parameter>
    <parameter name="transport.vfs.FileURI">file:///home/shammi/file-in</parameter>
    <parameter name="transport.vfs.MoveAfterProcess">file:///home/shammi/file-original</parameter>
    <parameter name="transport.vfs.MoveAfterFailure">file:////home/shammi/file-failure</parameter>
    <parameter name="transport.vfs.Locking">enable</parameter>
    <parameter name="transport.vfs.FileNamePattern">.*.zip|.*.test</parameter>
    <parameter name="transport.vfs.ContentType">chs/binary</parameter>
    <parameter name="transport.vfs.ActionAfterFailure">MOVE</parameter>
 </proxy>

    

With this proxy, I could use the VFS transport to transfer a 1 GB file without any issue. I have made the points that need more attention bold.

Cheers.. Thats it......





[1] http://docs.wso2.org/wiki/pages/viewpage.action?pageId=15471427
[2] http://wso2.org/library/articles/2011/01/wso2-esb-example-file-processing
[3]2012-12-19 11:49:04,797 [-] [Framework Event Dispatcher]  WARN PollTableEntry transport.vfs.FileURI parameter is missing in the proxy service configuration
2012-12-19 11:50:08,896 [-] [vfs-Worker-3] ERROR NativeWorkerPool Uncaught exception
java.lang.OutOfMemoryError: Java heap space
at org.apache.commons.io.output.ByteArrayOutputStream.needNewBuffer(ByteArrayOutputStream.java:124)
at org.apache.commons.io.output.ByteArrayOutputStream.write(ByteArrayOutputStream.java:155)
at org.apache.commons.io.IOUtils.copyLarge(IOUtils.java:1263)
at org.apache.commons.io.IOUtils.copy(IOUtils.java:1236)
at org.apache.commons.io.IOUtils.toByteArray(IOUtils.java:360)
at org.apache.axis2.format.BinaryBuilder.processDocument(BinaryBuilder.java:72)
at org.apache.synapse.transport.vfs.VFSTransportListener.processFile(VFSTransportListener.java:558)
at org.apache.synapse.transport.vfs.VFSTransportListener.scanFileOrDirectory(VFSTransportListener.java:312)
at org.apache.synapse.transport.vfs.VFSTransportListener.poll(VFSTransportListener.java:158)
at org.apache.synapse.transport.vfs.VFSTransportListener.poll(VFSTransportListener.java:107)
at org.apache.axis2.transport.base.AbstractPollingTransportListener$1$1.run(AbstractPollingTransportListener.java:67)
at org.apache.axis2.transport.base.threads.NativeWorkerPool$1.run(NativeWorkerPool.java:172)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)


[4]

2012-12-19 13:59:58,033 [-] [Axis2 Task] ERROR VFSTransportSender IO Error while creating response file : file:///home/shammi/wso2/foo/file-out/response.xml
org.apache.axis2.AxisFault: Error serializing binary content of element : {http://ws.apache.org/commons/ns/payload}binary
at org.apache.axis2.format.BinaryFormatter.writeTo(BinaryFormatter.java:66)
at org.apache.synapse.transport.vfs.VFSTransportSender.populateResponseFile(VFSTransportSender.java:235)
at org.apache.synapse.transport.vfs.VFSTransportSender.sendMessage(VFSTransportSender.java:173)
at org.apache.axis2.transport.base.AbstractTransportSender.invoke(AbstractTransportSender.java:112)
at org.apache.axis2.engine.AxisEngine$TransportNonBlockingInvocationWorker.run(AxisEngine.java:627)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)
Caused by: org.apache.commons.vfs2.FileNotFoundException: Could not read from "file:///home/shammi/wso2/foo/file-in/a.zip" because it is a not a file.
at org.apache.commons.vfs2.provider.AbstractFileObject.getInputStream(AbstractFileObject.java:1316)
at org.apache.commons.vfs2.provider.DefaultFileContent.getInputStream(DefaultFileContent.java:397)
at org.apache.synapse.transport.vfs.FileObjectDataSource.getInputStream(FileObjectDataSource.java:61)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.axis2.format.ManagedDataSourceFactory$DataSourceManager.invoke(ManagedDataSourceFactory.java:91)
at $Proxy15.getInputStream(Unknown Source)
at javax.activation.DataHandler.writeTo(DataHandler.java:290)
at org.apache.axis2.format.BinaryFormatter.writeTo(BinaryFormatter.java:64)
... 7 more
Caused by: java.io.FileNotFoundException: /home/shammi/wso2/foo/file-in/a.zip (No such file or directory)
at java.io.FileInputStream.open(Native Method)
at java.io.FileInputStream.<init>(FileInputStream.java:120)
at org.apache.commons.vfs2.provider.local.LocalFile.doGetInputStream(LocalFile.java:210)
at org.apache.commons.vfs2.provider.AbstractFileObject.getInputStream(AbstractFileObject.java:1308)
... 17 more
2012-12-19 14:00:26,458 [-] [Timer-7]  WARN TimeoutHa