Hi,
I am using the first method, as you suggested, but it gives an error on the
following line:
ObjectToNumberConverter converter = dag.addOperator("converter",
ObjectToNumberConverter.class);
It asks me to import the package of the class, even though the class is in
the same package.
The error while launching the application is shown below:
An error occurred trying to launch the application. Server message:
2016-08-23 09:32:54.756 INFO net.spy.memcached.auth.AuthThread: Authenticated to dev.nlpcaptcha.in/172.86.180.74:11210
2016-08-23 09:32:54.923 INFO com.couchbase.client.vbucket.provider.BucketConfigurationProvider: Could bootstrap through carrier publication.
2016-08-23 09:32:54.927 INFO com.couchbase.client.CouchbaseConnection: Added {QA sa=dev.nlpcaptcha.in/172.86.180.74:11210, #Rops=0, #Wops=0, #iq=0, topRop=null, topWop=null, toWrite=0, interested=0} to connect queue
2016-08-23 09:32:54.928 INFO com.couchbase.client.CouchbaseClient: CouchbaseConnectionFactory{bucket='beer-sample', nodes=[http://dev.nlpcaptcha.in:8091/pools], order=RANDOM, opTimeout=100000, opQueue=16384, opQueueBlockTime=1000, obsPollInt=10, obsPollMax=500, obsTimeout=5000, viewConns=10, viewTimeout=75000, viewWorkers=1, configCheck=10, reconnectInt=1100, failureMode=Redistribute, hashAlgo=NATIVE_HASH, authWaitTime=2500}
2016-08-23 09:32:54.958 INFO com.couchbase.client.CouchbaseClient: viewmode set to production mode
2016-08-23 09:32:55.504 INFO net.spy.memcached.auth.AuthThread: Authenticated to dev.nlpcaptcha.in/172.86.180.74:11210
2016-08-23 09:32:55.539 INFO net.spy.memcached.auth.AuthThread: Authenticated to dev.nlpcaptcha.in/172.86.180.74:11210
2016-08-23 09:32:55.664 INFO com.couchbase.client.vbucket.provider.BucketConfigurationProvider: Could bootstrap through carrier publication.
2016-08-23 09:32:55.665 INFO com.couchbase.client.CouchbaseConnection: Added {QA sa=dev.nlpcaptcha.in/172.86.180.74:11210, #Rops=0, #Wops=0, #iq=0, topRop=null, topWop=null, toWrite=0, interested=0} to connect queue
2016-08-23 09:32:55.665 INFO com.couchbase.client.CouchbaseClient: CouchbaseConnectionFactory{bucket='beer-sample', nodes=[http://dev.nlpcaptcha.in:8091/pools, http://dev.nlpcaptcha.in:8091/pools], order=RANDOM, opTimeout=100000, opQueue=16384, opQueueBlockTime=1000, obsPollInt=10, obsPollMax=500, obsTimeout=5000, viewConns=10, viewTimeout=75000, viewWorkers=1, configCheck=10, reconnectInt=1100, failureMode=Redistribute, hashAlgo=NATIVE_HASH, authWaitTime=2500}
2016-08-23 09:32:55.666 INFO com.couchbase.client.CouchbaseClient: viewmode set to production mode
2016-08-23 09:32:56.245 INFO net.spy.memcached.auth.AuthThread: Authenticated to dev.nlpcaptcha.in/172.86.180.74:11210
2016-08-23 09:32:57.133 INFO net.spy.memcached.auth.AuthThread: Authenticated to dev.nlpcaptcha.in/172.86.180.74:11210
2016-08-23 09:32:57.260 INFO com.couchbase.client.vbucket.provider.BucketConfigurationProvider: Could bootstrap through carrier publication.
2016-08-23 09:32:57.264 INFO com.couchbase.client.CouchbaseConnection: Added {QA sa=dev.nlpcaptcha.in/172.86.180.74:11210, #Rops=0, #Wops=0, #iq=0, topRop=null, topWop=null, toWrite=0, interested=0} to connect queue
2016-08-23 09:32:57.266 INFO com.couchbase.client.CouchbaseClient: CouchbaseConnectionFactory{bucket='beer-sample', nodes=[http://dev.nlpcaptcha.in:8091/pools], order=RANDOM, opTimeout=100000, opQueue=16384, opQueueBlockTime=1000, obsPollInt=10, obsPollMax=500, obsTimeout=5000, viewConns=10, viewTimeout=75000, viewWorkers=1, configCheck=10, reconnectInt=1100, failureMode=Redistribute, hashAlgo=NATIVE_HASH, authWaitTime=2500}
2016-08-23 09:32:57.285 INFO com.couchbase.client.CouchbaseClient: viewmode set to production mode
2016-08-23 09:32:57.842 INFO net.spy.memcached.auth.AuthThread: Authenticated to dev.nlpcaptcha.in/172.86.180.74:11210
2016-08-23 09:32:57.863 INFO net.spy.memcached.auth.AuthThread: Authenticated to dev.nlpcaptcha.in/172.86.180.74:11210
2016-08-23 09:32:57.986 INFO com.couchbase.client.vbucket.provider.BucketConfigurationProvider: Could bootstrap through carrier publication.
2016-08-23 09:32:57.987 INFO com.couchbase.client.CouchbaseConnection: Added {QA sa=dev.nlpcaptcha.in/172.86.180.74:11210, #Rops=0, #Wops=0, #iq=0, topRop=null, topWop=null, toWrite=0, interested=0} to connect queue
2016-08-23 09:32:57.987 INFO com.couchbase.client.CouchbaseClient: CouchbaseConnectionFactory{bucket='beer-sample', nodes=[http://dev.nlpcaptcha.in:8091/pools, http://dev.nlpcaptcha.in:8091/pools], order=RANDOM, opTimeout=100000, opQueue=16384, opQueueBlockTime=1000, obsPollInt=10, obsPollMax=500, obsTimeout=5000, viewConns=10, viewTimeout=75000, viewWorkers=1, configCheck=10, reconnectInt=1100, failureMode=Redistribute, hashAlgo=NATIVE_HASH, authWaitTime=2500}
2016-08-23 09:32:57.988 INFO com.couchbase.client.CouchbaseClient: viewmode set to production mode
javax.validation.ValidationException: At least one output port must be connected: median
  at com.datatorrent.stram.plan.logical.LogicalPlan.validate(LogicalPlan.java:1764)
  at com.datatorrent.stram.StramClient.<init>(StramClient.java:163)
  at com.datatorrent.stram.client.StramAppLauncher.launchApp(StramAppLauncher.java:596)
  at com.datatorrent.stram.cli.ApexCli$LaunchCommand.execute(ApexCli.java:2056)
  at com.datatorrent.stram.cli.ApexCli.launchAppPackage(ApexCli.java:3445)
  at com.datatorrent.stram.cli.ApexCli.access$7400(ApexCli.java:151)
  at com.datatorrent.stram.cli.ApexCli$LaunchCommand.execute(ApexCli.java:1900)
  at com.datatorrent.stram.cli.ApexCli$3.run(ApexCli.java:1462)
From: Priyanka Gugale [mailto:[email protected]]
Sent: Tuesday, August 23, 2016 11:54 AM
To: [email protected]
Subject: Re: connecting two operators
There are two ways you can do this. The first, as Yogi suggested, is to add a
new operator that converts Object to Number. Below is a code example:
CouchBasePOJOInputOperator inputOperator = dag.addOperator("inputOperator",
CouchBasePOJOInputOperator.class);
ObjectToNumberConverter converter = dag.addOperator("converter",
ObjectToNumberConverter.class);
MedianOperator median = dag.addOperator("median", MedianOperator.class);
dag.addStream("inputFormatter", inputOperator.outputPort,
converter.in).setLocality(Locality.THREAD_LOCAL);
dag.addStream("med", converter.out, median.data);
//ObjectToNumberConverter Class code
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.util.BaseOperator;
public class ObjectToNumberConverter extends BaseOperator
{
private static final Logger LOG =
LoggerFactory.getLogger(ObjectToNumberConverter.class);
public final transient DefaultOutputPort<Number> out = new
DefaultOutputPort<>();
public final transient DefaultInputPort<Object> in = new
DefaultInputPort<Object>()
{
@Override
public void process(Object tuple)
{
if (tuple instanceof Number) {
Number number = (Number)tuple;
out.emit(number);
} else {
LOG.info("Error converting object to number. {}", tuple);
}
}
};
}
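To see what the converter's type check does with mixed input, here is a small standalone sketch in plain Java. It does not use the Apex API; the class name NumberConversionSketch and the method toNumber are made up for illustration. It only mirrors the instanceof test from process(): Number tuples pass through, everything else is dropped. Note that a String such as "7" is not parsed, and that instanceof is false for null, so a null tuple also ends up in the else branch.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Standalone illustration of the converter's type check; not part of Apex.
class NumberConversionSketch
{
  // Returns the tuple as a Number if it already is one, otherwise empty.
  // Nothing is parsed: a String like "7" is treated as a non-number.
  static Optional<Number> toNumber(Object tuple)
  {
    if (tuple instanceof Number) {
      return Optional.of((Number)tuple);
    }
    return Optional.empty();
  }

  public static void main(String[] args)
  {
    // Mixed input: an Integer, a Double, a String and a null.
    List<Object> tuples = Arrays.asList(42, 3.5, "7", null);
    for (Object tuple : tuples) {
      System.out.println(tuple + " -> " + toNumber(tuple));
    }
  }
}
```

If your input operator actually emits numeric Strings or POJOs, you would need to add your own parsing or field extraction in the else branch; the check above will not do that for you.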
The second way is to extend or create a new MedianOperator that accepts
"Object" on its input port and handles the type conversion itself.
This is how you can do it:
package com.datatorrent.tutorial.csvparser;
import java.util.ArrayList;
import java.util.Collections;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.util.BaseOperator;
public class MedianOperator extends BaseOperator
{
private static final Logger LOG =
LoggerFactory.getLogger(MedianOperator.class);
private ArrayList<Double> values;
/**
* Input data port that takes an Object; only Number tuples are used.
*/
public final transient DefaultInputPort<Object> data = new
DefaultInputPort<Object>()
{
/**
* Collects the value of each numeric tuple for the current window
*/
@Override
public void process(Object tuple)
{
if (tuple instanceof Number) {
Number numTuple = (Number)tuple;
values.add(numTuple.doubleValue());
} else {
LOG.info("Invalid input format of tuple: {}", tuple);
}
}
};
/**
* Output port that emits median of incoming data.
*/
public final transient DefaultOutputPort<Number> median = new
DefaultOutputPort<Number>();
@Override
public void beginWindow(long arg0)
{
values = new ArrayList<Double>();
}
@Override
public void endWindow()
{
if (values.size() == 0) {
return;
}
if (values.size() == 1) {
median.emit(values.get(0));
return;
}
// median value
Collections.sort(values);
int medianIndex = values.size() / 2;
if (values.size() % 2 == 0) {
Double value = values.get(medianIndex - 1);
value = (value + values.get(medianIndex)) / 2;
median.emit(value);
} else {
median.emit(values.get(medianIndex));
}
}
}
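If you want to check the median logic from endWindow() without launching the application, the same steps can be tried in plain Java. This is only a sketch: the class MedianSketch and the method median are hypothetical names, and it throws on an empty window where the operator simply emits nothing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Standalone illustration of the per-window median logic; not part of Apex.
class MedianSketch
{
  // Median of one window's values. Sorts a copy so the caller's list is untouched.
  static double median(List<Double> window)
  {
    if (window.isEmpty()) {
      throw new IllegalArgumentException("window must not be empty");
    }
    List<Double> sorted = new ArrayList<>(window);
    Collections.sort(sorted);
    int medianIndex = sorted.size() / 2;
    if (sorted.size() % 2 == 0) {
      // Even count: average the two middle values.
      return (sorted.get(medianIndex - 1) + sorted.get(medianIndex)) / 2;
    }
    // Odd count: the middle value is the median.
    return sorted.get(medianIndex);
  }

  public static void main(String[] args)
  {
    System.out.println(median(Arrays.asList(3.0, 1.0, 2.0)));      // odd count
    System.out.println(median(Arrays.asList(4.0, 1.0, 3.0, 2.0))); // even count
  }
}
```

For the odd-sized window the middle value after sorting is returned, and for the even-sized window the two middle values are averaged, which matches the two branches in endWindow().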
On Mon, Aug 22, 2016 at 5:50 PM, Yogi Devendra
<[email protected]> wrote:
Define ObjectToNumberConverter operator which accepts Object on the input port
and emits Number on its output port.
Use this operator in your DAG.
~ Yogi
On 22 August 2016 at 17:42, Hitesh Goyal
<[email protected]> wrote:
Hi team,
Please refer to the following three lines of code.
CouchBasePOJOInputOperator inputOperator = dag.addOperator("inputOperator",
CouchBasePOJOInputOperator.class);
MedianOperator median = dag.addOperator("median", MedianOperator.class);
dag.addStream("med", inputOperator.outputPort.getClass(), median.data);
It gives an error on the last line, saying that the method is not applicable
for these arguments.
How should I convert Object to Number so that I can feed the data to the
MedianOperator class?
Regards,
Hitesh Goyal
Simpli5d Technologies
Cont No.: 9599803307