[GitHub] storm pull request #2907: STORM-2990, STORM-3279: Fix issue where Kafka Trid...

2018-12-14 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/storm/pull/2907


---



2018-12-14 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2907#discussion_r241782960
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java ---
@@ -170,15 +170,25 @@ public void reEmitPartitionBatch(TransactionAttempt tx, TridentCollector collect
 
 seek(currBatchTp, lastBatchMeta);
 
-final ConsumerRecords<K, V> records = consumer.poll(pollTimeoutMs);
-LOG.debug("Polled [{}] records from Kafka.", records.count());
+final List<ConsumerRecord<K, V>> records = consumer.poll(pollTimeoutMs).records(currBatchTp);
+LOG.debug("Polled [{}] records from Kafka.", records.size());
 
 if (!records.isEmpty()) {
 for (ConsumerRecord<K, V> record : records) {
 emitTuple(collector, record);
 }
-// build new metadata
-currentBatch = new KafkaTridentSpoutBatchMetadata(records.records(currBatchTp), this.topologyContext.getStormId());
+// build new metadata based on emitted records
+currentBatch = new KafkaTridentSpoutBatchMetadata(
+records.get(0).offset(),
+records.get(records.size() - 1).offset(),
+topologyContext.getStormId());
+} else {
+//Build new metadata based on the consumer position.
+//We want the next emit to start at the current consumer position,
+//so make a meta that indicates that position - 1 is the last emitted offset
+//This helps us avoid cases like STORM-3279, and simplifies the seek logic.
+long lastEmittedOffset = consumer.position(currBatchTp) - 1;
--- End diff --

Okay, sounds good.


---



2018-11-16 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2907#discussion_r234338505
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java ---
@@ -170,15 +170,25 @@ public void reEmitPartitionBatch(TransactionAttempt tx, TridentCollector collect
 
 seek(currBatchTp, lastBatchMeta);
 
-final ConsumerRecords<K, V> records = consumer.poll(pollTimeoutMs);
-LOG.debug("Polled [{}] records from Kafka.", records.count());
+final List<ConsumerRecord<K, V>> records = consumer.poll(pollTimeoutMs).records(currBatchTp);
+LOG.debug("Polled [{}] records from Kafka.", records.size());
 
 if (!records.isEmpty()) {
 for (ConsumerRecord<K, V> record : records) {
 emitTuple(collector, record);
 }
-// build new metadata
-currentBatch = new KafkaTridentSpoutBatchMetadata(records.records(currBatchTp), this.topologyContext.getStormId());
+// build new metadata based on emitted records
+currentBatch = new KafkaTridentSpoutBatchMetadata(
+records.get(0).offset(),
+records.get(records.size() - 1).offset(),
+topologyContext.getStormId());
+} else {
+//Build new metadata based on the consumer position.
+//We want the next emit to start at the current consumer position,
+//so make a meta that indicates that position - 1 is the last emitted offset
+//This helps us avoid cases like STORM-3279, and simplifies the seek logic.
+long lastEmittedOffset = consumer.position(currBatchTp) - 1;
--- End diff --

Yes. In that case the starting offset may be 0, so `lastEmittedOffset` could be -1.

The `seek` method used by `emitPartitionBatchNew` always adds 1 to the last 
batch meta offset, so we'd be seeking to 0 or larger.

`reEmitPartitionBatch` does use the last batch meta offset without adding 
1, but in this case the batch contains no emits, so Trident shouldn't replay 
it. I'm happy to add guards for -1 to `reEmitPartitionBatch` anyway, since I'm 
not certain that Trident never replays an empty batch.

Do you see any other reason we should be wary of negative offsets?
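To make the offset arithmetic above concrete, here is a minimal sketch assuming a consumer position of 0 on a new, empty partition. The helper names (`lastEmittedOffsetForEmptyBatch`, `seekOffsetForNewBatch`, `seekOffsetForReplay`) are hypothetical, and the replay guard is only one way the proposed -1 check could look, not the code in the PR.

```java
import java.util.OptionalLong;

// Hypothetical helpers illustrating the offset arithmetic discussed above;
// none of these names exist in storm-kafka-client.
class NegativeOffsetSketch {

    // An empty batch records (consumer position - 1) as its last emitted offset.
    static long lastEmittedOffsetForEmptyBatch(long consumerPosition) {
        return consumerPosition - 1;
    }

    // A new batch seeks one past the previous batch's last offset, so -1 maps to 0.
    static long seekOffsetForNewBatch(long lastBatchOffset) {
        return lastBatchOffset + 1;
    }

    // A replay uses the stored offset without adding 1, so guard the -1 case:
    // an empty result means "nothing to re-emit".
    static OptionalLong seekOffsetForReplay(long storedOffset) {
        return storedOffset < 0 ? OptionalLong.empty() : OptionalLong.of(storedOffset);
    }

    public static void main(String[] args) {
        long last = lastEmittedOffsetForEmptyBatch(0L); // new/empty topic: position 0
        System.out.println(last);                                  // -1
        System.out.println(seekOffsetForNewBatch(last));           // 0
        System.out.println(seekOffsetForReplay(last).isPresent()); // false
    }
}
```

The new-batch path never sees a negative seek target because of the `+ 1`; only the replay path needs an explicit guard.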


---



2018-11-16 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2907#discussion_r234329136
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java ---
@@ -170,15 +170,25 @@ public void reEmitPartitionBatch(TransactionAttempt tx, TridentCollector collect
 
 seek(currBatchTp, lastBatchMeta);
 
-final ConsumerRecords<K, V> records = consumer.poll(pollTimeoutMs);
-LOG.debug("Polled [{}] records from Kafka.", records.count());
+final List<ConsumerRecord<K, V>> records = consumer.poll(pollTimeoutMs).records(currBatchTp);
+LOG.debug("Polled [{}] records from Kafka.", records.size());
 
 if (!records.isEmpty()) {
 for (ConsumerRecord<K, V> record : records) {
 emitTuple(collector, record);
 }
-// build new metadata
-currentBatch = new KafkaTridentSpoutBatchMetadata(records.records(currBatchTp), this.topologyContext.getStormId());
+// build new metadata based on emitted records
+currentBatch = new KafkaTridentSpoutBatchMetadata(
+records.get(0).offset(),
+records.get(records.size() - 1).offset(),
+topologyContext.getStormId());
+} else {
+//Build new metadata based on the consumer position.
+//We want the next emit to start at the current consumer position,
+//so make a meta that indicates that position - 1 is the last emitted offset
+//This helps us avoid cases like STORM-3279, and simplifies the seek logic.
+long lastEmittedOffset = consumer.position(currBatchTp) - 1;
--- End diff --

Is there ever a chance that `lastEmittedOffset` could end up being 
negative? I am thinking specifically of the case where we start up a topology 
reading from a new/empty topic.


---



2018-11-15 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2907#discussion_r233750714
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java ---
@@ -218,26 +228,27 @@ private void emitTuple(TridentCollector collector, ConsumerRecord<K, V> record)
 * 
 * This is the first batch for this partition
 * This is a replay of the first batch for this partition
- * This is batch n for this partition, where batch 0...n-1 were all empty
 * 
 *
 * @return the offset of the next fetch
 */
 private long seek(TopicPartition tp, KafkaTridentSpoutBatchMetadata lastBatchMeta) {
-if (isFirstPoll(tp)) {
-if (firstPollOffsetStrategy == EARLIEST) {
+if (isFirstPollSinceExecutorStarted(tp)) {
+boolean isFirstPollSinceTopologyWasDeployed = lastBatchMeta == null
+|| !topologyContext.getStormId().equals(lastBatchMeta.getTopologyId());
+if (firstPollOffsetStrategy == EARLIEST && isFirstPollSinceTopologyWasDeployed) {
 LOG.debug("First poll for topic partition [{}], seeking to partition beginning", tp);
 consumer.seekToBeginning(Collections.singleton(tp));
-} else if (firstPollOffsetStrategy == LATEST) {
+} else if (firstPollOffsetStrategy == LATEST && isFirstPollSinceTopologyWasDeployed) {
 LOG.debug("First poll for topic partition [{}], seeking to partition end", tp);
 consumer.seekToEnd(Collections.singleton(tp));
 } else if (lastBatchMeta != null) {
 LOG.debug("First poll for topic partition [{}], using last batch metadata", tp);
 consumer.seek(tp, lastBatchMeta.getLastOffset() + 1);  // seek next offset after last offset from previous batch
-} else if (firstPollOffsetStrategy == UNCOMMITTED_EARLIEST) {
+} else if (firstPollOffsetStrategy == UNCOMMITTED_EARLIEST || firstPollOffsetStrategy == EARLIEST) {
--- End diff --

No, doesn't look like it. Good catch.
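The reachability question can be checked with a small model of the new branch order. This is a hypothetical sketch: the enum constants mirror the `FirstPollOffsetStrategy` names from the diff, but `SeekAction`, `firstPollAction`, and the storm ids are illustrative, and the Kafka consumer calls are replaced by returned actions.

```java
// Hypothetical sketch of the branch order in the new seek() above. The enum
// constants mirror the FirstPollOffsetStrategy names; SeekAction and
// firstPollAction are illustrative, not storm-kafka-client API.
class SeekDecisionSketch {

    enum Strategy { EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST }

    enum SeekAction {
        SEEK_TO_BEGINNING, SEEK_TO_END, SEEK_AFTER_LAST_BATCH,
        FALLBACK_SEEK_TO_BEGINNING, FALLBACK_SEEK_TO_END
    }

    // lastBatchTopologyId is null when there is no lastBatchMeta.
    static SeekAction firstPollAction(Strategy strategy, String stormId, String lastBatchTopologyId) {
        boolean hasLastBatchMeta = lastBatchTopologyId != null;
        boolean isFirstPollSinceTopologyWasDeployed =
            !hasLastBatchMeta || !stormId.equals(lastBatchTopologyId);
        if (strategy == Strategy.EARLIEST && isFirstPollSinceTopologyWasDeployed) {
            return SeekAction.SEEK_TO_BEGINNING;
        } else if (strategy == Strategy.LATEST && isFirstPollSinceTopologyWasDeployed) {
            return SeekAction.SEEK_TO_END;
        } else if (hasLastBatchMeta) {
            return SeekAction.SEEK_AFTER_LAST_BATCH;
        } else if (strategy == Strategy.UNCOMMITTED_EARLIEST) {
            return SeekAction.FALLBACK_SEEK_TO_BEGINNING;
        } else if (strategy == Strategy.UNCOMMITTED_LATEST) {
            return SeekAction.FALLBACK_SEEK_TO_END;
        }
        // Without lastBatchMeta, isFirstPollSinceTopologyWasDeployed is true, so
        // EARLIEST and LATEST were already handled by the first two branches.
        throw new IllegalStateException("unreachable for " + strategy);
    }

    public static void main(String[] args) {
        // No lastBatchMeta: every strategy resolves without reaching the throw.
        for (Strategy s : Strategy.values()) {
            System.out.println(s + " -> " + firstPollAction(s, "topo-2", null));
        }
        // EARLIEST -> SEEK_TO_BEGINNING
        // LATEST -> SEEK_TO_END
        // UNCOMMITTED_EARLIEST -> FALLBACK_SEEK_TO_BEGINNING
        // UNCOMMITTED_LATEST -> FALLBACK_SEEK_TO_END
    }
}
```

In this model, a null `lastBatchMeta` already makes `isFirstPollSinceTopologyWasDeployed` true, so the extra `|| EARLIEST` (and, by the same reasoning, `|| LATEST`) conditions in the fallback branches are never the ones that match.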


---



2018-11-14 Thread janithkv
Github user janithkv commented on a diff in the pull request:

https://github.com/apache/storm/pull/2907#discussion_r233697224
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java ---
@@ -218,26 +228,27 @@ private void emitTuple(TridentCollector collector, ConsumerRecord<K, V> record)
 * 
 * This is the first batch for this partition
 * This is a replay of the first batch for this partition
- * This is batch n for this partition, where batch 0...n-1 were all empty
 * 
 *
 * @return the offset of the next fetch
 */
 private long seek(TopicPartition tp, KafkaTridentSpoutBatchMetadata lastBatchMeta) {
-if (isFirstPoll(tp)) {
-if (firstPollOffsetStrategy == EARLIEST) {
+if (isFirstPollSinceExecutorStarted(tp)) {
+boolean isFirstPollSinceTopologyWasDeployed = lastBatchMeta == null
+|| !topologyContext.getStormId().equals(lastBatchMeta.getTopologyId());
+if (firstPollOffsetStrategy == EARLIEST && isFirstPollSinceTopologyWasDeployed) {
 LOG.debug("First poll for topic partition [{}], seeking to partition beginning", tp);
 consumer.seekToBeginning(Collections.singleton(tp));
-} else if (firstPollOffsetStrategy == LATEST) {
+} else if (firstPollOffsetStrategy == LATEST && isFirstPollSinceTopologyWasDeployed) {
 LOG.debug("First poll for topic partition [{}], seeking to partition end", tp);
 consumer.seekToEnd(Collections.singleton(tp));
 } else if (lastBatchMeta != null) {
 LOG.debug("First poll for topic partition [{}], using last batch metadata", tp);
 consumer.seek(tp, lastBatchMeta.getLastOffset() + 1);  // seek next offset after last offset from previous batch
-} else if (firstPollOffsetStrategy == UNCOMMITTED_EARLIEST) {
+} else if (firstPollOffsetStrategy == UNCOMMITTED_EARLIEST || firstPollOffsetStrategy == EARLIEST) {
 LOG.debug("First poll for topic partition [{}] with no last batch metadata, seeking to partition beginning", tp);
 consumer.seekToBeginning(Collections.singleton(tp));
-} else if (firstPollOffsetStrategy == UNCOMMITTED_LATEST) {
+} else if (firstPollOffsetStrategy == UNCOMMITTED_LATEST || firstPollOffsetStrategy == LATEST) {
--- End diff --

Will we ever hit the `firstPollOffsetStrategy == LATEST` case here?


---



2018-11-14 Thread janithkv
Github user janithkv commented on a diff in the pull request:

https://github.com/apache/storm/pull/2907#discussion_r233697180
  
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java ---
@@ -218,26 +228,27 @@ private void emitTuple(TridentCollector collector, ConsumerRecord<K, V> record)
 * 
 * This is the first batch for this partition
 * This is a replay of the first batch for this partition
- * This is batch n for this partition, where batch 0...n-1 were all empty
 * 
 *
 * @return the offset of the next fetch
 */
 private long seek(TopicPartition tp, KafkaTridentSpoutBatchMetadata lastBatchMeta) {
-if (isFirstPoll(tp)) {
-if (firstPollOffsetStrategy == EARLIEST) {
+if (isFirstPollSinceExecutorStarted(tp)) {
+boolean isFirstPollSinceTopologyWasDeployed = lastBatchMeta == null
+|| !topologyContext.getStormId().equals(lastBatchMeta.getTopologyId());
+if (firstPollOffsetStrategy == EARLIEST && isFirstPollSinceTopologyWasDeployed) {
 LOG.debug("First poll for topic partition [{}], seeking to partition beginning", tp);
 consumer.seekToBeginning(Collections.singleton(tp));
-} else if (firstPollOffsetStrategy == LATEST) {
+} else if (firstPollOffsetStrategy == LATEST && isFirstPollSinceTopologyWasDeployed) {
 LOG.debug("First poll for topic partition [{}], seeking to partition end", tp);
 consumer.seekToEnd(Collections.singleton(tp));
 } else if (lastBatchMeta != null) {
 LOG.debug("First poll for topic partition [{}], using last batch metadata", tp);
 consumer.seek(tp, lastBatchMeta.getLastOffset() + 1);  // seek next offset after last offset from previous batch
-} else if (firstPollOffsetStrategy == UNCOMMITTED_EARLIEST) {
+} else if (firstPollOffsetStrategy == UNCOMMITTED_EARLIEST || firstPollOffsetStrategy == EARLIEST) {
--- End diff --

Will we ever hit the `firstPollOffsetStrategy == EARLIEST` case here?


---



2018-11-14 Thread srdo
GitHub user srdo opened a pull request:

https://github.com/apache/storm/pull/2907

STORM-2990, STORM-3279: Fix issue where Kafka Trident spout could ignore EARLIEST and LATEST, and make EARLIEST and LATEST only take effect on topology deploy

See https://issues.apache.org/jira/browse/STORM-2990 and 
https://issues.apache.org/jira/browse/STORM-3279.

STORM-3279 is fixed by always returning batch metadata from the emitter, even 
if no tuples were emitted. If no tuples are emitted, the metadata will now 
indicate that `consumer.position() - 1` was emitted.
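A minimal sketch of the "always return metadata" idea, assuming a simplified model: `BatchMeta` is a hypothetical stand-in for `KafkaTridentSpoutBatchMetadata`, a list of offsets stands in for the polled records, and using `position - 1` as both the first and last offset of an empty batch is an assumption of this sketch (the diff above only shows how `lastEmittedOffset` is computed).

```java
import java.util.List;

// Hypothetical sketch of the STORM-3279 fix described above. BatchMeta stands in
// for KafkaTridentSpoutBatchMetadata, and a list of offsets stands in for the
// polled records; neither is the real storm-kafka-client API.
class BatchMetaSketch {

    static final class BatchMeta {
        final long firstOffset;
        final long lastOffset;
        final String topologyId;

        BatchMeta(long firstOffset, long lastOffset, String topologyId) {
            this.firstOffset = firstOffset;
            this.lastOffset = lastOffset;
            this.topologyId = topologyId;
        }

        @Override
        public String toString() {
            return "[" + firstOffset + ", " + lastOffset + "] " + topologyId;
        }
    }

    // Always returns metadata, even for an empty batch.
    static BatchMeta buildMeta(List<Long> emittedOffsets, long consumerPosition, String stormId) {
        if (!emittedOffsets.isEmpty()) {
            // Non-empty batch: span the first and last emitted offsets.
            return new BatchMeta(emittedOffsets.get(0),
                emittedOffsets.get(emittedOffsets.size() - 1), stormId);
        }
        // Empty batch: pretend position - 1 was emitted so the next batch starts at the
        // current position. Using it as both first and last offset is an assumption.
        long lastEmittedOffset = consumerPosition - 1;
        return new BatchMeta(lastEmittedOffset, lastEmittedOffset, stormId);
    }

    public static void main(String[] args) {
        System.out.println(buildMeta(List.of(10L, 11L, 12L), 13L, "topo-1")); // [10, 12] topo-1
        BatchMeta empty = buildMeta(List.of(), 13L, "topo-1");
        System.out.println(empty);                // [12, 12] topo-1
        System.out.println(empty.lastOffset + 1); // 13, where the next batch seeks to
    }
}
```

Because the metadata is never null after the first batch, the next batch can always resume at `lastOffset + 1` instead of falling back to the first-poll strategy.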

STORM-2990 is fixed by inspecting the topology id in `lastBatchMeta` and 
deciding whether to start over based on that.

I expanded the emitter tests to cover these cases as well, and tried to 
make them a bit more readable by splitting them into multiple classes. The 
Kafka client dependency has been bumped to 0.11.0.0, since the MockConsumer 
wasn't behaving properly in 0.10.1.0.

The only other change is to replace the KafkaUnit internals so we no longer 
depend on Kafka's internal test tooling.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/srdo/storm STORM-2990

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/2907.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2907


commit 9247e4a59a2f8829238179066a7299fa1cafdbaa
Author: Stig Rohde Døssing 
Date:   2018-11-14T15:17:03Z

STORM-2990, STORM-3279: Fix issue where Kafka Trident spout could ignore 
EARLIEST and LATEST, and make EARLIEST and LATEST only take effect on topology 
deploy




---