I still don't fully understand it. (The rest of the code is in my previous email; this is only the relevant sample.)

1. dag.addStream("test", rabbitInput.output, out.input);

Results in the following error:

[ERROR] symbol: variable output
[ERROR] location: variable rabbitInput of type com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator

2. dag.addStream("test", rabbitInput.outputPort, out.input);

Results in the following error:

[ERROR] /home/pi/apex/rabbitMQ/src/main/java/com/example/rabbitMQ/RabbitMQApplication.java:[31,8] no suitable method found for addStream(java.lang.String,com.datatorrent.api.DefaultOutputPort<byte[]>,com.datatorrent.api.DefaultInputPort<java.lang.String>)
[ERROR] method com.datatorrent.api.DAG.<T>addStream(java.lang.String,com.datatorrent.api.Operator.OutputPort<? extends T>,com.datatorrent.api.Operator.InputPort<? super T>...) is not applicable
[ERROR] (inferred type does not conform to upper bound(s)
[ERROR] inferred: byte[]
[ERROR] upper bound(s): java.lang.String,java.lang.Object)
[ERROR] method com.datatorrent.api.DAG.<T>addStream(java.lang.String,com.datatorrent.api.Operator.OutputPort<? extends T>,com.datatorrent.api.Operator.InputPort<? super T>) is not applicable
[ERROR] (inferred type does not conform to upper bound(s)
[ERROR] inferred: byte[]
[ERROR] upper bound(s): java.lang.String,java.lang.Object)
[ERROR] method com.datatorrent.api.DAG.<T>addStream(java.lang.String,com.datatorrent.api.Operator.OutputPort<? extends T>,com.datatorrent.api.Operator.InputPort<? super T>,com.datatorrent.api.Operator.InputPort<? super T>) is not applicable
[ERROR] (cannot infer type-variable(s) T
[ERROR] (actual and formal argument lists differ in length))

It seems that, on the one hand, RabbitMQInputOperator does not have an output field and, on the other hand, the addStream method only accepts an outputPort combined with an inputPort, or an output combined with an input, of the corresponding classes. Does that mean I can only use a class that provides an inputPort for this example?

Cheers

Manfred.

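Going by the compiler output above, the second error comes down to element types: rabbitInput.outputPort emits byte[] while out.input accepts String, so no single T satisfies both sides of addStream. One possible way around this is to place a small converter between the two ports. The following is only a minimal, self-contained sketch of that idea in plain Java. InputPort, OutputPort, addStream and BytesToString are simplified, hypothetical stand-ins and not the Apex classes (the real addStream takes OutputPort<? extends T>), and UTF-8 is an assumed message encoding.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of typed ports. Hypothetical stand-ins, NOT the Apex API. */
final class StreamTypingSketch {

    /** Stand-in for an input port: receives tuples of type T. */
    interface InputPort<T> {
        void process(T tuple);
    }

    /** Stand-in for an output port: forwards emitted tuples to its connected sink. */
    static final class OutputPort<T> {
        private InputPort<? super T> sink;

        void connect(InputPort<? super T> sink) {
            this.sink = sink;
        }

        void emit(T tuple) {
            if (sink != null) {
                sink.process(tuple);
            }
        }
    }

    /** Simplified shape of addStream: the sink must accept what the source emits. */
    static <T> void addStream(String id, OutputPort<T> source, InputPort<? super T> sink) {
        source.connect(sink);
    }

    /** Hypothetical converter: byte[] in, String out (UTF-8 is an assumption). */
    static final class BytesToString {
        final OutputPort<String> output = new OutputPort<>();
        final InputPort<byte[]> input =
            tuple -> output.emit(new String(tuple, StandardCharsets.UTF_8));
    }

    /** Wires source -> converter -> sink and returns what the sink received. */
    public static List<String> run(byte[]... messages) {
        OutputPort<byte[]> rabbitOut = new OutputPort<>(); // models rabbitInput.outputPort
        List<String> written = new ArrayList<>();
        InputPort<String> fileIn = written::add;           // models out.input

        // addStream("test", rabbitOut, fileIn); // does not compile: byte[] is not a String

        BytesToString convert = new BytesToString();
        addStream("bytes", rabbitOut, convert.input);      // T = byte[]
        addStream("lines", convert.output, fileIn);        // T = String

        for (byte[] message : messages) {
            rabbitOut.emit(message);
        }
        return written;
    }

    public static void main(String[] args) {
        System.out.println(run("hello".getBytes(StandardCharsets.UTF_8)));
    }
}
```

In a real application the converter would be an operator with its own input and output ports, added to the DAG like any other operator. Alternatively, an output operator whose input port accepts byte[] would avoid the conversion step.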
On 08.06.2017 at 10:05, a...@x5h.eu wrote:
>
> Sorry, the two snippets below were from different iterations. The
> error I get is the following:
>
> [ERROR]
> /home/pi/apex/rabbitMQ/src/main/java/com/example/rabbitMQ/RabbitMQApplication.java:[31,38]
> cannot find symbol
> [ERROR] symbol: variable output
> [ERROR] location: variable rabbitInput of type
> com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator
>
> My code is as follows:
>
> package com.example.rabbitMQ;
>
> import org.apache.hadoop.conf.Configuration;
>
> import com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator;
> import com.datatorrent.api.annotation.ApplicationAnnotation;
> import com.datatorrent.api.StreamingApplication;
> import com.datatorrent.api.DAG;
> import com.datatorrent.lib.io.jms.JMSStringInputOperator;
>
> @ApplicationAnnotation(name="RabbitMQ2HDFS")
> public class RabbitMQApplication implements StreamingApplication
> {
>
>   @Override
>   public void populateDAG(DAG dag, Configuration conf)
>   {
>
>     RabbitMQInputOperator rabbitInput =
>         dag.addOperator("Consumer", RabbitMQInputOperator.class);
>     rabbitInput.setHost("localhost");
>     rabbitInput.setPort(5672);
>     rabbitInput.setExchange("");
>     rabbitInput.setQueueName("hello");
>     LineOutputOperator out = dag.addOperator("fileOut", new
>         LineOutputOperator());
>
>     dag.addStream("data", rabbitInput.output, out.input);
>   }
> }
>
> Cheers
>
> Manfred.
>
>
> On 08.06.2017 at 04:34, vikram patil wrote:
>> Hi,
>> dag.addStream() is actually used to create a stream from one
>> operator's output port to another operator's input port.
>> RabbitMQInputOperator consumer = dag.addOperator("Consumer",
>> RabbitMQInputOperator.class);
>> dag.addStream("data", *rabbitInput*.output, out.input);
>> Looks like your operator name is incorrect?
>> I see in your code snippet above that the name of the
>> RabbitMQInputOperator is *"Consumer"*.
>>
>> In the property name, you need to provide the operator name you
>> specified in the addOperator(*"NAME OF THE OPERATOR"*,
>> RabbitMQInputOperator.class) API call.
>>
>> dt.operator.*rabbitMQIn*.prop.tuple_blast ( Syntax is correct,
>> given your operator name is correct ).
>>
>> ( It should be dt.operator.*Consumer*.prop.tuple_blast based on your
>> code snippet ).
>>
>> I think the tests provided in Apache Malhar are very detailed. They
>> run in local mode as unit tests, so we have mocked the actual
>> RabbitMQ with a custom message publisher.
>>
>> For the time being, you can set only the queue name and host name,
>> as follows:
>>
>> // set your rabbitmq host
>> consumer.setHost("localhost");
>> // set your rabbitmq port
>> consumer.setPort(5672);
>> // It depends on your rabbitmq producer configuration, but by default
>> // it is the default exchange with "" ( no name is provided ).
>> consumer.setExchange("");
>> // set your queue name
>> consumer.setQueueName("YOUR_QUEUE_NAME");
>>
>> If it's okay, could you please share the code from your
>> application.java and properties.xml here?
>>
>> Thanks,
>> Vikram
>>
>>
>> On Thu, Jun 8, 2017 at 12:32 AM, <a...@x5h.eu> wrote:
>>
>>     Thanks very much for the help. The only problem left is that I
>>     don't quite understand dag.addStream().
>>
>>     I tried this
>>
>>     RabbitMQInputOperator consumer = dag.addOperator("Consumer",
>>     RabbitMQInputOperator.class);
>>     dag.addStream("data", rabbitInput.output, out.input);
>>
>>     but obviously this doesn't work. What I don't get is the
>>     difference between the ActiveMQ example and the RabbitMQ example.
>>     I looked over the test examples for RabbitMQ but don't seem to
>>     understand the logic behind them.
>>
>>     Is this the correct way to specify properties?
>>     <property>
>>       <name>dt.operator.rabbitMQIn.prop.tuple_blast</name>
>>       <value>500</value>
>>     </property>
>>
>>     Cheers
>>     Manfred.
>>
>>
>> On 07.06.2017 at 12:03, Vikram Patil wrote:
>>> Yes, you would need an Application.java, which is the way to define
>>> a DAG for an Apex application.
>>>
>>> Please have a look at the code of the following example to find out
>>> how to write a JMS ActiveMQ based example:
>>>
>>> https://github.com/DataTorrent/examples/tree/master/tutorials/jmsActiveMQ/src/main/java/com/example/jmsActiveMQ
>>>
>>> This is how you can instantiate RabbitMQInputOperator and add it to
>>> a DAG:
>>> RabbitMQInputOperator consumer = dag.addOperator("Consumer",
>>> RabbitMQInputOperator.class);
>>>
>>> https://stackoverflow.com/questions/42207495/how-to-use-apache-apex-malhar-rabbitmq-operator-in-dag
>>>
>>> The following properties need to be specified in properties.xml:
>>>
>>> Properties:
>>> * tuple_blast: Number of tuples emitted in each burst
>>> * bufferSize: Size of holding buffer
>>> * host: the address for the consumer to connect to rabbitMQ producer
>>> * exchange: the exchange for the consumer to connect to rabbitMQ
>>>   producer
>>> * exchangeType: the exchangeType for the consumer to connect to
>>>   rabbitMQ producer
>>> * routingKey: the routingKey for the consumer to connect to
>>>   rabbitMQ producer
>>> * queueName: the queueName for the consumer to connect to
>>>   rabbitMQ producer
>>>
>>> Reference:
>>> https://github.com/apache/apex-malhar/blob/master/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
>>>
>>> Thanks,
>>> Vikram
>>>
>>> On Wed, Jun 7, 2017 at 3:19 PM,
>>> <a...@x5h.eu> wrote:
>>>> Hello,
>>>>
>>>> I compiled the whole thing, but now I don't know exactly how to get
>>>> it running in Apex. Do I need an application.java like in the
>>>> tutorial? I do have a simple RabbitMQ queue up and running on the
>>>> server. How do I consume the messages with Apex and write them to
>>>> HDFS?
>>>>
>>>> Cheers,
>>>>
>>>> Manfred
>>>>
>>>> The following steps were necessary to get the RabbitMQ test to
>>>> compile:
>>>>
>>>> @TimeoutException
>>>> import java.util.concurrent.TimeoutException;
>>>> public void setup() throws IOException,TimeoutException
>>>> public void teardown() throws IOException,TimeoutException
>>>> protected void runTest(final int testNum) throws IOException
>>>>
>>>> @Build jars
>>>> cd apex-malhar/contrib/
>>>> mvn clean package -DskipTests
>>>>
>>>> cd apex-malhar/library/
>>>> mvn clean package -DskipTests
>>>> copy packages to project directory
>>>>
>>>> @Link them to the project
>>>> Add the following lines to the pom.xml
>>>> <dependency>
>>>>   <groupId>contrib</groupId>
>>>>   <artifactId>com.datatorrent.contrib.helper</artifactId>
>>>>   <version>1.0</version>
>>>>   <scope>system</scope>
>>>>   <systemPath>${project.basedir}/src/main/resources/malhar-contrib-3.8.0-SNAPSHOT-tests.jar</systemPath>
>>>> </dependency>
>>>> <dependency>
>>>>   <groupId>lib</groupId>
>>>>   <artifactId>com.datatorrent.lib.helper</artifactId>
>>>>   <version>1.0</version>
>>>>   <scope>system</scope>
>>>>   <systemPath>${project.basedir}/src/main/resources/malhar-library-3.8.0-SNAPSHOT-tests.jar</systemPath>
>>>> </dependency>
>>>> <dependency>
>>>>   <groupId>contrib</groupId>
>>>>   <artifactId>com.datatorrent.contrib.rabbitmq</artifactId>
>>>>   <version>1.0</version>
>>>>   <scope>system</scope>
>>>>   <systemPath>${project.basedir}/src/main/resources/malhar-contrib-3.8.0-SNAPSHOT.jar</systemPath>
>>>> </dependency>
>>>> <dependency>
>>>>   <groupId>Attribute</groupId>
>>>>   <artifactId>com.datatorrent.api.Attribute.AttributeMap</artifactId>
>>>>   <version>1.0</version>
>>>>   <scope>system</scope>
>>>>   <systemPath>${project.basedir}/src/main/resources/apex-api-3.7.0-SNAPSHOT.jar</systemPath>
>>>> </dependency>
>>>>
>>>>
>>>> On 31.05.2017 at 18:57, Sanjay Pujare wrote:
>>>>
>>>> Both com.datatorrent.contrib.helper and com.datatorrent.lib.helper
>>>> are under the test directory of malhar-contrib and malhar-library
>>>> respectively. You may need to build these jars yourself with test
>>>> scope to include these packages.
>>>>
>>>> On Wed, May 31, 2017 at 9:39 AM, <a...@x5h.eu> wrote:
>>>>> Hello, (mea culpa for messing up the headline the first time)
>>>>>
>>>>> I'm currently trying to get the apex-malhar rabbitmq operator
>>>>> running, but I'm at a complete loss. While the examples are
>>>>> running fine, I can't even get RabbitMQInputOperatorTest.java to
>>>>> run.
>>>>>
>>>>> First it couldn't find the rabbitmq-client, which was solvable by
>>>>> adding the dependency:
>>>>>
>>>>> <dependency>
>>>>>   <groupId>com.rabbitmq</groupId>
>>>>>   <artifactId>amqp-client</artifactId>
>>>>>   <version>4.1.0</version>
>>>>> </dependency>
>>>>>
>>>>> But now it doesn't find the packages com.datatorrent.contrib.helper
>>>>> and com.datatorrent.lib.helper and can't find several symbols.
>>>>>
>>>>> Needless to say, I'm a beginner with Apex, so does anyone know
>>>>> what exactly I'm doing wrong here?
>>>>>
>>>>> Cheers
>>>>>
>>>>> Manfred.