[
https://issues.apache.org/jira/browse/BAHIR-190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16842621#comment-16842621
]
ASF subversion and git services commented on BAHIR-190:
-------------------------------------------------------
Commit e5b1cae62f0fb47b15ff69d354b779b6072c27cf in bahir-flink's branch
refs/heads/master from Krystex
[ https://gitbox.apache.org/repos/asf?p=bahir-flink.git;h=e5b1cae ]
[BAHIR-190] Fixed premature exit on empty queue
When the source queue has no more messages, the job
doesn't exit anymore. This was a problem with ActiveMQ.
Closes #53
> ActiveMQ connector stops on empty queue
> ---------------------------------------
>
> Key: BAHIR-190
> URL: https://issues.apache.org/jira/browse/BAHIR-190
> Project: Bahir
> Issue Type: Bug
> Components: Flink Streaming Connectors
> Affects Versions: Flink-1.0
> Reporter: Stephan Brosinski
> Priority: Critical
>
> I tried the ActiveMQ Flink Connector. Reading from an ActiveMQ queue, it
> seems to connector exits once there are no more messages in the queue. This
> ends the Flink job processing the stream.
> To me it seems, that the while loop inside the run method (AMQSource.java,
> line 222) should not do a return, but a continue if the message is no
> instance of ByteMessage, e.g. null.
> If I'm right, I can create a pull request showing the change.
> To reproduce:
>
> {code:java}
> ActiveMQConnectionFactory connectionFactory = new
> ActiveMQConnectionFactory("xxx", "xxx",
> "tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1");
> AMQSourceConfig<String> amqConfig = new
> AMQSourceConfig.AMQSourceConfigBuilder<String>()
> .setConnectionFactory(connectionFactory)
> .setDestinationName("test")
> .setDestinationType(DestinationType.QUEUE)
> .setDeserializationSchema(new SimpleStringSchema())
> .build();
> AMQSource<String> amqSource = new AMQSource<>(amqConfig);
> env.addSource(amqSource).print()
> env.setParallelism(1).execute("ActiveMQ Consumer");{code}
> Then point the Flink job at an empty ActiveMQ queue.
>
> Not sure if this is a bug, but it's not what I expected when I used the
> connector.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)