Monday, September 9, 2013

Publish data to cassandra from jaggery.js using WSO2 BAM DataPublisher

To do this, I used the kpi-definition sample app [1] with WSO2 BAM 2.3.0.

First, I installed a few BAM features into the WSO2 enterprise-store [2].
You can get those features from the enterprise-store social branch, or install them yourself using the following steps.

Update <enterprise_store_home>/modules/p2-profile-gen/pom.xml with the BAM and Cassandra features.

Define the following <featureArtifactDef> entries under <featureArtifacts>:

org.wso2.carbon:org.wso2.carbon.cassandra.explorer.feature
org.wso2.carbon:org.wso2.carbon.cassandra.feature
org.wso2.carbon:org.wso2.carbon.bam.cassandra.data.archive.feature
org.wso2.carbon:org.wso2.carbon.databridge.datapublisher.feature
org.wso2.carbon:org.wso2.carbon.databridge.datareceiver.feature
org.wso2.carbon:org.wso2.carbon.databridge.cassandra.feature
org.wso2.carbon:org.wso2.carbon.module.mgt.server.feature
org.wso2.carbon:org.wso2.carbon.databridge.streamdefn.cassandra.feature

Define the following features under <features>:

org.wso2.carbon.cassandra.explorer.feature.group
org.wso2.carbon.bam.cassandra.data.archive.feature.group
org.wso2.carbon.cassandra.feature.group
org.wso2.carbon.databridge.datapublisher.feature.group
org.wso2.carbon.databridge.datareceiver.feature.group
org.wso2.carbon.databridge.cassandra.feature.group

Run mvn clean install from within your <enterprise_store_home>.
If you have defined the above features with the correct versions, you will get BUILD SUCCESS with all the newly added features.

Let's start the server:
cd modules/distribution/target/
unzip wso2store-1.0.0.zip
sh wso2store-1.0.0/bin/wso2server.sh

Keep in mind that if you have your own Cassandra server up and running, you must stop it before the step above.

Next, let's rewrite the Java code of the kpi-definition sample in JavaScript and save it as Stream.js:

// Connection settings for the data receiver the events are published to.
var host = "localhost";
var url = "tcp://" + host + ":" + "7611";
var username = "admin";
var password = "admin";

// Name and version that together identify the event stream.
var PHONE_RETAIL_STREAM = "org.wso2.bam.phone.retail.store.kpi";
var VERSION = "1.0.0";

// Java classes, referenced by their fully qualified names through the
// script engine's Java interop.
var Agent = org.wso2.carbon.databridge.agent.thrift.Agent;
var AgentConfiguration = org.wso2.carbon.databridge.agent.thrift.conf.AgentConfiguration;
var DataPublisher = org.wso2.carbon.databridge.agent.thrift.DataPublisher;
var Event = org.wso2.carbon.databridge.commons.Event;

// Declared with `var` so they are not created as implicit globals.
var agentConfiguration = new AgentConfiguration();
var agent = new Agent(agentConfiguration);
var dataPublisher = new DataPublisher(url, username, password, agent);

var streamId = null;
try {
    try {
        // Reuse the stream if it has already been defined.
        streamId = dataPublisher.findStream(PHONE_RETAIL_STREAM, VERSION);
    } catch (e) {
        // Any lookup failure is treated as "stream not defined yet", so the
        // stream is defined here. An error raised by defineStream itself is
        // not caught and propagates to the caller.
        streamId = dataPublisher.defineStream("{" +
            "  'name':'" + PHONE_RETAIL_STREAM + "'," +
            "  'version':'" + VERSION + "'," +
            "  'nickName': 'Phone_Retail_Shop'," +
            "  'description': 'Phone Sales'," +
            "  'metaData':[" +
            "          {'name':'clientType','type':'STRING'}" +
            "  ]," +
            "  'payloadData':[" +
            "          {'name':'brand','type':'STRING'}," +
            "          {'name':'quantity','type':'INT'}," +
            "          {'name':'total','type':'INT'}," +
            "          {'name':'user','type':'STRING'}" +
            "  ]" +
            "}");
    }

    if (streamId) {
        // Publish 100 sample events, reporting progress after each one.
        for (var i = 0; i < 100; i++) {
            publishEvents(dataPublisher, streamId, i);
            print("Events published : " + (i + 1) + "<br/>");
        }
    }
} finally {
    // Stop the publisher on every path, including when defining the stream or
    // publishing an event throws, so it is never left running.
    dataPublisher.stop();
}

    function  publishEvents(dataPublisher, streamId, i){
       var quantity = 1000;
       var ext_str = new java.lang.String("external");
       var nokia_str = new java.lang.String("Nokia");
       var total_int = new java.lang.Integer(123);
       var quantity_int = new java.lang.Integer(12300);
       var user_str = new java.lang.String("UdaraR");
       eventOne = new Event(streamId, Date.now(),[ext_str], null,[nokia_str,total_int, quantity_int, user_str]);
       dataPublisher.publish(eventOne);
    }

Now you can use this from within a Jaggery script:

// Pull Stream.js into this Jaggery script. Presumably loading it executes the
// script's top-level publishing code - confirm, and check that the path
// resolves relative to the requiring script.
var Stream = require("Stream.js");
 

[1] http://docs.wso2.org/display/BAM230/KPI+Monitoring+Sample
[2] https://github.com/wso2/enterprise-store

No comments:

Post a Comment