[ https://issues.apache.org/jira/browse/APEXMALHAR-2493?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ananth closed APEXMALHAR-2493.
------------------------------

> KafkaSinglePortExactlyOnceOutputOperator going to the blocked state during recovery
> -----------------------------------------------------------------------------------
>
> Key: APEXMALHAR-2493
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2493
> Project: Apache Apex Malhar
> Issue Type: Bug
> Reporter: Chaitanya
> Assignee: Chaitanya
> Fix For: 3.8.0
>
>
> Steps to reproduce the issue:
> ---------------------------------------
> - Created a Kafka topic with a single partition.
> - Created an application with the following DAG:
>   BatchSequenceGenerator -> KafkaSinglePortExactlyOnceOutputOperator
>   Number of partitions of KafkaSinglePortExactlyOnceOutputOperator = 2. Let KO1 and KO2 be the two instances.
> - Launched the app and, after some time, manually killed one of the instances of the "KafkaSinglePortExactlyOnceOutputOperator" operator (KO2).
> - During recovery, the instance comes back up and, after some time, goes into the blocked state. The App Master then kills this instance.
>
> Observation:
> ----------------
> * There is an infinite while loop in the rebuildPartialWindow() method.
> * The while loop breaks only under one of these two conditions:
>   a) the number of trials in which the records polled from Kafka are empty reaches 10
>   b) the boundary is crossed (consumerRecord.offset() >= currentOffset)
> In this scenario, KO1 keeps writing data to Kafka, so the first condition is never satisfied.
> The operator never reaches the second check because of the following continue statement, which skips every record whose key belongs to another instance (here, every record KO1 writes):
>   if (!doesKeyBelongsToThisInstance(operatorId, consumerRecord.key())) {
>     continue;
>   }
> Solution: First check the boundary-crossing condition, and only then check doesKeyBelongsToThisInstance(..).
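The reordering can be illustrated with a small, self-contained Java model of the polling loop. This is a sketch under stated assumptions, not the Malhar code: `Rec`, this signature of `rebuildPartialWindow`, `simulatedPoll`, and `MAX_EMPTY_POLLS` are hypothetical stand-ins (no Kafka client is used), and ownership is passed in as a predicate in place of doesKeyBelongsToThisInstance(..). What it shows is the order of the two checks: the offset boundary is tested before the ownership `continue`, so records written by KO1 can no longer keep the loop running forever.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;
import java.util.function.Supplier;

public class PartialWindowRebuild {

  /** Hypothetical stand-in for a Kafka ConsumerRecord: only key and offset matter here. */
  static final class Rec {
    final String key;
    final long offset;

    Rec(String key, long offset) {
      this.key = key;
      this.offset = offset;
    }
  }

  /** Hypothetical limit mirroring condition (a): give up after this many empty polls. */
  static final int MAX_EMPTY_POLLS = 10;

  /** Sketch of the corrected loop: boundary check first, ownership check second. */
  static List<Rec> rebuildPartialWindow(Supplier<List<Rec>> poll,
                                        Predicate<String> keyBelongsToThisInstance,
                                        long currentOffset) {
    List<Rec> rebuilt = new ArrayList<>();
    int emptyPolls = 0;
    while (true) {
      List<Rec> records = poll.get();
      if (records.isEmpty()) {
        // Condition (a): stop once the empty-poll limit is reached.
        if (++emptyPolls >= MAX_EMPTY_POLLS) {
          return rebuilt;
        }
        continue;
      }
      for (Rec r : records) {
        // Condition (b) first: stop at the boundary, whichever instance wrote the record.
        if (r.offset >= currentOffset) {
          return rebuilt;
        }
        // Only now skip records that belong to another instance.
        if (!keyBelongsToThisInstance.test(r.key)) {
          continue;
        }
        rebuilt.add(r);
      }
    }
  }

  /**
   * Hypothetical topic: below `boundary`, odd offsets carry key "KO2" and even offsets "KO1";
   * from `boundary` on, only KO1 (still running) writes. Polls are never empty.
   */
  static Supplier<List<Rec>> simulatedPoll(long boundary) {
    long[] next = {0};
    return () -> {
      List<Rec> batch = new ArrayList<>();
      for (int i = 0; i < 2; i++) {
        long o = next[0]++;
        String key = (o < boundary && o % 2 == 1) ? "KO2" : "KO1";
        batch.add(new Rec(key, o));
      }
      return batch;
    };
  }

  public static void main(String[] args) {
    // Recover KO2's records below offset 10 while KO1 keeps producing.
    List<Rec> mine = rebuildPartialWindow(simulatedPoll(10), "KO2"::equals, 10);
    StringBuilder sb = new StringBuilder();
    for (Rec r : mine) {
      sb.append(r.offset).append(' ');
    }
    System.out.println(sb.toString().trim()); // prints "1 3 5 7 9"
  }
}
```

With `simulatedPoll(10)`, every record from offset 10 onward has a KO1 key, which matches the situation in the issue: if the ownership check came first, each of those records would hit `continue` and the boundary would never be seen, while the polls never come back empty. With the boundary check first, the loop returns at offset 10. Whether the real operator resets its empty-poll counter after a non-empty poll is not stated in the issue; this sketch does not reset it.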