Hi,
I have used the operators in the way you suggested, and the application
launched successfully.
CouchBasePOJOInputOperator inputOperator =
    dag.addOperator("inputOperator", CouchBasePOJOInputOperator.class);
ConsoleOutputOperator cons =
    dag.addOperator("console", new ConsoleOutputOperator());
ObjectToNumberConverter converter =
    dag.addOperator("converter", ObjectToNumberConverter.class);
MedianOperator median = dag.addOperator("median", MedianOperator.class);
dag.addStream("inputFormatter", inputOperator.outputPort, converter.in);
dag.addStream("med", converter.out, median.data);
dag.addStream("out", median.median, cons.input);
inputOperator is emitting tuples. These tuples get processed in the converter
operator, but ObjectToNumberConverter is not emitting anything on its output
port (i.e. converter.out). What could be the reason, given that no errors are
shown in the code?
The CouchBasePOJOInputOperator is emitting the following output:
{
"string": "{\"abv\":\"0.0\"}"
}
From: Priyanka Gugale [mailto:[email protected]]
Sent: Tuesday, August 23, 2016 3:01 PM
To: [email protected]
Subject: Re: connecting two operators
Okay, as I understand it, your further processing piece is missing for now. You
can use ConsoleOutputOperator for the time being. Once your processing
operators are ready, you can remove the ConsoleOutputOperator and connect the
output of the Median operator to your processing operators.
-Priyanka
On Tue, Aug 23, 2016 at 2:58 PM, Hitesh Goyal <[email protected]> wrote:
I want to access data from Couchbase using CouchBasePOJOInputOperator and pass
it to the median operator to calculate the median of that data. I then want
that median value available, for example as a variable, for further processing.
From: Priyanka Gugale [mailto:[email protected]]
Sent: Tuesday, August 23, 2016 2:49 PM
To: [email protected]
Subject: Re: connecting two operators
Hi Hitesh,
"Median" is not an output operator, and at least one output port of every
non-I/O operator must be connected to a downstream operator. In this example
you can use ConsoleOutputOperator to log the median output to the console.
Can you share what you want to achieve using this app? The output of median
will be emitted on the "median" port, if you don't connect that port, the
calculated value won't be used anywhere.
-Priyanka
On Tue, Aug 23, 2016 at 2:28 PM, Hitesh Goyal <[email protected]> wrote:
Hi.
I am using the first method as you suggested, but it is giving an error on
the following line:
ObjectToNumberConverter converter =
    dag.addOperator("converter", ObjectToNumberConverter.class);
It is asking me to import the package of the class, even though the class is
in the same package.
The error while launching the application is shown below:
An error occurred trying to launch the application. Server message:
2016-08-23 09:32:54.756 INFO net.spy.memcached.auth.AuthThread: Authenticated to dev.nlpcaptcha.in/172.86.180.74:11210
2016-08-23 09:32:54.923 INFO com.couchbase.client.vbucket.provider.BucketConfigurationProvider: Could bootstrap through carrier publication.
2016-08-23 09:32:54.927 INFO com.couchbase.client.CouchbaseConnection: Added {QA sa=dev.nlpcaptcha.in/172.86.180.74:11210, #Rops=0, #Wops=0, #iq=0, topRop=null, topWop=null, toWrite=0, interested=0} to connect queue
2016-08-23 09:32:54.928 INFO com.couchbase.client.CouchbaseClient: CouchbaseConnectionFactory{bucket='beer-sample', nodes=[http://dev.nlpcaptcha.in:8091/pools], order=RANDOM, opTimeout=100000, opQueue=16384, opQueueBlockTime=1000, obsPollInt=10, obsPollMax=500, obsTimeout=5000, viewConns=10, viewTimeout=75000, viewWorkers=1, configCheck=10, reconnectInt=1100, failureMode=Redistribute, hashAlgo=NATIVE_HASH, authWaitTime=2500}
2016-08-23 09:32:54.958 INFO com.couchbase.client.CouchbaseClient: viewmode set to production mode
2016-08-23 09:32:55.504 INFO net.spy.memcached.auth.AuthThread: Authenticated to dev.nlpcaptcha.in/172.86.180.74:11210
2016-08-23 09:32:55.539 INFO net.spy.memcached.auth.AuthThread: Authenticated to dev.nlpcaptcha.in/172.86.180.74:11210
2016-08-23 09:32:55.664 INFO com.couchbase.client.vbucket.provider.BucketConfigurationProvider: Could bootstrap through carrier publication.
2016-08-23 09:32:55.665 INFO com.couchbase.client.CouchbaseConnection: Added {QA sa=dev.nlpcaptcha.in/172.86.180.74:11210, #Rops=0, #Wops=0, #iq=0, topRop=null, topWop=null, toWrite=0, interested=0} to connect queue
2016-08-23 09:32:55.665 INFO com.couchbase.client.CouchbaseClient: CouchbaseConnectionFactory{bucket='beer-sample', nodes=[http://dev.nlpcaptcha.in:8091/pools, http://dev.nlpcaptcha.in:8091/pools], order=RANDOM, opTimeout=100000, opQueue=16384, opQueueBlockTime=1000, obsPollInt=10, obsPollMax=500, obsTimeout=5000, viewConns=10, viewTimeout=75000, viewWorkers=1, configCheck=10, reconnectInt=1100, failureMode=Redistribute, hashAlgo=NATIVE_HASH, authWaitTime=2500}
2016-08-23 09:32:55.666 INFO com.couchbase.client.CouchbaseClient: viewmode set to production mode
2016-08-23 09:32:56.245 INFO net.spy.memcached.auth.AuthThread: Authenticated to dev.nlpcaptcha.in/172.86.180.74:11210
2016-08-23 09:32:57.133 INFO net.spy.memcached.auth.AuthThread: Authenticated to dev.nlpcaptcha.in/172.86.180.74:11210
2016-08-23 09:32:57.260 INFO com.couchbase.client.vbucket.provider.BucketConfigurationProvider: Could bootstrap through carrier publication.
2016-08-23 09:32:57.264 INFO com.couchbase.client.CouchbaseConnection: Added {QA sa=dev.nlpcaptcha.in/172.86.180.74:11210, #Rops=0, #Wops=0, #iq=0, topRop=null, topWop=null, toWrite=0, interested=0} to connect queue
2016-08-23 09:32:57.266 INFO com.couchbase.client.CouchbaseClient: CouchbaseConnectionFactory{bucket='beer-sample', nodes=[http://dev.nlpcaptcha.in:8091/pools], order=RANDOM, opTimeout=100000, opQueue=16384, opQueueBlockTime=1000, obsPollInt=10, obsPollMax=500, obsTimeout=5000, viewConns=10, viewTimeout=75000, viewWorkers=1, configCheck=10, reconnectInt=1100, failureMode=Redistribute, hashAlgo=NATIVE_HASH, authWaitTime=2500}
2016-08-23 09:32:57.285 INFO com.couchbase.client.CouchbaseClient: viewmode set to production mode
2016-08-23 09:32:57.842 INFO net.spy.memcached.auth.AuthThread: Authenticated to dev.nlpcaptcha.in/172.86.180.74:11210
2016-08-23 09:32:57.863 INFO net.spy.memcached.auth.AuthThread: Authenticated to dev.nlpcaptcha.in/172.86.180.74:11210
2016-08-23 09:32:57.986 INFO com.couchbase.client.vbucket.provider.BucketConfigurationProvider: Could bootstrap through carrier publication.
2016-08-23 09:32:57.987 INFO com.couchbase.client.CouchbaseConnection: Added {QA sa=dev.nlpcaptcha.in/172.86.180.74:11210, #Rops=0, #Wops=0, #iq=0, topRop=null, topWop=null, toWrite=0, interested=0} to connect queue
2016-08-23 09:32:57.987 INFO com.couchbase.client.CouchbaseClient: CouchbaseConnectionFactory{bucket='beer-sample', nodes=[http://dev.nlpcaptcha.in:8091/pools, http://dev.nlpcaptcha.in:8091/pools], order=RANDOM, opTimeout=100000, opQueue=16384, opQueueBlockTime=1000, obsPollInt=10, obsPollMax=500, obsTimeout=5000, viewConns=10, viewTimeout=75000, viewWorkers=1, configCheck=10, reconnectInt=1100, failureMode=Redistribute, hashAlgo=NATIVE_HASH, authWaitTime=2500}
2016-08-23 09:32:57.988 INFO com.couchbase.client.CouchbaseClient: viewmode set to production mode
javax.validation.ValidationException: At least one output port must be connected: median
    at com.datatorrent.stram.plan.logical.LogicalPlan.validate(LogicalPlan.java:1764)
    at com.datatorrent.stram.StramClient.<init>(StramClient.java:163)
    at com.datatorrent.stram.client.StramAppLauncher.launchApp(StramAppLauncher.java:596)
    at com.datatorrent.stram.cli.ApexCli$LaunchCommand.execute(ApexCli.java:2056)
    at com.datatorrent.stram.cli.ApexCli.launchAppPackage(ApexCli.java:3445)
    at com.datatorrent.stram.cli.ApexCli.access$7400(ApexCli.java:151)
    at com.datatorrent.stram.cli.ApexCli$LaunchCommand.execute(ApexCli.java:1900)
    at com.datatorrent.stram.cli.ApexCli$3.run(ApexCli.java:1462)
From: Priyanka Gugale [mailto:[email protected]]
Sent: Tuesday, August 23, 2016 11:54 AM
To: [email protected]
Subject: Re: connecting two operators
There are two ways you can do this. The first, as Yogi suggested, is to add a
new operator that converts Object to Number. Below is a code example:
CouchBasePOJOInputOperator inputOperator =
    dag.addOperator("inputOperator", CouchBasePOJOInputOperator.class);
ObjectToNumberConverter converter =
    dag.addOperator("converter", ObjectToNumberConverter.class);
MedianOperator median = dag.addOperator("median", MedianOperator.class);
dag.addStream("inputFormatter", inputOperator.outputPort, converter.in)
    .setLocality(Locality.THREAD_LOCAL);
dag.addStream("med", converter.out, median.data);
// ObjectToNumberConverter class code
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.util.BaseOperator;

public class ObjectToNumberConverter extends BaseOperator
{
  private static final Logger LOG =
      LoggerFactory.getLogger(ObjectToNumberConverter.class);

  public final transient DefaultOutputPort<Number> out = new DefaultOutputPort<>();

  public final transient DefaultInputPort<Object> in = new DefaultInputPort<Object>()
  {
    @Override
    public void process(Object tuple)
    {
      // Only tuples that already are Number instances are forwarded;
      // anything else (for example a String or a POJO) is logged and dropped.
      if (tuple instanceof Number) {
        Number number = (Number)tuple;
        out.emit(number);
      } else {
        // Parameterized logging avoids a NullPointerException on a null tuple.
        LOG.info("Error converting object to number. {}", tuple);
      }
    }
  };
}
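A note on the converter above: it forwards only tuples that are already Number instances and logs everything else, so a tuple that arrives as a String or a POJO produces no output on converter.out. Below is a minimal, framework-free sketch of the same check, extended to also accept plain numeric strings. The class name NumberCoercion, the method toNumber, and the String handling are all assumptions for illustration; none of them are part of the operator posted in this thread or of the Apex API.

```java
import java.util.Optional;

public class NumberCoercion {
    /**
     * Hypothetical helper: returns the tuple as a Number when possible.
     * Number instances pass through unchanged; Strings are parsed as doubles
     * (an assumption, not something the original operator does); everything
     * else, including null, yields an empty Optional.
     */
    public static Optional<Number> toNumber(Object tuple) {
        if (tuple instanceof Number) {
            return Optional.<Number>of((Number) tuple);
        }
        if (tuple instanceof String) {
            try {
                return Optional.<Number>of(Double.valueOf(((String) tuple).trim()));
            } catch (NumberFormatException e) {
                // Not a plain numeric string, e.g. a JSON document.
                return Optional.empty();
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(toNumber(42));                    // a Number passes through
        System.out.println(toNumber("0.0"));                 // a numeric string is parsed
        System.out.println(toNumber("{\"abv\":\"0.0\"}"));   // a JSON string is rejected
    }
}
```

Inside an operator, the process method could emit the value only when the Optional is present. A JSON string would still need its numeric field extracted before a check like this can succeed.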
The second way is to extend MedianOperator, or create a new one, so that it
accepts "Object" on its input port and handles the type conversion itself.
This is how you can do it:
package com.datatorrent.tutorial.csvparser;

import java.util.ArrayList;
import java.util.Collections;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.util.BaseOperator;

public class MedianOperator extends BaseOperator
{
  private static final Logger LOG =
      LoggerFactory.getLogger(MedianOperator.class);
  private ArrayList<Double> values;

  /**
   * Input data port that takes any Object; only Number tuples are used.
   */
  public final transient DefaultInputPort<Object> data = new DefaultInputPort<Object>()
  {
    /**
     * Collects each numeric tuple for the current window.
     */
    @Override
    public void process(Object tuple)
    {
      if (tuple instanceof Number) {
        Number numTuple = (Number)tuple;
        values.add(numTuple.doubleValue());
      } else {
        // Parameterized logging avoids a NullPointerException on a null tuple.
        LOG.info("Invalid input format of tuple: {}", tuple);
      }
    }
  };

  /**
   * Output port that emits median of incoming data.
   */
  public final transient DefaultOutputPort<Number> median = new DefaultOutputPort<Number>();

  @Override
  public void beginWindow(long arg0)
  {
    values = new ArrayList<Double>();
  }

  @Override
  public void endWindow()
  {
    if (values.size() == 0) {
      return;
    }
    if (values.size() == 1) {
      median.emit(values.get(0));
      return;
    }
    // median value
    Collections.sort(values);
    int medianIndex = values.size() / 2;
    if (values.size() % 2 == 0) {
      // Even count: average the two middle values.
      Double value = values.get(medianIndex - 1);
      value = (value + values.get(medianIndex)) / 2;
      median.emit(value);
    } else {
      median.emit(values.get(medianIndex));
    }
  }
}
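The per-window median logic in endWindow() above can be checked outside of Apex. The following is a minimal sketch of the same computation as a plain static method; the class name MedianSketch and the method median are hypothetical, and returning an OptionalDouble for an empty window is an assumption that stands in for the operator simply emitting nothing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.OptionalDouble;

public class MedianSketch {
    /**
     * Hypothetical stand-alone version of the endWindow() logic:
     * sorts a copy of the window's values and returns the middle value,
     * or the mean of the two middle values when the count is even.
     */
    public static OptionalDouble median(List<Double> window) {
        if (window.isEmpty()) {
            return OptionalDouble.empty();
        }
        List<Double> sorted = new ArrayList<>(window); // leave the caller's list untouched
        Collections.sort(sorted);
        int mid = sorted.size() / 2;
        if (sorted.size() % 2 == 0) {
            return OptionalDouble.of((sorted.get(mid - 1) + sorted.get(mid)) / 2);
        }
        return OptionalDouble.of(sorted.get(mid));
    }

    public static void main(String[] args) {
        System.out.println(median(Arrays.asList(3.0, 1.0, 2.0)));
        System.out.println(median(Arrays.asList(4.0, 1.0, 3.0, 2.0)));
    }
}
```

Because the operator resets its list in beginWindow(), each emitted median covers only the tuples of one streaming window, not the whole data set.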
On Mon, Aug 22, 2016 at 5:50 PM, Yogi Devendra <[email protected]> wrote:
Define an ObjectToNumberConverter operator which accepts Object on its input
port and emits Number on its output port.
Use this operator in your DAG.
~ Yogi
On 22 August 2016 at 17:42, Hitesh Goyal <[email protected]> wrote:
Hi team,
Refer to the following 3 lines of code.
CouchBasePOJOInputOperator inputOperator =
    dag.addOperator("inputOperator", CouchBasePOJOInputOperator.class);
MedianOperator median = dag.addOperator("median", MedianOperator.class);
dag.addStream("med", inputOperator.outputPort.getClass(), median.data);
It is giving an error on the last line, saying that the method is not
applicable for these arguments.
How should I convert Object to Number so that I can feed the data to the
MedianOperator class?
Regards,
Hitesh Goyal
Simpli5d Technologies
Cont No.: 9599803307