Hi,

dag.addStream() is used to create a stream from one operator's output port to other operators' input ports.

    RabbitMQInputOperator consumer = dag.addOperator("Consumer", RabbitMQInputOperator.class);
    dag.addStream("data", *rabbitInput*.output, out.input);

Looks like your operator name is incorrect? In the snippet above the operator is assigned to the variable consumer, but addStream() refers to rabbitInput.

I also see in your code snippet that the name of the RabbitMQInputOperator is *"Consumer"*. In the property name, you need to provide the operator name you specified in the addOperator(*"NAME OF THE OPERATOR"*, RabbitMQInputOperator.class) API call.

    dt.operator.*rabbitMQIn*.prop.tuple_blast
    (The syntax is correct, provided the operator name is correct.)

    dt.operator.*Consumer*.prop.tuple_blast
    (This is what it should be, based on your code snippet.)

I think the tests provided in Apache Malhar are very detailed. They run in local mode as unit tests, so the actual RabbitMQ is mocked by a custom message publisher.

For the time being, you can set just the connection settings, such as the queue name and host name:

    // Set your RabbitMQ host.
    consumer.setHost("localhost");
    // Set your RabbitMQ port.
    consumer.setPort(5672);
    // This depends on your RabbitMQ producer configuration, but by default
    // it is the default exchange, whose name is "" (no name is provided).
    consumer.setExchange("");
    // Set your queue name.
    consumer.setQueueName("YOUR_QUEUE_NAME");

If it's okay, could you please share the code from your Application.java and properties.xml here?

Thanks,
Vikram

On Thu, Jun 8, 2017 at 12:32 AM, <a...@x5h.eu> wrote:
> Thanks very much for the help. The only problem left is that I don't quite
> understand dag.addStream().
>
> I tried this
>
> RabbitMQInputOperator consumer = dag.addOperator("Consumer",
> RabbitMQInputOperator.class);
> dag.addStream("data", rabbitInput.output, out.input);
>
>
> but obviously this doesn't work. What I don't get is the difference
> between the ActiveMQ example and the RabbitMQ example. I looked over the
> test examples for RabbitMQ but don't seem to understand the logic behind
> it.
>
> Is this the correct way to specify properties:
> <property>
>   <name>dt.operator.rabbitMQIn.prop.tuple_blast</name>
>   <value>500</value>
> </property>
>
> Cheers
> Manfred.
>
>
> On 07.06.2017 at 12:03, Vikram Patil wrote:
>
> Yes, you would need an Application.java, which is the way to define a DAG
> for an Apex application.
>
> Please have a look at the code from the following example to find out how to
> write a JMS ActiveMQ based example:
> https://github.com/DataTorrent/examples/tree/master/tutorials/jmsActiveMQ/src/main/java/com/example/jmsActiveMQ
>
> This is how you can instantiate RabbitMQInputOperator and add it to a DAG.
> RabbitMQInputOperator consumer = dag.addOperator("Consumer",
> RabbitMQInputOperator.class);
> https://stackoverflow.com/questions/42207495/how-to-use-apache-apex-malhar-rabbitmq-operator-in-dag
>
> The following properties need to be specified in properties.xml
>
> * Properties:<br>
> * <b>tuple_blast</b>: Number of tuples emitted in each burst<br>
> * <b>bufferSize</b>: Size of holding buffer<br>
> * <b>host</b>:the address for the consumer to connect to rabbitMQ producer<br>
> * <b>exchange</b>:the exchange for the consumer to connect to rabbitMQ
> producer<br>
> * <b>exchangeType</b>:the exchangeType for the consumer to connect to
> rabbitMQ producer<br>
> * <b>routingKey</b>:the routingKey for the consumer to connect to
> rabbitMQ producer<br>
> * <b>queueName</b>:the queueName for the consumer to connect to
> rabbitMQ producer<br>
> * <br>
>
> Reference:
> https://github.com/apache/apex-malhar/blob/master/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
>
> Thanks,
> Vikram
>
> On Wed, Jun 7, 2017 at 3:19 PM, <a...@x5h.eu> wrote:
>
> Hello,
>
> I compiled the whole thing but now I don't know exactly how to get it
> running in Apex. Do I need an Application.java like in the tutorial? I do
> have a simple RabbitMQ queue up and running on the server.
> How do I consume the messages with Apex and write them to HDFS?
>
> Cheers,
>
> Manfred
>
> The following steps were necessary to get the RabbitMQ test to compile:
>
> @TimeoutException
> import java.util.concurrent.TimeoutException;
> public void setup() throws IOException,TimeoutException
> public void teardown() throws IOException,TimeoutException
> protected void runTest(final int testNum) throws IOException
>
> @Build jars
> cd apex-malhar/contrib/
> mvn clean package -DskipTests
>
> cd apex-malhar/library/
> mvn clean package -DskipTests
> copy packages to project directory
>
> @Link them to the project
> Add the following lines to the pom.xml
> <dependency>
>   <groupId>contrib</groupId>
>   <artifactId>com.datatorrent.contrib.helper</artifactId>
>   <version>1.0</version>
>   <scope>system</scope>
>   <systemPath>${project.basedir}/src/main/resources/malhar-contrib-3.8.0-SNAPSHOT-tests.jar</systemPath>
> </dependency>
> <dependency>
>   <groupId>lib</groupId>
>   <artifactId>com.datatorrent.lib.helper</artifactId>
>   <version>1.0</version>
>   <scope>system</scope>
>   <systemPath>${project.basedir}/src/main/resources/malhar-library-3.8.0-SNAPSHOT-tests.jar</systemPath>
> </dependency>
> <dependency>
>   <groupId>contrib</groupId>
>   <artifactId>com.datatorrent.contrib.rabbitmq</artifactId>
>   <version>1.0</version>
>   <scope>system</scope>
>   <systemPath>${project.basedir}/src/main/resources/malhar-contrib-3.8.0-SNAPSHOT.jar</systemPath>
> </dependency>
> <dependency>
>   <groupId>Attribute</groupId>
>   <artifactId>com.datatorrent.api.Attribute.AttributeMap</artifactId>
>   <version>1.0</version>
>   <scope>system</scope>
>   <systemPath>${project.basedir}/src/main/resources/apex-api-3.7.0-SNAPSHOT.jar</systemPath>
> </dependency>
>
>
> On 31.05.2017 at 18:57, Sanjay Pujare wrote:
>
> Both com.datatorrent.contrib.helper and com.datatorrent.lib.helper are
> under the test directory under malhar-contrib and malhar-library
> respectively.
> You may need to build these jars yourself with test scope to
> include these packages.
>
> On Wed, May 31, 2017 at 9:39 AM, <a...@x5h.eu> wrote:
>
> Hello, (mea culpa for messing up the headline the first time)
>
> I'm currently trying to get the apex-malhar rabbitmq running. But I'm at a
> complete loss: while the examples are running fine, I don't even get the
> RabbitMQInputOperatorTest.java to run.
>
> First it couldn't find the rabbitmq-client, which was solvable by adding
> the dependency:
>
> <dependency>
>   <groupId>com.rabbitmq</groupId>
>   <artifactId>amqp-client</artifactId>
>   <version>4.1.0</version>
> </dependency>
>
> But now it doesn't find the packages com.datatorrent.contrib.helper and
> com.datatorrent.lib.helper and can't find several symbols.
>
> Needless to say that I'm a beginner regarding Apex, so does anyone know
> what exactly I'm doing wrong here?
>
> Cheers
>
> Manfred.
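
To recap the naming rule from the most recent reply in this thread: the middle part of a dt.operator.<name>.prop.<property> key has to match the string passed to dag.addOperator(). The plain-Java sketch below only builds such keys and prints them in the <property> layout quoted above; it does not use any Apex classes. The class name OperatorPropertyKeys and its two helper methods are made up for illustration, and using host and queueName as property names is an assumption based on the Javadoc list quoted in the thread.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Apex or Malhar.
public class OperatorPropertyKeys {

    // Builds a key of the form dt.operator.<operatorName>.prop.<property>.
    // operatorName must be the exact string passed to dag.addOperator(...).
    public static String propertyKey(String operatorName, String property) {
        return "dt.operator." + operatorName + ".prop." + property;
    }

    // Renders one <property> entry in the layout used in this thread.
    public static String toXml(String key, String value) {
        return "<property>\n"
            + "  <name>" + key + "</name>\n"
            + "  <value>" + value + "</value>\n"
            + "</property>";
    }

    public static void main(String[] args) {
        // "Consumer" is the name used in addOperator("Consumer", ...) above.
        String operatorName = "Consumer";

        // Property names are assumed from the Javadoc list quoted in the thread.
        Map<String, String> settings = new LinkedHashMap<>();
        settings.put("tuple_blast", "500");
        settings.put("host", "localhost");
        settings.put("queueName", "YOUR_QUEUE_NAME");

        for (Map.Entry<String, String> entry : settings.entrySet()) {
            System.out.println(toXml(propertyKey(operatorName, entry.getKey()), entry.getValue()));
        }
    }
}
```

With the operator registered as "Consumer", the first key comes out as dt.operator.Consumer.prop.tuple_blast, which is the form suggested in the reply. A key built with rabbitMQIn would not match any operator in that DAG.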