[GitHub] storm pull request #2907: STORM-2990, STORM-3279: Fix issue where Kafka Trid...
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/2907

---
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2907#discussion_r241782960

    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java ---
    @@ -170,15 +170,25 @@ public void reEmitPartitionBatch(TransactionAttempt tx, TridentCollector collect
             seek(currBatchTp, lastBatchMeta);
    -        final ConsumerRecords<K, V> records = consumer.poll(pollTimeoutMs);
    -        LOG.debug("Polled [{}] records from Kafka.", records.count());
    +        final List<ConsumerRecord<K, V>> records = consumer.poll(pollTimeoutMs).records(currBatchTp);
    +        LOG.debug("Polled [{}] records from Kafka.", records.size());
             if (!records.isEmpty()) {
                 for (ConsumerRecord<K, V> record : records) {
                     emitTuple(collector, record);
                 }
    -            // build new metadata
    -            currentBatch = new KafkaTridentSpoutBatchMetadata(records.records(currBatchTp), this.topologyContext.getStormId());
    +            // build new metadata based on emitted records
    +            currentBatch = new KafkaTridentSpoutBatchMetadata(
    +                records.get(0).offset(),
    +                records.get(records.size() - 1).offset(),
    +                topologyContext.getStormId());
    +        } else {
    +            //Build new metadata based on the consumer position.
    +            //We want the next emit to start at the current consumer position,
    +            //so make a meta that indicates that position - 1 is the last emitted offset
    +            //This helps us avoid cases like STORM-3279, and simplifies the seek logic.
    +            long lastEmittedOffset = consumer.position(currBatchTp) - 1;
    --- End diff --

    okay sounds good.

---
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2907#discussion_r234338505

    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java ---
    @@ -170,15 +170,25 @@ public void reEmitPartitionBatch(TransactionAttempt tx, TridentCollector collect
             seek(currBatchTp, lastBatchMeta);
    -        final ConsumerRecords<K, V> records = consumer.poll(pollTimeoutMs);
    -        LOG.debug("Polled [{}] records from Kafka.", records.count());
    +        final List<ConsumerRecord<K, V>> records = consumer.poll(pollTimeoutMs).records(currBatchTp);
    +        LOG.debug("Polled [{}] records from Kafka.", records.size());
             if (!records.isEmpty()) {
                 for (ConsumerRecord<K, V> record : records) {
                     emitTuple(collector, record);
                 }
    -            // build new metadata
    -            currentBatch = new KafkaTridentSpoutBatchMetadata(records.records(currBatchTp), this.topologyContext.getStormId());
    +            // build new metadata based on emitted records
    +            currentBatch = new KafkaTridentSpoutBatchMetadata(
    +                records.get(0).offset(),
    +                records.get(records.size() - 1).offset(),
    +                topologyContext.getStormId());
    +        } else {
    +            //Build new metadata based on the consumer position.
    +            //We want the next emit to start at the current consumer position,
    +            //so make a meta that indicates that position - 1 is the last emitted offset
    +            //This helps us avoid cases like STORM-3279, and simplifies the seek logic.
    +            long lastEmittedOffset = consumer.position(currBatchTp) - 1;
    --- End diff --

    Yes, then the starting offset may be 0 and lastEmittedOffset could be -1. The `seek` method used by `emitPartitionBatchNew` always adds 1 to the last batch meta offset, so we'd be seeking to 0 or larger. `reEmitPartitionBatch` does use the last batch meta offset without adding 1, but if we hit this case there are no emits in the batch, so Trident shouldn't replay it. I'm happy to add guards for -1 to `reEmitPartitionBatch` though, since I don't feel certain that Trident could never replay an empty batch.

    Any other reason you can see that we should be aware of negative offsets?

---
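The offset arithmetic discussed in this comment can be sketched in isolation. This is a minimal illustration, not code from the pull request: the class `EmptyBatchOffsets` and all of its method names are hypothetical, and the guard only shows the kind of check srdo offers to add to `reEmitPartitionBatch`.

```java
/** Hypothetical helper for illustration only; not part of storm-kafka-client. */
final class EmptyBatchOffsets {
    private EmptyBatchOffsets() {
    }

    /** Offset recorded as "last emitted" when a poll returned no records. */
    static long lastEmittedForEmptyBatch(long consumerPosition) {
        // On a new or empty partition the position is 0, so this yields -1.
        return consumerPosition - 1;
    }

    /** Offset a new batch seeks to: one past the last emitted offset. */
    static long seekOffsetForNewBatch(long lastEmittedOffset) {
        return lastEmittedOffset + 1;
    }

    /** Guard for a re-emit path that seeks to a stored offset without adding 1. */
    static boolean canSeekForReEmit(long storedOffset) {
        return storedOffset >= 0;
    }

    public static void main(String[] args) {
        long last = lastEmittedForEmptyBatch(0L);
        System.out.println(last);                        // -1
        System.out.println(seekOffsetForNewBatch(last)); // 0
        System.out.println(canSeekForReEmit(last));      // false
    }
}
```

The new-batch path never sees a negative seek target because of the `+ 1`. Only a path that uses the stored offset directly would need the guard.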
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2907#discussion_r234329136

    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java ---
    @@ -170,15 +170,25 @@ public void reEmitPartitionBatch(TransactionAttempt tx, TridentCollector collect
             seek(currBatchTp, lastBatchMeta);
    -        final ConsumerRecords<K, V> records = consumer.poll(pollTimeoutMs);
    -        LOG.debug("Polled [{}] records from Kafka.", records.count());
    +        final List<ConsumerRecord<K, V>> records = consumer.poll(pollTimeoutMs).records(currBatchTp);
    +        LOG.debug("Polled [{}] records from Kafka.", records.size());
             if (!records.isEmpty()) {
                 for (ConsumerRecord<K, V> record : records) {
                     emitTuple(collector, record);
                 }
    -            // build new metadata
    -            currentBatch = new KafkaTridentSpoutBatchMetadata(records.records(currBatchTp), this.topologyContext.getStormId());
    +            // build new metadata based on emitted records
    +            currentBatch = new KafkaTridentSpoutBatchMetadata(
    +                records.get(0).offset(),
    +                records.get(records.size() - 1).offset(),
    +                topologyContext.getStormId());
    +        } else {
    +            //Build new metadata based on the consumer position.
    +            //We want the next emit to start at the current consumer position,
    +            //so make a meta that indicates that position - 1 is the last emitted offset
    +            //This helps us avoid cases like STORM-3279, and simplifies the seek logic.
    +            long lastEmittedOffset = consumer.position(currBatchTp) - 1;
    --- End diff --

    Is there ever a chance that `lastEmittedOffset` could end up being negative? I am thinking specifically if we start up a topology reading from a new/empty topic.

---
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2907#discussion_r233750714

    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java ---
    @@ -218,26 +228,27 @@ private void emitTuple(TridentCollector collector, ConsumerRecord<K, V> record)
          *
          * This is the first batch for this partition
          * This is a replay of the first batch for this partition
    -     * This is batch n for this partition, where batch 0...n-1 were all empty
          *
          *
          * @return the offset of the next fetch
          */
         private long seek(TopicPartition tp, KafkaTridentSpoutBatchMetadata lastBatchMeta) {
    -        if (isFirstPoll(tp)) {
    -            if (firstPollOffsetStrategy == EARLIEST) {
    +        if (isFirstPollSinceExecutorStarted(tp)) {
    +            boolean isFirstPollSinceTopologyWasDeployed = lastBatchMeta == null
    +                || !topologyContext.getStormId().equals(lastBatchMeta.getTopologyId());
    +            if (firstPollOffsetStrategy == EARLIEST && isFirstPollSinceTopologyWasDeployed) {
                     LOG.debug("First poll for topic partition [{}], seeking to partition beginning", tp);
                     consumer.seekToBeginning(Collections.singleton(tp));
    -            } else if (firstPollOffsetStrategy == LATEST) {
    +            } else if (firstPollOffsetStrategy == LATEST && isFirstPollSinceTopologyWasDeployed) {
                     LOG.debug("First poll for topic partition [{}], seeking to partition end", tp);
                     consumer.seekToEnd(Collections.singleton(tp));
                 } else if (lastBatchMeta != null) {
                     LOG.debug("First poll for topic partition [{}], using last batch metadata", tp);
                     consumer.seek(tp, lastBatchMeta.getLastOffset() + 1);  // seek next offset after last offset from previous batch
    -            } else if (firstPollOffsetStrategy == UNCOMMITTED_EARLIEST) {
    +            } else if (firstPollOffsetStrategy == UNCOMMITTED_EARLIEST || firstPollOffsetStrategy == EARLIEST) {
    --- End diff --

    No, doesn't look like it. Good catch.

---
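The "no" above can be checked by enumerating the inputs to the branch chain. The sketch below copies only the branch order visible in the diff; the class `SeekBranchCheck`, its enum, and the branch labels are hypothetical names for illustration, and the final `else` stands in for the `UNCOMMITTED_LATEST || LATEST` branch.

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical model of the branch order in seek(); not the actual emitter code. */
final class SeekBranchCheck {
    enum Strategy { EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST }

    /** Returns a label for the branch taken on the first poll since executor start. */
    static String branch(Strategy s, boolean hasLastMeta, boolean sameTopologyId) {
        boolean firstPollSinceDeploy = !hasLastMeta || !sameTopologyId;
        if (s == Strategy.EARLIEST && firstPollSinceDeploy) {
            return "seekToBeginning";
        } else if (s == Strategy.LATEST && firstPollSinceDeploy) {
            return "seekToEnd";
        } else if (hasLastMeta) {
            return "seekAfterLastMeta";
        } else if (s == Strategy.UNCOMMITTED_EARLIEST || s == Strategy.EARLIEST) {
            return "fallbackToBeginning";
        } else {
            return "fallbackToEnd";
        }
    }

    /** Collects every strategy that can reach one of the two fallback branches. */
    static Set<Strategy> strategiesReachingFallback() {
        Set<Strategy> reached = EnumSet.noneOf(Strategy.class);
        boolean[] flags = {false, true};
        for (Strategy s : Strategy.values()) {
            for (boolean hasLastMeta : flags) {
                for (boolean sameTopologyId : flags) {
                    if (branch(s, hasLastMeta, sameTopologyId).startsWith("fallback")) {
                        reached.add(s);
                    }
                }
            }
        }
        return reached;
    }

    public static void main(String[] args) {
        // prints [UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST]
        System.out.println(strategiesReachingFallback());
    }
}
```

The fallback branches are only reached when `lastBatchMeta` is null, and a null `lastBatchMeta` already makes `isFirstPollSinceTopologyWasDeployed` true, so EARLIEST and LATEST are always caught by the first two branches.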
Github user janithkv commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2907#discussion_r233697224

    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java ---
    @@ -218,26 +228,27 @@ private void emitTuple(TridentCollector collector, ConsumerRecord<K, V> record)
          *
          * This is the first batch for this partition
          * This is a replay of the first batch for this partition
    -     * This is batch n for this partition, where batch 0...n-1 were all empty
          *
          *
          * @return the offset of the next fetch
          */
         private long seek(TopicPartition tp, KafkaTridentSpoutBatchMetadata lastBatchMeta) {
    -        if (isFirstPoll(tp)) {
    -            if (firstPollOffsetStrategy == EARLIEST) {
    +        if (isFirstPollSinceExecutorStarted(tp)) {
    +            boolean isFirstPollSinceTopologyWasDeployed = lastBatchMeta == null
    +                || !topologyContext.getStormId().equals(lastBatchMeta.getTopologyId());
    +            if (firstPollOffsetStrategy == EARLIEST && isFirstPollSinceTopologyWasDeployed) {
                     LOG.debug("First poll for topic partition [{}], seeking to partition beginning", tp);
                     consumer.seekToBeginning(Collections.singleton(tp));
    -            } else if (firstPollOffsetStrategy == LATEST) {
    +            } else if (firstPollOffsetStrategy == LATEST && isFirstPollSinceTopologyWasDeployed) {
                     LOG.debug("First poll for topic partition [{}], seeking to partition end", tp);
                     consumer.seekToEnd(Collections.singleton(tp));
                 } else if (lastBatchMeta != null) {
                     LOG.debug("First poll for topic partition [{}], using last batch metadata", tp);
                     consumer.seek(tp, lastBatchMeta.getLastOffset() + 1);  // seek next offset after last offset from previous batch
    -            } else if (firstPollOffsetStrategy == UNCOMMITTED_EARLIEST) {
    +            } else if (firstPollOffsetStrategy == UNCOMMITTED_EARLIEST || firstPollOffsetStrategy == EARLIEST) {
                     LOG.debug("First poll for topic partition [{}] with no last batch metadata, seeking to partition beginning", tp);
                     consumer.seekToBeginning(Collections.singleton(tp));
    -            } else if (firstPollOffsetStrategy == UNCOMMITTED_LATEST) {
    +            } else if (firstPollOffsetStrategy == UNCOMMITTED_LATEST || firstPollOffsetStrategy == LATEST) {
    --- End diff --

    Will we ever hit the `firstPollOffsetStrategy == LATEST` case here?

---
Github user janithkv commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2907#discussion_r233697180

    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java ---
    @@ -218,26 +228,27 @@ private void emitTuple(TridentCollector collector, ConsumerRecord<K, V> record)
          *
          * This is the first batch for this partition
          * This is a replay of the first batch for this partition
    -     * This is batch n for this partition, where batch 0...n-1 were all empty
          *
          *
          * @return the offset of the next fetch
          */
         private long seek(TopicPartition tp, KafkaTridentSpoutBatchMetadata lastBatchMeta) {
    -        if (isFirstPoll(tp)) {
    -            if (firstPollOffsetStrategy == EARLIEST) {
    +        if (isFirstPollSinceExecutorStarted(tp)) {
    +            boolean isFirstPollSinceTopologyWasDeployed = lastBatchMeta == null
    +                || !topologyContext.getStormId().equals(lastBatchMeta.getTopologyId());
    +            if (firstPollOffsetStrategy == EARLIEST && isFirstPollSinceTopologyWasDeployed) {
                     LOG.debug("First poll for topic partition [{}], seeking to partition beginning", tp);
                     consumer.seekToBeginning(Collections.singleton(tp));
    -            } else if (firstPollOffsetStrategy == LATEST) {
    +            } else if (firstPollOffsetStrategy == LATEST && isFirstPollSinceTopologyWasDeployed) {
                     LOG.debug("First poll for topic partition [{}], seeking to partition end", tp);
                     consumer.seekToEnd(Collections.singleton(tp));
                 } else if (lastBatchMeta != null) {
                     LOG.debug("First poll for topic partition [{}], using last batch metadata", tp);
                     consumer.seek(tp, lastBatchMeta.getLastOffset() + 1);  // seek next offset after last offset from previous batch
    -            } else if (firstPollOffsetStrategy == UNCOMMITTED_EARLIEST) {
    +            } else if (firstPollOffsetStrategy == UNCOMMITTED_EARLIEST || firstPollOffsetStrategy == EARLIEST) {
    --- End diff --

    Will we ever hit the `firstPollOffsetStrategy == EARLIEST` case here?

---
GitHub user srdo opened a pull request:

    https://github.com/apache/storm/pull/2907

    STORM-2990, STORM-3279: Fix issue where Kafka Trident spout could ignore EARLIEST and LATEST, and make EARLIEST and LATEST only take effect on topology deploy

    See https://issues.apache.org/jira/browse/STORM-2990 and https://issues.apache.org/jira/browse/STORM-3279.

    3279 is fixed by always returning a metadata from the emitter, even if no tuples were emitted. If no tuples are emitted, the metadata will now indicate that `consumer.position() - 1` was emitted.

    2990 is fixed by inspecting the lastBatchMeta topology id, and deciding whether to start over based on that.

    I expanded the emitter tests to cover these cases as well, and tried to make them a bit more readable by splitting them into multiple classes. The Kafka client dependency has been bumped to 0.11.0.0, since the MockConsumer wasn't behaving properly in 0.10.1.0. The only other change is to replace the KafkaUnit internals so we no longer depend on Kafka's internal test tooling.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/srdo/storm STORM-2990

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/2907.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2907

commit 9247e4a59a2f8829238179066a7299fa1cafdbaa
Author: Stig Rohde Døssing
Date:   2018-11-14T15:17:03Z

    STORM-2990, STORM-3279: Fix issue where Kafka Trident spout could ignore EARLIEST and LATEST, and make EARLIEST and LATEST only take effect on topology deploy

---
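The two fixes described in the pull request text can be sketched without a Kafka dependency. This is a rough model under stated assumptions: `BatchMetaSketch` and its methods are hypothetical names, offsets are passed as a plain list in place of polled records, and using the same value for both the first and last offset of an empty batch is an assumption, since the pull request text only says that `consumer.position() - 1` is recorded as emitted.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in for the batch metadata; not the storm-kafka-client class. */
final class BatchMetaSketch {
    final long firstOffset;
    final long lastOffset;
    final String topologyId;

    BatchMetaSketch(long firstOffset, long lastOffset, String topologyId) {
        this.firstOffset = firstOffset;
        this.lastOffset = lastOffset;
        this.topologyId = topologyId;
    }

    /** STORM-3279 idea: always produce metadata, even when nothing was emitted. */
    static BatchMetaSketch fromPoll(List<Long> emittedOffsets, long consumerPosition, String topologyId) {
        if (!emittedOffsets.isEmpty()) {
            long first = emittedOffsets.get(0);
            long last = emittedOffsets.get(emittedOffsets.size() - 1);
            return new BatchMetaSketch(first, last, topologyId);
        }
        // Assumption: an empty batch records position - 1 as both first and last offset.
        long lastEmitted = consumerPosition - 1;
        return new BatchMetaSketch(lastEmitted, lastEmitted, topologyId);
    }

    /** STORM-2990 idea: EARLIEST/LATEST apply only if the metadata is from another deploy. */
    static boolean isFirstPollSinceDeploy(BatchMetaSketch lastMeta, String currentTopologyId) {
        return lastMeta == null || !currentTopologyId.equals(lastMeta.topologyId);
    }

    public static void main(String[] args) {
        BatchMetaSketch full = fromPoll(Arrays.asList(5L, 6L, 7L), 8L, "topo-1");
        BatchMetaSketch empty = fromPoll(Collections.<Long>emptyList(), 8L, "topo-1");
        System.out.println(full.firstOffset + ".." + full.lastOffset);   // 5..7
        System.out.println(empty.lastOffset);                            // 7
        System.out.println(isFirstPollSinceDeploy(empty, "topo-1"));     // false
        System.out.println(isFirstPollSinceDeploy(empty, "topo-2"));     // true
    }
}
```

Because an empty batch still yields metadata, the next batch can resume from `lastOffset + 1`, and the stored topology id lets a restarted worker tell a redeploy apart from a plain restart.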