Thanks, Manfred, for resolving this issue. I have checked the code now: as you suggested, RabbitMQInputOperator seems to support a non-durable exchange but only durable queues, which is inconsistent. Please feel free to create a ticket for an improvement to RabbitMQInputOperator regarding this issue.

Thanks & Regards,
Vikram
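For context, the mismatch described above can be reproduced with the plain RabbitMQ Java client: the combination this thread eventually settles on is a non-durable fanout exchange bound to a durable queue. The following is only a sketch, assuming a broker on localhost and the exchange/queue/routing-key names used later in this thread ("apex", "test", "rktest"):

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class DeclareTopology
{
  public static void main(String[] args) throws Exception
  {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    // non-durable fanout exchange (durable=false), matching what the operator accepts
    channel.exchangeDeclare("apex", "fanout", false);
    // durable, non-exclusive, non-auto-delete queue
    channel.queueDeclare("test", true, false, false, null);
    // bind the queue to the exchange; a fanout exchange ignores the routing key
    channel.queueBind("test", "apex", "rktest");

    channel.close();
    connection.close();
  }
}

Note that RabbitMQ refuses to redeclare an existing exchange or queue with different durability flags, so if the declarations above do not match what is already on the broker, the old objects have to be deleted first.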
On Fri, Jul 7, 2017 at 2:48 PM, <a...@x5h.eu> wrote:
> After various tests I finally got it all working nicely, and for future users I'll post here how.
>
> First, the RabbitMQ configuration; this was the only one that worked:
> rabbitmqadmin declare exchange name=apex type=fanout durable=false
> rabbitmqadmin declare queue name=test durable=true
> rabbitmqadmin declare binding source="apex" destination_type="queue" destination="test" routing_key="rktest"
>
> It is important that Apex only accepts a non-durable exchange. But this means you have to recreate it every time you restart your RabbitMQ service.
>
> The "Mkdirs failed to create" error:
> This just means that the DFS service is down or, in my case, that safemode is on.
> hdfs dfsadmin -safemode get
> hdfs dfsadmin -safemode leave
>
> My example uses the following (I moved the operator values into a corresponding *.xml file; I just listed them here for better understanding):
>
> @ApplicationAnnotation(name="RabbitMQ2HDFS")
> public class RabbitMQApplication implements StreamingApplication
> {
>   @Override
>   public void populateDAG(DAG dag, Configuration conf)
>   {
>     RabbitMQInputOperator in = dag.addOperator("rabbitInput", new RabbitMQInputOperator());
>     in.setHost("192.168.33.63");
>     in.setExchange("apex");
>     in.setExchangeType("fanout");
>     in.setQueueName("test");
>
>     LineOutputOperator out = dag.addOperator("fileOut", new LineOutputOperator());
>     out.setFilePath("/hdfs/rabbitMQ");
>     out.setBaseName("rabbitOut");
>     out.setMaxLength(1024);
>     out.setRotationWindows(4);
>     dag.addStream("data", in.outputPort, out.input);
>   }
> }
>
> And the corresponding output operator. The only important thing here was that it extends AbstractFileOutputOperator<byte[]>:
>
> public class LineOutputOperator extends AbstractFileOutputOperator<byte[]>
> {
>   private static final String NL = System.lineSeparator();
>   private static final Charset CS = StandardCharsets.UTF_8;
>
>   @NotNull
>   private String baseName;
>
>   @Override
>   public byte[] getBytesForTuple(byte[] t) {
>     String result = new String(t, CS) + NL;
>     return result.getBytes(CS);
>   }
>
>   @Override
>   protected String getFileName(byte[] tuple) {
>     return baseName;
>   }
>
>   public String getBaseName() { return baseName; }
>   public void setBaseName(String v) { baseName = v; }
> }
>
> The most pressing issue was that it wouldn't run on the YARN cluster, only in local mode. I still have no idea why it didn't run, but my best guess is that it was a bad idea in the first place to try the Apex app on a Raspberry Pi 3 cluster. I switched to a standard Arch Linux server with 8 GB RAM and, without changing a thing in the application, it worked perfectly.
> Thanks for all the help!
>
> Am 22.06.2017 um 11:33 schrieb a...@x5h.eu:
> I drilled the error down to this message:
> Mkdirs failed to create file:/home/pi/datatorrent/apps/application_1498123667708_0001/checkpoints/2
> I guess I have something buggy in my configuration. Does any of you know how to solve this error? Should I start the application with the same user I'm starting YARN with?
>
> Cheers
> Manfred.
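A quick way to narrow down a "Mkdirs failed to create" error like the one above, independent of Apex, is to check whether the filesystem the application writes its checkpoints to is reachable and writable from Java. This is only a sketch using the standard Hadoop FileSystem API; the test path is made up, and the point is that a stopped DFS, safemode, or a missing fs.defaultFS setting (which silently falls back to the local file:/// filesystem) all show up here:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class DfsWriteCheck
{
  public static void main(String[] args) throws Exception
  {
    // Reads core-site.xml / hdfs-site.xml from the classpath.
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    System.out.println("Default filesystem: " + fs.getUri());

    // mkdirs() returns false or throws when DFS is down or in safemode.
    Path testDir = new Path("/tmp/apex-write-check");
    System.out.println("mkdirs(" + testDir + ") -> " + fs.mkdirs(testDir));
  }
}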
> Am 10.06.2017 um 14:50 schrieb a...@x5h.eu:
> Hello,
>
> you were completely right; it seems that there are problems with my test scenario regarding the Hadoop/YARN installation, and the application never starts. I found these entries in the log:
>
> 2017-06-10 14:33:02,623 INFO org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService: Registering app attempt : appattempt_1495629011552_0011_000001
> 2017-06-10 14:33:02,623 INFO org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl: appattempt_1495629011552_0011_000001 State change from NEW to SUBMITTED
> 2017-06-10 14:33:02,624 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue: not starting application as amIfStarted exceeds amLimit
> 2017-06-10 14:33:02,624 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue: Application added - appId: application_1495629011552_0011 user: org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue$User@11536dc, leaf-queue: default #user-pending-applications: 1 #user-active-applications: 1 #queue-pending-applications: 1 #queue-active-applications: 1
>
> Therefore the application never leaves the state "undefined". Since the local tests were running fine and the launch of the application didn't raise an error, I missed the problem with the Hadoop installation. Thanks for the correct hint to look at the Hadoop cluster.
>
> Cheers
> Manfred.
>
> Am 09.06.2017 um 15:23 schrieb vikram patil:
> 1) Are you doing it in your local environment?
> 2) If you are doing it locally, I would suggest the following options:
>    1) If you don't want to create the queue on RabbitMQ yourself, set the queue name on the operator: in.setQueueName("YOUR_QUEUE_NAME"). The operator will then do the following:
>       * create a durable queue in RabbitMQ;
>       * you have specified exchange and exchangeType, so it will create an exchange using this information and bind the created queue to that exchange with the default routing key, which will be "".
>       Right now it must be creating an auto-generated, uniquely named queue for you.
>    2) You can create your own exchange and durable queue using rabbitmqadmin; you will have to install the RabbitMQ management plugin for that. You can use it to publish some test data as well.
>
> Using apex-cli you can check the status of your application; if it is failing, you should check the logs under userlogs in the Hadoop logs directory.
>
> Thanks & Regards,
> Vikram
>
> On Fri, Jun 9, 2017 at 6:42 PM, <a...@x5h.eu> wrote:
>> Finally got rid of all errors, but now I have the problem that the Apex application does not seem to register at the RabbitMQ exchange.
>>
>> This is my code:
>>
>> @ApplicationAnnotation(name="RabbitMQ2HDFS")
>> public class RabbitMQApplication implements StreamingApplication
>> {
>>   @Override
>>   public void populateDAG(DAG dag, Configuration conf)
>>   {
>>     RabbitMQInputOperator in = dag.addOperator("rabbitInput", new RabbitMQInputOperator());
>>     in.setHost("localhost");
>>     //in.setPort(5672);
>>     in.setExchange("apex");
>>     in.setExchangeType("fanout");
>>     ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator());
>>     dag.addStream("rand_console", in.outputPort, console.input);
>>   }
>> }
>>
>> If I launch the application, everything executes without an error, but if I list the bindings on the exchange, there is none.
>>
>> Anyone have an idea how I can start to debug this?
>>
>> Cheers
>> Manfred.
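One way to debug the missing binding described above is to take Apex out of the picture and publish a few test messages directly to the exchange with the plain RabbitMQ Java client, along the lines of Vikram's suggestion to publish some test data. A minimal sketch, assuming the host and exchange name from the code above; the message bodies are arbitrary:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;

public class TestPublisher
{
  public static void main(String[] args) throws Exception
  {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    // Declare the same non-durable fanout exchange the application consumes from.
    channel.exchangeDeclare("apex", "fanout", false);

    // Publish a few test messages; a fanout exchange ignores the routing key.
    for (int i = 0; i < 10; i++) {
      String msg = "test message " + i;
      channel.basicPublish("apex", "", null, msg.getBytes(StandardCharsets.UTF_8));
    }

    channel.close();
    connection.close();
  }
}

If the console operator still prints nothing while this publisher runs, the problem is on the consuming side (for example, the operator not binding a queue to the exchange); if the messages do arrive, the problem is with whatever was publishing before.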
>> Am 08.06.2017 um 18:04 schrieb a...@x5h.eu:
>> Okay, I found the error. I copied the LineOutputOperator.java <https://github.com/DataTorrent/examples/blob/master/tutorials/jmsActiveMQ/src/main/java/com/example/jmsActiveMQ/LineOutputOperator.java> from the jmsActiveMQ example and found there:
>> public class LineOutputOperator extends AbstractFileOutputOperator<String>
>>
>> Instead I took the LineOutputOperator.java <https://github.com/DataTorrent/examples/blob/master/tutorials/kafka/src/main/java/com/example/myapexapp/LineOutputOperator.java> from the Kafka 0.9 example; there the class is correctly defined for the RabbitMQInputOperator.
>>
>> So far so good; now it compiles without errors.
>>
>> Cheers
>> Manfred
>>
>> Am 08.06.2017 um 17:38 schrieb a...@x5h.eu:
>> I still don't get it completely (the rest of the code is in the e-mail before; this is only the necessary sample):
>>
>> 1. dag.addStream("test", rabbitInput.output, out.input);
>> results in the following error:
>> [ERROR] symbol: variable output
>> [ERROR] location: variable rabbitInput of type com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator
>>
>> 2. dag.addStream("test", rabbitInput.outputPort, out.input);
>> results in the following error:
>> [ERROR] /home/pi/apex/rabbitMQ/src/main/java/com/example/rabbitMQ/RabbitMQApplication.java:[31,8] no suitable method found for addStream(java.lang.String,com.datatorrent.api.DefaultOutputPort<byte[]>,com.datatorrent.api.DefaultInputPort<java.lang.String>)
>> [ERROR] method com.datatorrent.api.DAG.<T>addStream(java.lang.String,com.datatorrent.api.Operator.OutputPort<? extends T>,com.datatorrent.api.Operator.InputPort<? super T>...) is not applicable
>> [ERROR] (inferred type does not conform to upper bound(s)
>> [ERROR] inferred: byte[]
>> [ERROR] upper bound(s): java.lang.String,java.lang.Object)
>> [ERROR] method com.datatorrent.api.DAG.<T>addStream(java.lang.String,com.datatorrent.api.Operator.OutputPort<? extends T>,com.datatorrent.api.Operator.InputPort<? super T>) is not applicable
>> [ERROR] (inferred type does not conform to upper bound(s)
>> [ERROR] inferred: byte[]
>> [ERROR] upper bound(s): java.lang.String,java.lang.Object)
>> [ERROR] method com.datatorrent.api.DAG.<T>addStream(java.lang.String,com.datatorrent.api.Operator.OutputPort<? extends T>,com.datatorrent.api.Operator.InputPort<? super T>,com.datatorrent.api.Operator.InputPort<? super T>) is not applicable
>> [ERROR] (cannot infer type-variable(s) T
>> [ERROR] (actual and formal argument lists differ in length))
>>
>> It seems that, on the one hand, RabbitMQInputOperator does not have an output field, and, on the other hand, the addStream method only accepts an outputPort together with inputPort fields (or output and input fields) of the corresponding classes. Does that mean I can only use a class that provides a matching input port for this example?
>>
>> Cheers
>> Manfred.
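The second error above is about tuple types rather than port names: as the compiler output shows, rabbitInput.outputPort is a com.datatorrent.api.DefaultOutputPort<byte[]>, and DAG.addStream can only connect it to an input port typed byte[] or a supertype such as Object (the ConsoleOutputOperator used elsewhere in this thread has an Object-typed input port, which is why that wiring compiles). A rough illustration with a hypothetical byte[]-typed sink; the BaseOperator package name is assumed to be com.datatorrent.common.util and may differ between Apex versions:

import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.common.util.BaseOperator;

// Hypothetical sink operator whose input port accepts the byte[] tuples
// emitted by RabbitMQInputOperator.outputPort.
public class BytesSinkOperator extends BaseOperator
{
  public final transient DefaultInputPort<byte[]> input = new DefaultInputPort<byte[]>()
  {
    @Override
    public void process(byte[] tuple)
    {
      // handle the raw message bytes, e.g. decode them to a String
    }
  };
}

// Wiring sketch inside populateDAG:
//   BytesSinkOperator out = dag.addOperator("sink", new BytesSinkOperator());
//   dag.addStream("data", in.outputPort, out.input);  // T is inferred as byte[]
// A DefaultInputPort<String> cannot be connected here, which is exactly the
// "inferred: byte[] / upper bound(s): java.lang.String" failure shown above.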
>> Am 08.06.2017 um 10:05 schrieb a...@x5h.eu:
>> Sorry, the two snippets below were from different iterations. The error I get is the following:
>>
>> [ERROR] /home/pi/apex/rabbitMQ/src/main/java/com/example/rabbitMQ/RabbitMQApplication.java:[31,38] cannot find symbol
>> [ERROR] symbol: variable output
>> [ERROR] location: variable rabbitInput of type com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator
>>
>> My code is as follows:
>>
>> package com.example.rabbitMQ;
>>
>> import org.apache.hadoop.conf.Configuration;
>>
>> import com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator;
>> import com.datatorrent.api.annotation.ApplicationAnnotation;
>> import com.datatorrent.api.StreamingApplication;
>> import com.datatorrent.api.DAG;
>> import com.datatorrent.lib.io.jms.JMSStringInputOperator;
>>
>> @ApplicationAnnotation(name="RabbitMQ2HDFS")
>> public class RabbitMQApplication implements StreamingApplication
>> {
>>   @Override
>>   public void populateDAG(DAG dag, Configuration conf)
>>   {
>>     RabbitMQInputOperator rabbitInput = dag.addOperator("Consumer", RabbitMQInputOperator.class);
>>     rabbitInput.setHost("localhost");
>>     rabbitInput.setPort(5672);
>>     rabbitInput.setExchange("");
>>     rabbitInput.setQueueName("hello");
>>     LineOutputOperator out = dag.addOperator("fileOut", new LineOutputOperator());
>>
>>     dag.addStream("data", rabbitInput.output, out.input);
>>   }
>> }
>>
>> Cheers
>> Manfred.
>>
>> Am 08.06.2017 um 04:34 schrieb vikram patil:
>> Hi,
>>
>> dag.addStream() is actually used to create a stream from one operator's output port to another operator's input port.
>>
>> RabbitMQInputOperator consumer = dag.addOperator("Consumer", RabbitMQInputOperator.class);
>> dag.addStream("data", *rabbitInput*.output, out.input);
>>
>> It looks like your operator name is incorrect: I see that in your code snippet above, the name of the RabbitMQInputOperator is *"Consumer"*.
>>
>> In the property name, you need to provide the operator name you have specified in the addOperator(*"NAME OF THE OPERATOR"*, RabbitMQInputOperator.class) API call.
>>
>> dt.operator.*rabbitMQIn*.prop.tuple_blast (the syntax is correct, given that your operator name is correct).
>>
>> (It should be dt.operator.*Consumer*.prop.tuple_blast based on your code snippet.)
>>
>> I think the tests which are provided in Apache Malhar are very detailed; they run in local mode as unit tests, so the actual RabbitMQ is mocked by a custom message publisher.
>>
>> For the time being, set only the queue name and host name as follows:
>>
>> // set your rabbitmq host
>> consumer.setHost("localhost");
>> // set your rabbitmq port
>> consumer.setPort(5672);
>> // It depends on your rabbitmq producer configuration, but by default it is the default exchange with "" (no name is provided).
>> consumer.setExchange("");
>> // set your queue name
>> consumer.setQueueName("YOUR_QUEUE_NAME");
>>
>> If it's okay, could you please share the code from your Application.java and properties.xml here?
>>
>> Thanks,
>> Vikram
>>
>> On Thu, Jun 8, 2017 at 12:32 AM, <a...@x5h.eu> wrote:
>>> Thanks very much for the help. The only problem left is that I don't quite understand dag.addStream().
>>>
>>> I tried this:
>>>
>>> RabbitMQInputOperator consumer = dag.addOperator("Consumer", RabbitMQInputOperator.class);
>>> dag.addStream("data", rabbitInput.output, out.input);
>>>
>>> but obviously this doesn't work. What I don't get is the difference between the ActiveMQ example and the RabbitMQ example. I looked over the test examples for RabbitMQ but don't seem to understand the logic behind it.
>>> Is this the correct way to specify properties?
>>> <property>
>>>   <name>dt.operator.rabbitMQIn.prop.tuple_blast</name>
>>>   <value>500</value>
>>> </property>
>>>
>>> Cheers
>>> Manfred.
>>>
>>> Am 07.06.2017 um 12:03 schrieb Vikram Patil:
>>> Yes, you would need an Application.java, which is the way to define a DAG for an Apex application.
>>>
>>> Please have a look at the code from the following example to find out how to write a JMS ActiveMQ based application:
>>> https://github.com/DataTorrent/examples/tree/master/tutorials/jmsActiveMQ/src/main/java/com/example/jmsActiveMQ
>>>
>>> This is how you can instantiate a RabbitMQInputOperator and add it to a DAG:
>>> RabbitMQInputOperator consumer = dag.addOperator("Consumer", RabbitMQInputOperator.class);
>>> https://stackoverflow.com/questions/42207495/how-to-use-apache-apex-malhar-rabbitmq-operator-in-dag
>>>
>>> The following properties need to be specified in properties.xml:
>>> * tuple_blast: number of tuples emitted in each burst
>>> * bufferSize: size of the holding buffer
>>> * host: the address for the consumer to connect to the RabbitMQ producer
>>> * exchange: the exchange for the consumer to connect to the RabbitMQ producer
>>> * exchangeType: the exchangeType for the consumer to connect to the RabbitMQ producer
>>> * routingKey: the routingKey for the consumer to connect to the RabbitMQ producer
>>> * queueName: the queueName for the consumer to connect to the RabbitMQ producer
>>>
>>> Reference:
>>> https://github.com/apache/apex-malhar/blob/master/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
>>>
>>> Thanks,
>>> Vikram
>>>
>>> On Wed, Jun 7, 2017 at 3:19 PM, <a...@x5h.eu> wrote:
>>>
>>> Hello,
>>>
>>> I compiled the whole thing, but now I don't know exactly how to get it running in Apex. Do I need an Application.java like in the tutorial? I do have a simple RabbitMQ queue up and running on the server. How do I consume the messages with Apex and write them to HDFS?
>>> Cheers,
>>> Manfred
>>>
>>> The following steps were necessary to get the RabbitMQ test to compile:
>>>
>>> @TimeoutException
>>> import java.util.concurrent.TimeoutException;
>>> public void setup() throws IOException, TimeoutException
>>> public void teardown() throws IOException, TimeoutException
>>> protected void runTest(final int testNum) throws IOException
>>>
>>> @Build jars
>>> cd apex-malhar/contrib/
>>> mvn clean package -DskipTests
>>>
>>> cd apex-malhar/library/
>>> mvn clean package -DskipTests
>>> copy the packages to the project directory
>>>
>>> @Link them to the project
>>> Add the following lines to the pom.xml:
>>> <dependency>
>>>   <groupId>contrib</groupId>
>>>   <artifactId>com.datatorrent.contrib.helper</artifactId>
>>>   <version>1.0</version>
>>>   <scope>system</scope>
>>>   <systemPath>${project.basedir}/src/main/resources/malhar-contrib-3.8.0-SNAPSHOT-tests.jar</systemPath>
>>> </dependency>
>>> <dependency>
>>>   <groupId>lib</groupId>
>>>   <artifactId>com.datatorrent.lib.helper</artifactId>
>>>   <version>1.0</version>
>>>   <scope>system</scope>
>>>   <systemPath>${project.basedir}/src/main/resources/malhar-library-3.8.0-SNAPSHOT-tests.jar</systemPath>
>>> </dependency>
>>> <dependency>
>>>   <groupId>contrib</groupId>
>>>   <artifactId>com.datatorrent.contrib.rabbitmq</artifactId>
>>>   <version>1.0</version>
>>>   <scope>system</scope>
>>>   <systemPath>${project.basedir}/src/main/resources/malhar-contrib-3.8.0-SNAPSHOT.jar</systemPath>
>>> </dependency>
>>> <dependency>
>>>   <groupId>Attribute</groupId>
>>>   <artifactId>com.datatorrent.api.Attribute.AttributeMap</artifactId>
>>>   <version>1.0</version>
>>>   <scope>system</scope>
>>>   <systemPath>${project.basedir}/src/main/resources/apex-api-3.7.0-SNAPSHOT.jar</systemPath>
>>> </dependency>
>>>
>>> Am 31.05.2017 um 18:57 schrieb Sanjay Pujare:
>>> Both com.datatorrent.contrib.helper and com.datatorrent.lib.helper are under the test directory under malhar-contrib and malhar-library respectively. You may need to build these jars yourself with test scope to include these packages.
>>>
>>> On Wed, May 31, 2017 at 9:39 AM, <a...@x5h.eu> wrote:
>>>
>>> Hello (mea culpa for messing up the headline the first time),
>>>
>>> I'm currently trying to get the apex-malhar RabbitMQ operator running, but I'm at a complete loss: while the examples run fine, I don't even get RabbitMQInputOperatorTest.java to run.
>>>
>>> First it couldn't find the rabbitmq client, which was solvable by adding the dependency:
>>>
>>> <dependency>
>>>   <groupId>com.rabbitmq</groupId>
>>>   <artifactId>amqp-client</artifactId>
>>>   <version>4.1.0</version>
>>> </dependency>
>>>
>>> But now it doesn't find the packages com.datatorrent.contrib.helper and com.datatorrent.lib.helper and can't find several symbols.
>>>
>>> Needless to say, I'm a beginner regarding Apex, so does anyone know what exactly I'm doing wrong here?
>>>
>>> Cheers
>>>
>>> Manfred.