[ https://issues.apache.org/jira/browse/APEXMALHAR-2120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15340573#comment-15340573 ]
ASF GitHub Bot commented on APEXMALHAR-2120:
--------------------------------------------
Github user siyuanh commented on a diff in the pull request:
https://github.com/apache/apex-malhar/pull/321#discussion_r67779698
--- Diff: kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java ---
@@ -74,7 +75,7 @@
private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerWrapper.class);
- private boolean isAlive = false;
--- End diff --
The stop method cannot guarantee an immediate stop either, unless you
synchronize the stop method with the while loop running in the separate thread.
Here, as long as there is no concurrent data write operation, I think it should
be fine.
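One way to "synchronize the stop method with the while block" is sketched below, assuming a hypothetical PollingWorker class (not the actual KafkaConsumerWrapper code): stop() clears a volatile flag and then join()s the polling thread, so when stop() returns the loop has exited and any cleanup has run on the polling thread. With a real KafkaConsumer, poll() can block for up to its timeout, so join() may wait that long; KafkaConsumer.wakeup() is the method the Kafka client documents as safe to call from another thread to interrupt a blocking poll.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch, not the actual KafkaConsumerWrapper code.
class PollingWorker implements Runnable {
  // volatile so the write in stop() becomes visible to the polling thread
  private volatile boolean isAlive = false;
  private final AtomicInteger iterations = new AtomicInteger();
  private Thread thread;

  synchronized void start() {
    isAlive = true;
    thread = new Thread(this, "polling-worker");
    thread.start();
  }

  @Override
  public void run() {
    while (isAlive) {
      // stand-in for consumer.poll(...) followed by emitting the records
      iterations.incrementAndGet();
      try {
        Thread.sleep(1);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        break;
      }
    }
    // cleanup such as closing the consumer belongs here, on the polling thread
  }

  // Clears the flag, then waits for the loop to finish its current iteration and exit.
  synchronized void stop() {
    isAlive = false;
    if (thread == null) {
      return;
    }
    try {
      thread.join();
      thread = null;
    } catch (InterruptedException e) {
      // keep the interrupt status; the loop still exits on its next flag check
      Thread.currentThread().interrupt();
    }
  }

  synchronized boolean isRunning() {
    return thread != null && thread.isAlive();
  }

  int iterations() {
    return iterations.get();
  }
}
```

Without the join(), stop() would only request a stop; the loop could still be inside an iteration when the caller moves on, which is the "no immediate stop" caveat above.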
> Fix bugs on KafkaInputOperatorTest AbstractKafkaInputOperator
> -------------------------------------------------------------
>
> Key: APEXMALHAR-2120
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2120
> Project: Apache Apex Malhar
> Issue Type: Bug
> Affects Versions: 3.4.0
> Reporter: bright chen
> Assignee: bright chen
> Fix For: 3.5.0
>
>
> Problems in the unit test class KafkaInputOperatorTest:
> - 'k' is not initialized for each test case.
> - The assertion was not correct.
> - The test case assumes the END_TUPLE will be received after all normal
> tuples, but in fact tuples can arrive out of order when multiple clusters or
> partitions are used.
> - AbstractKafkaInputOperator is implemented as "at least once", but the test
> case assumes "exactly once".
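The last two test problems can be addressed with a check that ignores ordering and tolerates duplicates. Below is a minimal sketch, assuming a hypothetical helper AtLeastOnceCheck (not part of KafkaInputOperatorTest): it passes when every expected message arrived at least once and nothing unexpected arrived, regardless of where the END_TUPLE marker shows up.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Hypothetical test helper, not taken from KafkaInputOperatorTest.
final class AtLeastOnceCheck {
  private AtLeastOnceCheck() {}

  /**
   * @param received  everything the collector saw, possibly duplicated and in any order
   * @param expected  the distinct messages the producer sent
   * @param endMarker a sentinel tuple (like END_TUPLE) that may arrive anywhere
   * @return true if every expected message arrived at least once and nothing else did
   */
  static boolean satisfied(Collection<String> received, Set<String> expected, String endMarker) {
    // collapsing to a set discards duplicates, which "at least once" permits
    Set<String> distinct = new HashSet<>(received);
    // the marker's position says nothing about completeness across partitions
    distinct.remove(endMarker);
    return distinct.equals(expected);
  }
}
```

Because the marker no longer signals completeness, a test using such a check would typically poll the collector until the check passes or a timeout expires.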
> ====================================================================================================
> Problems in AbstractKafkaInputOperator:
> --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
> - RuntimeException: Couldn't replay the offset
> For the test case KafkaInputOperatorTest.testIdempotentInputOperatorWithFailure()
> with scenario {true, true, "one_to_many"} (multi-cluster: true, multi-partition:
> true, partition: "one_to_many"), the operator throws the following exception
> and the Collector Module did not collect any data.
> 2016-06-15 10:43:56,358 [1/Kafka inputtesttopic0:KafkaSinglePortInputOperator] INFO stram.StramLocalCluster log - container-6 msg: Stopped running due to an exception.
> java.lang.RuntimeException: Couldn't replay the offset
>   at org.apache.apex.malhar.kafka.KafkaConsumerWrapper.emitImmediately(KafkaConsumerWrapper.java:146)
>   at org.apache.apex.malhar.kafka.AbstractKafkaInputOperator.replay(AbstractKafkaInputOperator.java:261)
>   at org.apache.apex.malhar.kafka.AbstractKafkaInputOperator.beginWindow(AbstractKafkaInputOperator.java:250)
>   at com.datatorrent.stram.engine.InputNode.run(InputNode.java:122)
>   at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1407)
> Caused by: org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partition: testtopic0-1
>   at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:288)
>   at org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:167)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1302)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:895)
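The "Caused by" line means the consumer had no position for partition testtopic0-1 and its offset reset policy was "none". The Kafka consumer property "auto.offset.reset" accepts "latest" (the default), "earliest" and "none", and an explicit KafkaConsumer.seek(TopicPartition, long) also sets a position. The sketch below is a plain-Java model of that decision for one partition; the class StartOffsetResolver and its parameters are hypothetical and it does not call the Kafka client. Which fix the operator adopted is not stated here.

```java
// Hypothetical model of choosing a start offset for one partition during replay.
// It mirrors the documented meaning of "auto.offset.reset"; it is not Kafka code.
final class StartOffsetResolver {
  private StartOffsetResolver() {}

  static long resolve(Long storedOffset, long earliest, long latest, String resetPolicy) {
    if (storedOffset != null) {
      // replay: seek to the offset recorded for the window
      return storedOffset;
    }
    switch (resetPolicy) {
      case "earliest":
        return earliest;
      case "latest":
        return latest;
      case "none":
        // analogous to NoOffsetForPartitionException in the trace above
        throw new IllegalStateException("Undefined offset with no reset policy");
      default:
        throw new IllegalArgumentException("Unknown reset policy: " + resetPolicy);
    }
  }
}
```

In this model the failure only occurs when a partition has neither a recorded offset nor a reset policy, which is consistent with the replay path above reaching a partition it had no offset for.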
> ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
> - ConcurrentModificationException
> 2016-06-16 10:14:32,400 [1/Kafka inputtesttopic4:KafkaSinglePortInputOperator] ERROR engine.StreamingContainer run - Shutdown of operator OperatorDeployInfo[id=1,name=Kafka inputtesttopic4,type=INPUT,checkpoint={ffffffffffffffff, 0, 0},inputs=[],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=outputPort,streamId=Kafka messagetesttopic4,bufferServer=MacBook-Pro-2.local]]] failed due to an exception.
> java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
>   at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1324)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1255)
>   at org.apache.apex.malhar.kafka.KafkaConsumerWrapper.stop(KafkaConsumerWrapper.java:340)
>   at org.apache.apex.malhar.kafka.AbstractKafkaInputOperator.deactivate(AbstractKafkaInputOperator.java:184)
>   at com.datatorrent.stram.engine.Node.deactivate(Node.java:646)
>   at com.datatorrent.stram.engine.StreamingContainer.teardownNode(StreamingContainer.java:1347)
>   at com.datatorrent.stram.engine.StreamingContainer.access$500(StreamingContainer.java:130)
>   at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1438)
> 2016-06-16 10:14:32,400 [2/Kafka inputtesttopic4:KafkaSinglePortInputOperator] ERROR engine.StreamingContainer run - Shutdown of operator OperatorDeployInfo[id=2,name=Kafka inputtesttopic4,type=INPUT,checkpoint={ffffffffffffffff, 0, 0},inputs=[],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=outputPort,streamId=Kafka messagetesttopic4,bufferServer=MacBook-Pro-2.local]]] failed due to an exception.
> java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
>   at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1324)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1255)
>   at org.apache.apex.malhar.kafka.KafkaConsumerWrapper.stop(KafkaConsumerWrapper.java:340)
>   at org.apache.apex.malhar.kafka.AbstractKafkaInputOperator.deactivate(AbstractKafkaInputOperator.java:184)
>   at com.datatorrent.stram.engine.Node.deactivate(Node.java:646)
>   at com.datatorrent.stram.engine.StreamingContainer.teardownNode(StreamingContainer.java:1347)
>   at com.datatorrent.stram.engine.StreamingContainer.access$500(StreamingContainer.java:130)
>   at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1438)
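In both traces, KafkaConsumer.close() is called from deactivate() on the container thread while the consumer may still be in use by the polling thread, and the consumer's acquire() check rejects the second thread. The sketch below is a hypothetical SingleThreadGuard, modeled loosely on that kind of owner-thread check (it is not the Kafka source): the first thread to enter becomes the owner, re-entry by the owner is allowed, and any other thread fails fast until the owner has fully released. One common remedy is to keep close() on the polling thread, after its loop exits.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical owner-thread guard, loosely modeled on the check behind the exception above.
class SingleThreadGuard {
  private static final long NO_OWNER = -1L;
  private final AtomicLong owner = new AtomicLong(NO_OWNER);
  private final AtomicInteger depth = new AtomicInteger();

  // Enters the guarded section; fails fast if a different thread is already inside.
  void acquire() {
    long me = Thread.currentThread().getId();
    if (me != owner.get() && !owner.compareAndSet(NO_OWNER, me)) {
      throw new ConcurrentModificationException("not safe for multi-threaded access");
    }
    depth.incrementAndGet();
  }

  // Leaves the guarded section; the outermost release clears ownership.
  void release() {
    if (depth.decrementAndGet() == 0) {
      owner.set(NO_OWNER);
    }
  }
}
```

The guard detects misuse rather than preventing it: it does not make the guarded object thread-safe, which is why the remedy is to confine close() to the owning thread.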
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)