I finally got rid of all the errors, but now I have the problem that the Apex
application does not seem to register with the RabbitMQ exchange.
This is my code:
@ApplicationAnnotation(name="RabbitMQ2HDFS")
public class RabbitMQApplication implements StreamingApplication
{
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    RabbitMQInputOperator in = dag.addOperator("rabbitInput", new RabbitMQInputOperator());
    in.setHost("localhost");
    //in.setPort(5672);
    in.setExchange("apex");
    in.setExchangeType("fanout");
    ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator());
    dag.addStream("rand_console", in.outputPort, console.input);
  }
}
If I launch the application, everything executes without an error, but when I
list the bindings on the exchange, there are none.
Does anyone have an idea how I can start to debug this?
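
One possible first check, as a sketch only: if the application runs on a cluster, the operator may be deployed on a node where "localhost" does not point at the broker. That is an assumption, not something confirmed here. The plain-Java probe below (the class name BrokerProbe is made up for illustration, and 5672 is simply the port from the commented-out setPort call) only tests whether a TCP connection to the broker can be opened from the machine it runs on; it says nothing about exchanges or bindings.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of Apex or Malhar: checks whether a TCP
// connection to host:port can be opened from the machine running it.
final class BrokerProbe {

    // Returns true if the connection succeeds within timeoutMillis,
    // false on refusal, timeout, or an unresolvable host name.
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults mirror the values used in the application code.
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 5672;
        System.out.println(host + ":" + port + " reachable: " + canConnect(host, port, 2000));
    }
}
```

Running it on the node where the operator's container is started, with the same host value as in setHost, would show whether the broker is reachable from there at all.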
Cheers
Manfred.
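
For reference, the compiler error quoted further down comes down to the generic bounds on addStream: the tuple type of the output port must be assignable to the type of the input port. The sketch below uses hypothetical stand-in interfaces (not the real Apex classes; only the generic shape from the error message is copied) to show why a byte[] output cannot feed a String input, while an input typed as byte[] or Object is accepted. That console.input is typed as Object is an assumption, inferred from the fact that the code above compiles.

```java
import java.nio.charset.StandardCharsets;

// Stand-in types for illustration only; these are not the Apex interfaces.
final class StreamTypingSketch {

    interface OutputPort<T> {}
    interface InputPort<T> {}

    // Same generic shape as the DAG.<T>addStream signature in the error message.
    static <T> String addStream(String id, OutputPort<? extends T> source, InputPort<? super T> sink) {
        return id;
    }

    // One way to turn the byte[] payload into text; UTF-8 is an assumption.
    static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        OutputPort<byte[]> rabbitOut = new OutputPort<byte[]>() {};
        InputPort<String> stringIn = new InputPort<String>() {};
        InputPort<byte[]> bytesIn = new InputPort<byte[]>() {};
        InputPort<Object> objectIn = new InputPort<Object>() {};

        // Does not compile, for the reason given in the quoted error:
        // byte[] does not conform to the upper bound String.
        // addStream("test", rabbitOut, stringIn);

        addStream("test", rabbitOut, bytesIn);          // byte[] into byte[]
        addStream("rand_console", rabbitOut, objectIn); // byte[] into Object

        System.out.println(decode("hello".getBytes(StandardCharsets.UTF_8)));
    }
}
```

Whether UTF-8 is the right charset for decode depends on how the producer encodes its messages.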
On 08.06.2017 at 18:04, [email protected] wrote:
>
> Okay, I found the error. I had copied LineOutputOperator.java
> <https://github.com/DataTorrent/examples/blob/master/tutorials/jmsActiveMQ/src/main/java/com/example/jmsActiveMQ/LineOutputOperator.java>
> from the jmsActiveMQ example, where the class is declared as
> public class LineOutputOperator extends AbstractFileOutputOperator<String>
>
> Instead I took LineOutputOperator.java
> <https://github.com/DataTorrent/examples/blob/master/tutorials/kafka/src/main/java/com/example/myapexapp/LineOutputOperator.java>
> from the Kafka 0.9 example; there the class is defined correctly for
> use with the RabbitMQInputOperator.
>
> So far so good; it now compiles without errors.
>
> Cheers
>
> Manfred
>
> On 08.06.2017 at 17:38, [email protected] wrote:
>>
>> I still don't completely get it. (The rest of the code is in the
>> previous email; this is only the relevant part.)
>>
>> 1. dag.addStream("test", rabbitInput.output, out.input);
>> Results in the following error:
>> [ERROR] symbol: variable output
>> [ERROR] location: variable rabbitInput of type
>> com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator
>>
>> 2. dag.addStream("test", rabbitInput.outputPort, out.input);
>> Results in the following error:
>> [ERROR] /home/pi/apex/rabbitMQ/src/main/java/com/example/rabbitMQ/RabbitMQApplication.java:[31,8] no suitable method found for addStream(java.lang.String,com.datatorrent.api.DefaultOutputPort<byte[]>,com.datatorrent.api.DefaultInputPort<java.lang.String>)
>> [ERROR] method com.datatorrent.api.DAG.<T>addStream(java.lang.String,com.datatorrent.api.Operator.OutputPort<? extends T>,com.datatorrent.api.Operator.InputPort<? super T>...) is not applicable
>> [ERROR] (inferred type does not conform to upper bound(s)
>> [ERROR] inferred: byte[]
>> [ERROR] upper bound(s): java.lang.String,java.lang.Object)
>> [ERROR] method com.datatorrent.api.DAG.<T>addStream(java.lang.String,com.datatorrent.api.Operator.OutputPort<? extends T>,com.datatorrent.api.Operator.InputPort<? super T>) is not applicable
>> [ERROR] (inferred type does not conform to upper bound(s)
>> [ERROR] inferred: byte[]
>> [ERROR] upper bound(s): java.lang.String,java.lang.Object)
>> [ERROR] method com.datatorrent.api.DAG.<T>addStream(java.lang.String,com.datatorrent.api.Operator.OutputPort<? extends T>,com.datatorrent.api.Operator.InputPort<? super T>,com.datatorrent.api.Operator.InputPort<? super T>) is not applicable
>> [ERROR] (cannot infer type-variable(s) T
>> [ERROR] (actual and formal argument lists differ in length))
>>
>>
>>
>> It seems that, on the one hand, RabbitMQInputOperator does not have
>> an output member and, on the other hand, the addStream method only
>> accepts outputPort combined with inputPort, or output combined with
>> input, of the corresponding classes. Does that mean I can only use a
>> class that provides an inputPort member for this example?
>>
>> Cheers
>>
>> Manfred.
>>
>>
>>
>> On 08.06.2017 at 10:05, [email protected] wrote:
>>>
>>> Sorry, the two snippets below were from different iterations. The
>>> error I get is the following:
>>>
>>> [ERROR]
>>> /home/pi/apex/rabbitMQ/src/main/java/com/example/rabbitMQ/RabbitMQApplication.java:[31,38]
>>> cannot find symbol
>>> [ERROR] symbol: variable output
>>> [ERROR] location: variable rabbitInput of type
>>> com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator
>>>
>>> My code is as follows:
>>>
>>>
>>> package com.example.rabbitMQ;
>>>
>>> import org.apache.hadoop.conf.Configuration;
>>>
>>> import com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator;
>>> import com.datatorrent.api.annotation.ApplicationAnnotation;
>>> import com.datatorrent.api.StreamingApplication;
>>> import com.datatorrent.api.DAG;
>>> import com.datatorrent.lib.io.jms.JMSStringInputOperator;
>>>
>>> @ApplicationAnnotation(name="RabbitMQ2HDFS")
>>> public class RabbitMQApplication implements StreamingApplication
>>> {
>>>
>>> @Override
>>> public void populateDAG(DAG dag, Configuration conf)
>>> {
>>>
>>> RabbitMQInputOperator rabbitInput =
>>> dag.addOperator("Consumer",RabbitMQInputOperator.class);
>>> rabbitInput.setHost("localhost");
>>> rabbitInput.setPort(5672);
>>> rabbitInput.setExchange("");
>>> rabbitInput.setQueueName("hello");
>>> LineOutputOperator out = dag.addOperator("fileOut", new
>>> LineOutputOperator());
>>>
>>> dag.addStream("data", rabbitInput.output, out.input);
>>> }
>>> }
>>>
>>> Cheers
>>>
>>> Manfred.
>>>
>>>
>>>
>>> On 08.06.2017 at 04:34, vikram patil wrote:
>>>> Hi,
>>>> dag.addStream() is used to create a stream from one operator's
>>>> output port to another operator's input port.
>>>> RabbitMQInputOperator consumer = dag.addOperator("Consumer",
>>>> RabbitMQInputOperator.class);
>>>> dag.addStream("data", *rabbitInput*.output, out.input);
>>>> It looks like your operator name is incorrect. In your code
>>>> snippet above, the name of the RabbitMQInputOperator is *"Consumer".*
>>>>
>>>> In the property name, you need to provide the operator name you
>>>> specified in the addOperator(*"NAME OF THE OPERATOR"*,
>>>> RabbitMQInputOperator.class) API call.
>>>>
>>>> dt.operator.*rabbitMQIn*.prop.tuple_blast (the syntax is correct,
>>>> provided the operator name is correct).
>>>>
>>>> (It should be dt.operator.*Consumer*.prop.tuple_blast based on
>>>> your code snippet.)
>>>>
>>>> I think the tests provided in Apache Malhar are very detailed.
>>>> They run in local mode as unit tests, so we have mocked the actual
>>>> RabbitMQ with a custom message publisher.
>>>>
>>>> For the time being, set only the queue name and host name:
>>>>
>>>> // set your rabbitmq host
>>>> consumer.setHost("localhost");
>>>> // set your rabbitmq port
>>>> consumer.setPort(5672);
>>>> // It depends on your rabbitmq producer configuration, but by default
>>>> // it is the default exchange with "" (no name is provided).
>>>> consumer.setExchange("");
>>>> // set your queue name
>>>> consumer.setQueueName("YOUR_QUEUE_NAME");
>>>>
>>>>
>>>>
>>>>
>>>> If it's okay, could you please share the code from your
>>>> application.java and properties.xml here?
>>>>
>>>> Thanks,
>>>> Vikram
>>>>
>>>>
>>>> On Thu, Jun 8, 2017 at 12:32 AM, <[email protected]> wrote:
>>>>
>>>> Thanks very much for the help. The only problem left is that I
>>>> don't quite understand dag.addStream().
>>>>
>>>> I tried this
>>>>
>>>> RabbitMQInputOperator consumer = dag.addOperator("Consumer",
>>>> RabbitMQInputOperator.class);
>>>> dag.addStream("data", rabbitInput.output, out.input);
>>>>
>>>> but obviously this doesn't work. What I don't get is the
>>>> difference between the ActiveMQ example and the RabbitMQ
>>>> example. I looked over the test examples for RabbitMQ but don't
>>>> seem to understand the logic behind it.
>>>>
>>>> Is this the correct way to specify properties?
>>>> <property>
>>>> <name>dt.operator.rabbitMQIn.prop.tuple_blast</name>
>>>> <value>500</value>
>>>> </property>
>>>>
>>>> Cheers
>>>> Manfred.
>>>>
>>>>
>>>> On 07.06.2017 at 12:03, Vikram Patil wrote:
>>>>> Yes, you would need an Application.java, which is the way to define
>>>>> the DAG for an Apex application.
>>>>>
>>>>> Please have a look at the code of the following example to see how to
>>>>> write a JMS ActiveMQ based application:
>>>>>
>>>>>
>>>>> https://github.com/DataTorrent/examples/tree/master/tutorials/jmsActiveMQ/src/main/java/com/example/jmsActiveMQ
>>>>>
>>>>> This is how you can instantiate a RabbitMQInputOperator and add it to a DAG:
>>>>> RabbitMQInputOperator consumer = dag.addOperator("Consumer",
>>>>> RabbitMQInputOperator.class);
>>>>>
>>>>>
>>>>> https://stackoverflow.com/questions/42207495/how-to-use-apache-apex-malhar-rabbitmq-operator-in-dag
>>>>>
>>>>> The following properties need to be specified in properties.xml:
>>>>>
>>>>> * tuple_blast: number of tuples emitted in each burst
>>>>> * bufferSize: size of the holding buffer
>>>>> * host: the address for the consumer to connect to the RabbitMQ producer
>>>>> * exchange: the exchange for the consumer to connect to the RabbitMQ producer
>>>>> * exchangeType: the exchangeType for the consumer to connect to the RabbitMQ producer
>>>>> * routingKey: the routingKey for the consumer to connect to the RabbitMQ producer
>>>>> * queueName: the queueName for the consumer to connect to the RabbitMQ producer
>>>>>
>>>>> Reference:
>>>>> https://github.com/apache/apex-malhar/blob/master/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
>>>>>
>>>>> Thanks,
>>>>> Vikram
>>>>>
>>>>> On Wed, Jun 7, 2017 at 3:19 PM, <[email protected]> wrote:
>>>>>> Hello,
>>>>>>
>>>>>> I compiled the whole thing, but now I don't know exactly how to get
>>>>>> it running in Apex. Do I need an application.java like in the
>>>>>> tutorial? I do have a simple RabbitMQ queue up and running on the
>>>>>> server. How do I consume the messages with Apex and write them to
>>>>>> HDFS?
>>>>>>
>>>>>> Cheers,
>>>>>>
>>>>>> Manfred
>>>>>>
>>>>>> The following steps were necessary to get the RabbitMQ test to compile:
>>>>>>
>>>>>> @TimeoutException
>>>>>> import java.util.concurrent.TimeoutException;
>>>>>> public void setup() throws IOException,TimeoutException
>>>>>> public void teardown() throws IOException,TimeoutException
>>>>>> protected void runTest(final int testNum) throws IOException
>>>>>>
>>>>>> @Build jars
>>>>>> cd apex-malhar/contrib/
>>>>>> mvn clean package -DskipTests
>>>>>>
>>>>>> cd apex-malhar/library/
>>>>>> mvn clean package -DskipTests
>>>>>> copy packages to project directory
>>>>>>
>>>>>> @Link them to the project
>>>>>> Add the following lines to the pom.xml:
>>>>>> <dependency>
>>>>>> <groupId>contrib</groupId>
>>>>>> <artifactId>com.datatorrent.contrib.helper</artifactId>
>>>>>> <version>1.0</version>
>>>>>> <scope>system</scope>
>>>>>> <systemPath>${project.basedir}/src/main/resources/malhar-contrib-3.8.0-SNAPSHOT-tests.jar</systemPath>
>>>>>> </dependency>
>>>>>> <dependency>
>>>>>> <groupId>lib</groupId>
>>>>>> <artifactId>com.datatorrent.lib.helper</artifactId>
>>>>>> <version>1.0</version>
>>>>>> <scope>system</scope>
>>>>>> <systemPath>${project.basedir}/src/main/resources/malhar-library-3.8.0-SNAPSHOT-tests.jar</systemPath>
>>>>>> </dependency>
>>>>>> <dependency>
>>>>>> <groupId>contrib</groupId>
>>>>>> <artifactId>com.datatorrent.contrib.rabbitmq</artifactId>
>>>>>> <version>1.0</version>
>>>>>> <scope>system</scope>
>>>>>> <systemPath>${project.basedir}/src/main/resources/malhar-contrib-3.8.0-SNAPSHOT.jar</systemPath>
>>>>>> </dependency>
>>>>>> <dependency>
>>>>>> <groupId>Attribute</groupId>
>>>>>> <artifactId>com.datatorrent.api.Attribute.AttributeMap</artifactId>
>>>>>> <version>1.0</version>
>>>>>> <scope>system</scope>
>>>>>> <systemPath>${project.basedir}/src/main/resources/apex-api-3.7.0-SNAPSHOT.jar</systemPath>
>>>>>> </dependency>
>>>>>>
>>>>>>
>>>>>> On 31.05.2017 at 18:57, Sanjay Pujare wrote:
>>>>>>
>>>>>> Both com.datatorrent.contrib.helper and com.datatorrent.lib.helper
>>>>>> are under the test directory of malhar-contrib and malhar-library
>>>>>> respectively. You may need to build these jars yourself with test
>>>>>> scope to include these packages.
>>>>>>
>>>>>> On Wed, May 31, 2017 at 9:39 AM, <[email protected]> wrote:
>>>>>>> Hello, (mea culpa for messing up the headline the first time)
>>>>>>>
>>>>>>> I'm currently trying to get the apex-malhar rabbitmq running. But
>>>>>>> I'm at a complete loss: while the examples are running fine, I
>>>>>>> don't even get RabbitMQInputOperatorTest.java to run.
>>>>>>>
>>>>>>> First it couldn't find the rabbitmq-client, which was solvable by
>>>>>>> adding the dependency:
>>>>>>>
>>>>>>> <dependency>
>>>>>>> <groupId>com.rabbitmq</groupId>
>>>>>>> <artifactId>amqp-client</artifactId>
>>>>>>> <version>4.1.0</version>
>>>>>>> </dependency>
>>>>>>>
>>>>>>> But now it doesn't find the packages com.datatorrent.contrib.helper
>>>>>>> and com.datatorrent.lib.helper and can't find several symbols.
>>>>>>>
>>>>>>> Needless to say that I'm a beginner regarding Apex, so does anyone
>>>>>>> know what exactly I'm doing wrong here?
>>>>>>>
>>>>>>> Cheers
>>>>>>>
>>>>>>> Manfred.