Sorry, the two snippets below were from different iterations. The error
I get is the following:
[ERROR] /home/pi/apex/rabbitMQ/src/main/java/com/example/rabbitMQ/RabbitMQApplication.java:[31,38] cannot find symbol
[ERROR] symbol: variable output
[ERROR] location: variable rabbitInput of type com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator
My code is as follows:

package com.example.rabbitMQ;

import org.apache.hadoop.conf.Configuration;

import com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.DAG;
import com.datatorrent.lib.io.jms.JMSStringInputOperator;

@ApplicationAnnotation(name="RabbitMQ2HDFS")
public class RabbitMQApplication implements StreamingApplication
{
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    RabbitMQInputOperator rabbitInput = dag.addOperator("Consumer", RabbitMQInputOperator.class);
    rabbitInput.setHost("localhost");
    rabbitInput.setPort(5672);
    rabbitInput.setExchange("");
    rabbitInput.setQueueName("hello");
    LineOutputOperator out = dag.addOperator("fileOut", new LineOutputOperator());
    dag.addStream("data", rabbitInput.output, out.input);
  }
}

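
One way to track down a "cannot find symbol" error on a port is to list the public fields an operator class actually exposes. The sketch below does this with reflection on stand-in classes: `FakePort` and `FakeInputOperator` are hypothetical and not part of Apex or Malhar. The stand-in calls its port `outputPort` only as an assumption about how the real operator might name it, so the actual name should be checked against the Malhar source.

```java
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class PortLister {

    /** Hypothetical stand-in for a port type; not an Apex class. */
    static class FakePort { }

    /** Hypothetical stand-in for an operator whose port is not called "output". */
    static class FakeInputOperator {
        public final transient FakePort outputPort = new FakePort();
        public String host = "localhost";
    }

    /**
     * Returns the sorted names of all public fields (including inherited ones)
     * on clazz whose declared type is assignable to fieldType.
     */
    static List<String> publicFieldsOfType(Class<?> clazz, Class<?> fieldType) {
        List<String> names = new ArrayList<>();
        // getFields() returns only public fields, declared or inherited.
        for (Field f : clazz.getFields()) {
            if (fieldType.isAssignableFrom(f.getType())) {
                names.add(f.getName());
            }
        }
        Collections.sort(names);
        return names;
    }

    public static void main(String[] args) {
        // Lists the port-typed fields of the stand-in operator.
        System.out.println(publicFieldsOfType(FakeInputOperator.class, FakePort.class));
    }
}
```

Against the real classes one would presumably pass `RabbitMQInputOperator.class` and the port interface `com.datatorrent.api.Operator.OutputPort.class`, with the Malhar and Apex jars on the classpath.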
Cheers
Manfred.
On 08.06.2017 at 04:34, vikram patil wrote:
> Hi,
> dag.addStream() creates a stream from one operator's output port to
> another operator's input port.
> RabbitMQInputOperator consumer = dag.addOperator("Consumer",
> RabbitMQInputOperator.class);
> dag.addStream("data", *rabbitInput*.output, out.input);
> It looks like your operator name is incorrect. In your code snippet
> above, the name of the RabbitMQInputOperator is *"Consumer"*.
>
> In the property name, you need to provide the operator name you specified
> in the addOperator(*"NAME OF THE OPERATOR"*, RabbitMQInputOperator.class)
> API call.
>
> dt.operator.*rabbitMQIn*.prop.tuple_blast ( The syntax is correct,
> provided your operator name is correct ).
>
> ( It should be dt.operator.*Consumer*.prop.tuple_blast based on your
> code snippet ).
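
The naming rule described above can be sketched as a small helper that builds the property key from the name passed to addOperator(). The class and method names here are hypothetical and only illustrate the `dt.operator.<NAME>.prop.<PROPERTY>` pattern from this thread; they are not part of the Apex API.

```java
final class OperatorPropertyKey {

    /**
     * Builds a key of the form dt.operator.<operatorName>.prop.<property>.
     * operatorName must match the name given to dag.addOperator(...).
     */
    static String key(String operatorName, String property) {
        return "dt.operator." + operatorName + ".prop." + property;
    }

    public static void main(String[] args) {
        // "Consumer" is the operator name used in the snippet above.
        System.out.println(key("Consumer", "tuple_blast"));
    }
}
```

Running it prints `dt.operator.Consumer.prop.tuple_blast`, which is the value that would go into the <name> element in properties.xml.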
>
> I think the tests provided in Apache Malhar are very detailed. They
> run in local mode as unit tests, so we have mocked the actual
> RabbitMQ with a custom message publisher.
>
> For the time being, set only the host, port, exchange and queue name:
>
> // Set your RabbitMQ host.
> consumer.setHost("localhost");
> // Set your RabbitMQ port.
> consumer.setPort(5672);
> // This depends on your RabbitMQ producer configuration, but by default
> // it is the default exchange with "" (no name is provided).
> consumer.setExchange("");
> // Set your queue name.
> consumer.setQueueName("YOUR_QUEUE_NAME");
>
>
>
>
> If it's okay, could you please share the code from your application.java
> and properties.xml here?
>
> Thanks,
> Vikram
>
>
> On Thu, Jun 8, 2017 at 12:32 AM, <[email protected]> wrote:
>
> Thanks very much for the help. The only problem left is that I
> don't quite understand dag.addStream().
>
> I tried this
>
> RabbitMQInputOperator consumer = dag.addOperator("Consumer",
> RabbitMQInputOperator.class);
> dag.addStream("data", rabbitInput.output, out.input);
>
> but obviously this doesn't work. What I don't get is the
> difference between the ActiveMQ example and the RabbitMQ example.
> I looked over the test examples for RabbitMQ but don't seem to
> understand the logic behind it.
>
> Is this the correct way to specify properties:
> <property>
> <name>dt.operator.rabbitMQIn.prop.tuple_blast</name>
> <value>500</value>
> </property>
>
> Cheers
> Manfred.
>
>
> On 07.06.2017 at 12:03, Vikram Patil wrote:
>> Yes, you would need an Application.java, which is the way to define a DAG
>> for an Apex application.
>>
>> Please have a look at the code from the following example to see how to
>> write a JMS ActiveMQ based application:
>>
>>
>> https://github.com/DataTorrent/examples/tree/master/tutorials/jmsActiveMQ/src/main/java/com/example/jmsActiveMQ
>>
>> This is how you can instantiate RabbitMQInputOperator and add it to a DAG:
>> RabbitMQInputOperator consumer = dag.addOperator("Consumer",
>> RabbitMQInputOperator.class);
>>
>>
>> https://stackoverflow.com/questions/42207495/how-to-use-apache-apex-malhar-rabbitmq-operator-in-dag
>>
>> The following properties need to be specified in properties.xml:
>>
>> Properties:
>> * tuple_blast: number of tuples emitted in each burst
>> * bufferSize: size of the holding buffer
>> * host: the address for the consumer to connect to the RabbitMQ producer
>> * exchange: the exchange for the consumer to connect to the RabbitMQ producer
>> * exchangeType: the exchangeType for the consumer to connect to the RabbitMQ producer
>> * routingKey: the routingKey for the consumer to connect to the RabbitMQ producer
>> * queueName: the queueName for the consumer to connect to the RabbitMQ producer
>>
>> Reference:
>> https://github.com/apache/apex-malhar/blob/master/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
>>
>> Thanks,
>> Vikram
>>
>> On Wed, Jun 7, 2017 at 3:19 PM, <[email protected]> wrote:
>>> Hello,
>>>
>>> I compiled the whole thing but now I don't know exactly how to get it
>>> running in Apex. Do I need an application.java like in the tutorial? I do
>>> have a simple RabbitMQ queue up and running on the server. How do I consume
>>> the messages with Apex and write them to HDFS?
>>>
>>> Cheers,
>>>
>>> Manfred
>>>
>>> The following steps were necessary to get the RabbitMQ test to compile:
>>>
>>> @TimeoutException
>>> import java.util.concurrent.TimeoutException;
>>> public void setup() throws IOException,TimeoutException
>>> public void teardown() throws IOException,TimeoutException
>>> protected void runTest(final int testNum) throws IOException
>>>
>>> @Build jars
>>> cd apex-malhar/contrib/
>>> mvn clean package -DskipTests
>>>
>>> cd apex-malhar/library/
>>> mvn clean package -DskipTests
>>> copy packages to project directory
>>>
>>> @Link them to the project
>>> Add the following lines to the pom.xml:
>>> <dependency>
>>>   <groupId>contrib</groupId>
>>>   <artifactId>com.datatorrent.contrib.helper</artifactId>
>>>   <version>1.0</version>
>>>   <scope>system</scope>
>>>   <systemPath>${project.basedir}/src/main/resources/malhar-contrib-3.8.0-SNAPSHOT-tests.jar</systemPath>
>>> </dependency>
>>> <dependency>
>>>   <groupId>lib</groupId>
>>>   <artifactId>com.datatorrent.lib.helper</artifactId>
>>>   <version>1.0</version>
>>>   <scope>system</scope>
>>>   <systemPath>${project.basedir}/src/main/resources/malhar-library-3.8.0-SNAPSHOT-tests.jar</systemPath>
>>> </dependency>
>>> <dependency>
>>>   <groupId>contrib</groupId>
>>>   <artifactId>com.datatorrent.contrib.rabbitmq</artifactId>
>>>   <version>1.0</version>
>>>   <scope>system</scope>
>>>   <systemPath>${project.basedir}/src/main/resources/malhar-contrib-3.8.0-SNAPSHOT.jar</systemPath>
>>> </dependency>
>>> <dependency>
>>>   <groupId>Attribute</groupId>
>>>   <artifactId>com.datatorrent.api.Attribute.AttributeMap</artifactId>
>>>   <version>1.0</version>
>>>   <scope>system</scope>
>>>   <systemPath>${project.basedir}/src/main/resources/apex-api-3.7.0-SNAPSHOT.jar</systemPath>
>>> </dependency>
>>>
>>>
>>> On 31.05.2017 at 18:57, Sanjay Pujare wrote:
>>>
>>> Both com.datatorrent.contrib.helper and com.datatorrent.lib.helper are
>>> under the test directory under malhar-contrib and malhar-library
>>> respectively. You may need to build these jars yourself with test scope to
>>> include these packages.
>>>
>>> On Wed, May 31, 2017 at 9:39 AM, <[email protected]> wrote:
>>>> Hello, (mea culpa for messing up the headline the first time)
>>>>
>>>> I'm currently trying to get the apex-malhar rabbitmq running, but I'm at
>>>> a complete loss. While the examples are running fine, I can't even get
>>>> RabbitMQInputOperatorTest.java to run.
>>>>
>>>> First it couldn't find the rabbitmq-client, which was solvable by adding
>>>> the dependency:
>>>>
>>>> <dependency>
>>>> <groupId>com.rabbitmq</groupId>
>>>> <artifactId>amqp-client</artifactId>
>>>> <version>4.1.0</version>
>>>> </dependency>
>>>>
>>>> But now it doesn't find the packages com.datatorrent.contrib.helper and
>>>> com.datatorrent.lib.helper and can't find several symbols.
>>>>
>>>> Needless to say that I'm a beginner regarding Apex so does anyone know
>>>> what exactly I'm doing wrong here?
>>>>
>>>> Cheers
>>>>
>>>> Manfred.
>>>>
>>>>
>
>