Hello,
Regarding 2): does it have to be a durable queue, or does an exchange suffice?
Cheers
Manfred
On 09.06.2017 at 15:23, vikram patil wrote:
> 1) Are you doing it on your local environment?
> 2) If you are doing it locally, I would suggest the following options:
> 1) If you don't want to create the queue on RabbitMQ yourself, set
> the queue name on the operator:
> in.setQueueName("YOUR_QUEUE_NAME");
> The operator will then do the following:
> * Create a durable queue in RabbitMQ.
> * You have specified exchange and exchangeType,
> so it will create an exchange using this information
> and bind the created queue to the exchange with the default routing key,
> which is "".
> Right now it is presumably creating an auto-generated, uniquely named
> queue for you.
>
> 2) You can create your own exchange and durable queue using the
> RabbitMQ admin interface. You will have to install the RabbitMQ plugins for that.
> You can use it to publish some test data as well.
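
To picture the steps listed above, here is a minimal toy model in plain Java. It is not the RabbitMQ client API and not what the operator runs internally; ToyQueue, ToyFanoutExchange, bind and publish are hypothetical names made up for illustration. It only shows the idea that, in this model, a fanout exchange stores nothing itself and copies each message to whatever queues are bound to it, so a consumer needs a bound queue to receive anything.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy in-memory model of a fanout exchange; NOT the RabbitMQ client API.
class FanoutSketch {

  /** A queue buffers messages until a consumer takes them. */
  static final class ToyQueue {
    final String name;
    final Deque<String> messages = new ArrayDeque<>();

    ToyQueue(String name) {
      this.name = name;
    }
  }

  /** A fanout exchange keeps no messages; it copies each one to every bound queue. */
  static final class ToyFanoutExchange {
    final List<ToyQueue> bindings = new ArrayList<>();

    void bind(ToyQueue queue) {
      bindings.add(queue);
    }

    /** Returns how many queues received the message (0 means it was dropped). */
    int publish(String message) {
      for (ToyQueue q : bindings) {
        q.messages.add(message);
      }
      return bindings.size();
    }
  }

  public static void main(String[] args) {
    ToyFanoutExchange apex = new ToyFanoutExchange();
    int before = apex.publish("lost");            // no queue bound yet
    ToyQueue queue = new ToyQueue("YOUR_QUEUE_NAME");
    apex.bind(queue);                             // the step the operator is said to perform
    int after = apex.publish("delivered");
    System.out.println(before + " " + after + " " + queue.messages);
  }
}
```

Running main prints `0 1 [delivered]`: the first message reaches no queue because nothing is bound yet. A real broker has more options than this model covers, so treat it only as a picture of why an empty binding list on the exchange means no data arrives.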
>
> Using apex-cli you can check the status of your application. If it's
> failing, you should check the logs under userlogs in the Hadoop logs directory.
>
> Thanks & Regards,
> Vikram
>
>
>
>
> On Fri, Jun 9, 2017 at 6:42 PM, <[email protected]> wrote:
>
> I finally got rid of all errors, but now I have the problem that the
> Apex application does not seem to register at the RabbitMQ exchange.
>
> This is my code:
>
> @ApplicationAnnotation(name="RabbitMQ2HDFS")
> public class RabbitMQApplication implements StreamingApplication
> {
>   @Override
>   public void populateDAG(DAG dag, Configuration conf)
>   {
>     RabbitMQInputOperator in = dag.addOperator("rabbitInput", new RabbitMQInputOperator());
>     in.setHost("localhost");
>     //in.setPort(5672);
>     in.setExchange("apex");
>     in.setExchangeType("fanout");
>     ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator());
>     dag.addStream("rand_console", in.outputPort, console.input);
>   }
> }
>
> If I launch the application, everything executes without an error,
> but if I list the bindings on the exchange, there are none.
>
> Does anyone have an idea how I can start to debug this?
>
> Cheers
> Manfred.
>
>
>
> On 08.06.2017 at 18:04, [email protected] wrote:
>>
>> Okay, I found the error. I had copied LineOutputOperator.java
>>
>> <https://github.com/DataTorrent/examples/blob/master/tutorials/jmsActiveMQ/src/main/java/com/example/jmsActiveMQ/LineOutputOperator.java>
>> from the jmsActiveMQ example, where the class is declared as
>> public class LineOutputOperator extends
>> AbstractFileOutputOperator<String>
>>
>> Instead, I took LineOutputOperator.java
>>
>> <https://github.com/DataTorrent/examples/blob/master/tutorials/kafka/src/main/java/com/example/myapexapp/LineOutputOperator.java>
>> from the Kafka 0.9 example; there the class is defined correctly for
>> use with the RabbitMQInputOperator.
>>
>> So far so good: it now compiles without errors.
>>
>> Cheers
>>
>> Manfred
>>
>> On 08.06.2017 at 17:38, [email protected] wrote:
>>>
>>> I still don't get it completely. (The rest of the code is in the
>>> previous email; this is only the relevant excerpt.)
>>>
>>> 1. dag.addStream("test", rabbitInput.output, out.input);
>>> Results in the following error:
>>> [ERROR] symbol: variable output
>>> [ERROR] location: variable rabbitInput of type
>>> com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator
>>>
>>> 2. dag.addStream("test", rabbitInput.outputPort, out.input);
>>> Results in the following error:
>>> [ERROR]
>>>
>>> /home/pi/apex/rabbitMQ/src/main/java/com/example/rabbitMQ/RabbitMQApplication.java:[31,8]
>>> no suitable method found for
>>>
>>> addStream(java.lang.String,com.datatorrent.api.DefaultOutputPort<byte[]>,com.datatorrent.api.DefaultInputPort<java.lang.String>)
>>> [ERROR] method
>>>
>>> com.datatorrent.api.DAG.<T>addStream(java.lang.String,com.datatorrent.api.Operator.OutputPort<?
>>> extends T>,com.datatorrent.api.Operator.InputPort<? super
>>> T>...) is not applicable
>>> [ERROR] (inferred type does not conform to upper bound(s)
>>> [ERROR] inferred: byte[]
>>> [ERROR] upper bound(s):
>>> java.lang.String,java.lang.Object)
>>> [ERROR] method
>>>
>>> com.datatorrent.api.DAG.<T>addStream(java.lang.String,com.datatorrent.api.Operator.OutputPort<?
>>> extends T>,com.datatorrent.api.Operator.InputPort<? super
>>> T>) is not applicable
>>> [ERROR] (inferred type does not conform to upper bound(s)
>>> [ERROR] inferred: byte[]
>>> [ERROR] upper bound(s):
>>> java.lang.String,java.lang.Object)
>>> [ERROR] method
>>>
>>> com.datatorrent.api.DAG.<T>addStream(java.lang.String,com.datatorrent.api.Operator.OutputPort<?
>>> extends T>,com.datatorrent.api.Operator.InputPort<? super
>>> T>,com.datatorrent.api.Operator.InputPort<? super T>) is not
>>> applicable
>>> [ERROR] (cannot infer type-variable(s) T
>>> [ERROR] (actual and formal argument lists differ in
>>> length))
>>>
>>>
>>>
>>> It seems that, on the one hand, the RabbitMQInputOperator class
>>> does not have an output method and, on the other hand, the
>>> addStream method only accepts outputPort combined with inputPort
>>> methods, or output and input methods, of the corresponding
>>> classes. Does that mean I can only use a class that implements an
>>> inputPort method for this example?
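
The second error above comes from the generic bounds on addStream: a single type T has to fit both the output port (here byte[]) and the input port (here String). A minimal sketch in plain Java may make this easier to see. It uses hypothetical stand-ins (Source, connect, BYTES_TO_STRING) rather than the real Apex port classes, and it assumes the payload is UTF-8 text. It shows two ways out: convert between the two types, or use a sink that accepts byte[].

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;
import java.util.function.Function;

// Toy stand-ins for ports; these are NOT the Apex DefaultOutputPort/DefaultInputPort classes.
class PortTypeSketch {

  /** Hypothetical source of tuples, standing in for an output port. */
  interface Source<T> {
    T next();
  }

  /** Mirrors the shape of addStream: one T must fit both the source and the sink. */
  static <T> void connect(Source<? extends T> source, Consumer<? super T> sink) {
    sink.accept(source.next());
  }

  /** Hypothetical converter step: decode the raw bytes as UTF-8 text (an assumption). */
  static final Function<byte[], String> BYTES_TO_STRING =
      bytes -> new String(bytes, StandardCharsets.UTF_8);

  public static void main(String[] args) {
    Source<byte[]> rabbitLike = () -> "hello".getBytes(StandardCharsets.UTF_8);
    Consumer<String> stringSink = line -> System.out.println("sink got: " + line);

    // connect(rabbitLike, stringSink);  // would not compile: no T fits byte[] and String

    // Option A: put a converter between the source and the String sink.
    Source<String> converted = () -> BYTES_TO_STRING.apply(rabbitLike.next());
    connect(converted, stringSink);

    // Option B: use a sink that accepts byte[] directly.
    Consumer<byte[]> byteSink = raw -> System.out.println("sink got " + raw.length + " bytes");
    connect(rabbitLike, byteSink);
  }
}
```

In Apex terms, option A would correspond to an intermediate operator that converts the tuples, and option B to an output operator whose input port is typed for byte[].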
>>>
>>> Cheers
>>>
>>> Manfred.
>>>
>>>
>>>
>>> On 08.06.2017 at 10:05, [email protected] wrote:
>>>>
>>>> Sorry, the two snippets below were from different iterations.
>>>> The error I get is the following:
>>>>
>>>> [ERROR]
>>>>
>>>> /home/pi/apex/rabbitMQ/src/main/java/com/example/rabbitMQ/RabbitMQApplication.java:[31,38]
>>>> cannot find symbol
>>>> [ERROR] symbol: variable output
>>>> [ERROR] location: variable rabbitInput of type
>>>> com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator
>>>>
>>>> My code is as follows:
>>>>
>>>>
>>>> package com.example.rabbitMQ;
>>>>
>>>> import org.apache.hadoop.conf.Configuration;
>>>>
>>>> import com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator;
>>>> import com.datatorrent.api.annotation.ApplicationAnnotation;
>>>> import com.datatorrent.api.StreamingApplication;
>>>> import com.datatorrent.api.DAG;
>>>> import com.datatorrent.lib.io.jms.JMSStringInputOperator;
>>>>
>>>> @ApplicationAnnotation(name="RabbitMQ2HDFS")
>>>> public class RabbitMQApplication implements StreamingApplication
>>>> {
>>>>   @Override
>>>>   public void populateDAG(DAG dag, Configuration conf)
>>>>   {
>>>>     RabbitMQInputOperator rabbitInput = dag.addOperator("Consumer", RabbitMQInputOperator.class);
>>>>     rabbitInput.setHost("localhost");
>>>>     rabbitInput.setPort(5672);
>>>>     rabbitInput.setExchange("");
>>>>     rabbitInput.setQueueName("hello");
>>>>     LineOutputOperator out = dag.addOperator("fileOut", new LineOutputOperator());
>>>>
>>>>     dag.addStream("data", rabbitInput.output, out.input);
>>>>   }
>>>> }
>>>>
>>>> Cheers
>>>>
>>>> Manfred.
>>>>
>>>>
>>>>
>>>> On 08.06.2017 at 04:34, vikram patil wrote:
>>>>> Hi,
>>>>> dag.addStream() is used to create a stream from one
>>>>> operator's output port to another operator's input port.
>>>>> RabbitMQInputOperator consumer = dag.addOperator("Consumer",
>>>>> RabbitMQInputOperator.class);
>>>>> dag.addStream("data", *rabbitInput*.output, out.input);
>>>>> It looks like your operator name is incorrect. In your code
>>>>> snippet above, the name of the RabbitMQInputOperator is *"Consumer"*.
>>>>>
>>>>> In the property name, you need to provide the operator name you
>>>>> specified in the addOperator(*"NAME OF THE OPERATOR"*,
>>>>> RabbitMQInputOperator.class) API call.
>>>>>
>>>>> dt.operator.*rabbitMQIn*.prop.tuple_blast (the syntax is
>>>>> correct, provided your operator name is correct).
>>>>>
>>>>> (It should be dt.operator.*Consumer*.prop.tuple_blast based
>>>>> on your code snippet.)
>>>>>
>>>>> I think the tests provided in Apache Malhar are very
>>>>> detailed. They run in local mode as unit tests, so we have
>>>>> mocked the actual RabbitMQ with a custom message publisher.
>>>>>
>>>>> For the time being, set only the queue name and host name:
>>>>>
>>>>> // set your rabbitmq host
>>>>> consumer.setHost("localhost");
>>>>> // set your rabbitmq port
>>>>> consumer.setPort(5672);
>>>>> // It depends on your rabbitmq producer configuration, but by default
>>>>> // it is the default exchange with "" (no name is provided).
>>>>> consumer.setExchange("");
>>>>> // set your queue name
>>>>> consumer.setQueueName("YOUR_QUEUE_NAME");
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> If it's okay, could you please share the code from your
>>>>> application.java and properties.xml here?
>>>>>
>>>>> Thanks,
>>>>> Vikram
>>>>>
>>>>>
>>>>> On Thu, Jun 8, 2017 at 12:32 AM, <[email protected]> wrote:
>>>>>
>>>>> Thanks very much for the help. The only problem left is
>>>>> that I don't quite understand dag.addStream().
>>>>>
>>>>> I tried this
>>>>>
>>>>> RabbitMQInputOperator consumer = dag.addOperator("Consumer",
>>>>> RabbitMQInputOperator.class);
>>>>> dag.addStream("data", rabbitInput.output, out.input);
>>>>>
>>>>> but obviously this doesn't work. What I don't get is the
>>>>> difference between the ActiveMQ example and the RabbitMQ
>>>>> example. I looked over the test examples for RabbitMQ but
>>>>> don't seem to understand the logic behind it.
>>>>>
>>>>> Is this the correct way to specify properties?
>>>>> <property>
>>>>> <name>dt.operator.rabbitMQIn.prop.tuple_blast</name>
>>>>> <value>500</value>
>>>>> </property>
>>>>>
>>>>> Cheers
>>>>> Manfred.
>>>>>
>>>>>
>>>>> On 07.06.2017 at 12:03, Vikram Patil wrote:
>>>>>> Yes, you would need an Application.java, which is the way to define
>>>>>> a DAG for an Apex application.
>>>>>>
>>>>>> Please have a look at the code of the following example to see how to
>>>>>> write a JMS ActiveMQ based application:
>>>>>>
>>>>>>
>>>>>> https://github.com/DataTorrent/examples/tree/master/tutorials/jmsActiveMQ/src/main/java/com/example/jmsActiveMQ
>>>>>>
>>>>>> This is how you can instantiate a RabbitMQInputOperator and add it
>>>>>> to a DAG:
>>>>>> RabbitMQInputOperator consumer = dag.addOperator("Consumer",
>>>>>> RabbitMQInputOperator.class);
>>>>>>
>>>>>>
>>>>>> https://stackoverflow.com/questions/42207495/how-to-use-apache-apex-malhar-rabbitmq-operator-in-dag
>>>>>>
>>>>>> The following properties need to be specified in properties.xml:
>>>>>>
>>>>>> * tuple_blast: number of tuples emitted in each burst
>>>>>> * bufferSize: size of the holding buffer
>>>>>> * host: the address for the consumer to connect to the RabbitMQ producer
>>>>>> * exchange: the exchange for the consumer to connect to the RabbitMQ producer
>>>>>> * exchangeType: the exchangeType for the consumer to connect to the RabbitMQ producer
>>>>>> * routingKey: the routingKey for the consumer to connect to the RabbitMQ producer
>>>>>> * queueName: the queueName for the consumer to connect to the RabbitMQ producer
>>>>>>
>>>>>> Reference:
>>>>>> https://github.com/apache/apex-malhar/blob/master/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
>>>>>>
>>>>>> Thanks,
>>>>>> Vikram
>>>>>>
>>>>>> On Wed, Jun 7, 2017 at 3:19 PM, <[email protected]> wrote:
>>>>>>> Hello,
>>>>>>>
>>>>>>> I compiled the whole thing, but now I don't know exactly how to
>>>>>>> get it running in Apex. Do I need an application.java like in the
>>>>>>> tutorial? I do have a simple RabbitMQ queue up and running on the
>>>>>>> server. How do I consume the messages with Apex and write them
>>>>>>> to HDFS?
>>>>>>>
>>>>>>> Cheers,
>>>>>>>
>>>>>>> Manfred
>>>>>>>
>>>>>>> The following steps were necessary to get the RabbitMQ test to
>>>>>>> compile:
>>>>>>>
>>>>>>> @TimeoutException
>>>>>>> import java.util.concurrent.TimeoutException;
>>>>>>> public void setup() throws IOException,TimeoutException
>>>>>>> public void teardown() throws IOException,TimeoutException
>>>>>>> protected void runTest(final int testNum) throws IOException
>>>>>>>
>>>>>>> @Build jars
>>>>>>> cd apex-malhar/contrib/
>>>>>>> mvn clean package -DskipTests
>>>>>>>
>>>>>>> cd apex-malhar/library/
>>>>>>> mvn clean package -DskipTests
>>>>>>> copy packages to project directory
>>>>>>>
>>>>>>> @Link them to the project
>>>>>>> Add following lines to the pom.xml
>>>>>>> <dependency>
>>>>>>> <groupId>contrib</groupId>
>>>>>>> <artifactId>com.datatorrent.contrib.helper</artifactId>
>>>>>>> <version>1.0</version>
>>>>>>> <scope>system</scope>
>>>>>>>
>>>>>>>
>>>>>>> <systemPath>${project.basedir}/src/main/resources/malhar-contrib-3.8.0-SNAPSHOT-tests.jar</systemPath>
>>>>>>> </dependency>
>>>>>>> <dependency>
>>>>>>> <groupId>lib</groupId>
>>>>>>> <artifactId>com.datatorrent.lib.helper</artifactId>
>>>>>>> <version>1.0</version>
>>>>>>> <scope>system</scope>
>>>>>>>
>>>>>>>
>>>>>>> <systemPath>${project.basedir}/src/main/resources/malhar-library-3.8.0-SNAPSHOT-tests.jar</systemPath>
>>>>>>> </dependency>
>>>>>>> <dependency>
>>>>>>> <groupId>contrib</groupId>
>>>>>>> <artifactId>com.datatorrent.contrib.rabbitmq</artifactId>
>>>>>>> <version>1.0</version>
>>>>>>> <scope>system</scope>
>>>>>>>
>>>>>>>
>>>>>>> <systemPath>${project.basedir}/src/main/resources/malhar-contrib-3.8.0-SNAPSHOT.jar</systemPath>
>>>>>>> </dependency>
>>>>>>> <dependency>
>>>>>>> <groupId>Attribute</groupId>
>>>>>>>
>>>>>>> <artifactId>com.datatorrent.api.Attribute.AttributeMap</artifactId>
>>>>>>> <version>1.0</version>
>>>>>>> <scope>system</scope>
>>>>>>>
>>>>>>>
>>>>>>> <systemPath>${project.basedir}/src/main/resources/apex-api-3.7.0-SNAPSHOT.jar</systemPath>
>>>>>>> </dependency>
>>>>>>>
>>>>>>>
>>>>>>> On 31.05.2017 at 18:57, Sanjay Pujare wrote:
>>>>>>>
>>>>>>> Both com.datatorrent.contrib.helper and
>>>>>>> com.datatorrent.lib.helper are
>>>>>>> under the test directory under malhar-contrib and malhar-library
>>>>>>> respectively. You may need to build these jars yourself with
>>>>>>> test scope to
>>>>>>> include these packages.
>>>>>>>
>>>>>>> On Wed, May 31, 2017 at 9:39 AM, <[email protected]> wrote:
>>>>>>>> Hello, (mea culpa for messing up the headline the first time)
>>>>>>>>
>>>>>>>> I'm currently trying to get RabbitMQ in apex-malhar running,
>>>>>>>> but I'm at a complete loss. While the examples run fine,
>>>>>>>> I can't even get RabbitMQInputOperatorTest.java to run.
>>>>>>>>
>>>>>>>> First it couldn't find the rabbitmq-client, which was solvable
>>>>>>>> by adding the dependency:
>>>>>>>>
>>>>>>>> <dependency>
>>>>>>>> <groupId>com.rabbitmq</groupId>
>>>>>>>> <artifactId>amqp-client</artifactId>
>>>>>>>> <version>4.1.0</version>
>>>>>>>> </dependency>
>>>>>>>>
>>>>>>>> But now it doesn't find the packages
>>>>>>>> com.datatorrent.contrib.helper and
>>>>>>>> com.datatorrent.lib.helper and can't find several symbols.
>>>>>>>>
>>>>>>>> Needless to say that I'm a beginner regarding Apex so does
>>>>>>>> anyone know
>>>>>>>> what exactly I'm doing wrong here?
>>>>>>>>
>>>>>>>> Cheers
>>>>>>>>
>>>>>>>> Manfred.
>>>>>>>>
>>>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
>