[jira] [Commented] (KAFKA-7192) State-store can desynchronise with changelog

2019-04-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16810946#comment-16810946
 ] 

ASF GitHub Bot commented on KAFKA-7192:
---

guozhangwang commented on pull request #6546: KAFKA-7192: Cherry-pick 5430 to 1.1
URL: https://github.com/apache/kafka/pull/6546
 
 
   The first PR for KAFKA-7192 was cherry-picked to 1.1, but the follow-up 
(https://github.com/apache/kafka/pull/5430) was not. This is causing flaky EOS 
system test failures.
   
   Some test results:
   
   In the 2.0 branch, running 25 times (streams_eos_test has 4 tests, so 100 
test runs in total), no failures:
   
   
http://confluent-kafka-2-0-system-test-results.s3-us-west-2.amazonaws.com/2019-04-05--001.1554466177--apache--2.0--db22e3d/report.html
   
   In the 1.1 branch before this PR, running 5 times, 10 tests failed:
   
   
http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2019-04-02--001.1554239700--guozhangwang--KMinor-1.1-eos-test--8395fce/report.html
   
   In this branch (after this PR), running 25 times, no failures:
   
   
http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2019-04-05--001.1554465488--guozhangwang--KMinor-1.1-eos-test--897aa03/report.html
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> State-store can desynchronise with changelog
> 
>
> Key: KAFKA-7192
> URL: https://issues.apache.org/jira/browse/KAFKA-7192
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.3, 1.0.2, 1.1.1, 2.0.0
>Reporter: Jon Bates
>Assignee: Guozhang Wang
>Priority: Critical
>  Labels: bugs
> Fix For: 0.11.0.4, 1.0.3, 1.1.2, 2.0.1, 2.1.0
>
>
> n.b. this bug has been verified with exactly-once processing enabled
> Consider the following scenario:
>  * A record, N is read into a Kafka topology
>  * the state store is updated
>  * the topology crashes
> h3. *Expected behaviour:*
>  # Node is restarted
>  # Offset was never updated, so record N is reprocessed
>  # State-store is reset to position N-1
>  # Record is reprocessed
> h3. *Actual Behaviour*
>  # Node is restarted
>  # Record N is reprocessed (good)
>  # The state store has the state from the previous processing
> I'd consider this a corruption of the state-store, hence the critical 
> Priority, although High may be more appropriate.
> I wrote a proof-of-concept here, which demonstrates the problem on Linux:
> [https://github.com/spadger/kafka-streams-sad-state-store]
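
The scenario in the issue description can be modelled with plain collections. The following is a minimal sketch, not Kafka Streams code: the class and method names are hypothetical, the "changelog" and the "local store" are in-memory maps, and the crash is simulated by skipping the commit. It only illustrates why a local store that survives the crash ends up ahead of the changelog unless it is reset on restart.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the reported scenario (not Kafka Streams code).
final class DesyncSketch {

    /**
     * Counts occurrences of one key. On the first attempt the local store is
     * updated and the task "crashes" before anything is committed. Record N is
     * then reprocessed after the restart.
     *
     * @param wipeStoreOnRestart whether the local store is reset to the
     *                           committed (changelog) state on restart
     * @return the count held by the local store after reprocessing
     */
    static long countAfterCrashAndReprocess(boolean wipeStoreOnRestart) {
        Map<String, Long> changelog = new HashMap<>();  // committed state
        Map<String, Long> localStore = new HashMap<>(); // survives the crash, like files on disk

        // First attempt: store updated, then crash before the commit.
        localStore.merge("key", 1L, Long::sum);

        // Restart: optionally discard local state and rebuild it from the changelog.
        if (wipeStoreOnRestart) {
            localStore.clear();
            localStore.putAll(changelog);
        }

        // Record N is reprocessed and this time the commit succeeds.
        localStore.merge("key", 1L, Long::sum);
        changelog.put("key", localStore.get("key"));
        return localStore.get("key");
    }
}
```

In this model, calling the method with `false` leaves the record counted twice, while calling it with `true` yields a single count, which matches the expected behaviour listed in the description.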



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7192) State-store can desynchronise with changelog

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642553#comment-16642553
 ] 

ASF GitHub Bot commented on KAFKA-7192:
---

mjsax closed pull request #5657: KAFKA-7192: Wipe out state store if EOS is 
turned on and checkpoint file does not exist
URL: https://github.com/apache/kafka/pull/5657
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
index 410212e1ff0..eeaed893482 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
@@ -198,4 +198,10 @@ public ThreadCache getCache() {
     public void initialized() {
         initialized = true;
     }
+
+    @Override
+    public void uninitialize() {
+        initialized = false;
+    }
+
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index baea4af62f1..fcf2f6b13b7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -107,7 +107,7 @@ public final String applicationId() {
     }
 
     @Override
-    public final Set partitions() {
+    public Set partitions() {
         return partitions;
     }
 
@@ -226,6 +226,9 @@ void registerStateStores() {
         }
     }
 
+    void reinitializeStateStoresForPartitions(final TopicPartition partitions) {
+        stateMgr.reinitializeStateStoresForPartitions(partitions, processorContext);
+    }
 
     /**
      * @throws ProcessorStateException if there is an error while closing the state manager
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index 0d9d04de5cc..cfce57588e0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -50,7 +50,7 @@
     // IQ may access this map.
     private Map running = new ConcurrentHashMap<>();
     private Map runningByPartition = new HashMap<>();
-    private Map restoringByPartition = new HashMap<>();
+    private Map restoringByPartition = new HashMap<>();
     private int committed = 0;
 
 
@@ -122,7 +122,8 @@ void addNewTask(final Task task) {
             try {
                 if (!entry.getValue().initializeStateStores()) {
                     log.debug("Transitioning {} {} to restoring", taskTypeName, entry.getKey());
-                    addToRestoring(entry.getValue());
+                    // cast is safe, because StandbyTasks always returns `true` in `initializeStateStores()` above
+                    addToRestoring((StreamTask) entry.getValue());
                 } else {
                     transitionToRunning(entry.getValue(), readyPartitions);
                 }
@@ -278,7 +279,7 @@ boolean maybeResumeSuspendedTask(final TaskId taskId, final Set
         return false;
     }
 
-    private void addToRestoring(final Task task) {
+    private void addToRestoring(final StreamTask task) {
         restoring.put(task.id(), task);
         for (TopicPartition topicPartition : task.partitions()) {
             restoringByPartition.put(topicPartition, task);
@@ -307,7 +308,7 @@ private void transitionToRunning(final Task task, final Set read
     }
 
     @Override
-    public Task restoringTaskFor(final TopicPartition partition) {
+    public StreamTask restoringTaskFor(final TopicPartition partition) {
         return restoringByPartition.get(partition);
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
index 57bb3ac81a6..b5719b111f0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
@@ -53,4 +53,9 @@
      * Mark this contex as being initialized
      */
     void initialized();
+
+    /**
+     * Mark this context as being uninitialized
+     */
+    void unini

[jira] [Commented] (KAFKA-7192) State-store can desynchronise with changelog

2018-10-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16644133#comment-16644133
 ] 

ASF GitHub Bot commented on KAFKA-7192:
---

mjsax opened a new pull request #5767: KAFKA-7192: Wipe out state store if EOS 
is turned on and checkpoint file does not exist
URL: https://github.com/apache/kafka/pull/5767
 
 
   Unified backport of PRs #5421 and #5430
   
   Also compare backport to `0.11.0` #5641 and `1.0` #5657




> State-store can desynchronise with changelog
> 
>
> Key: KAFKA-7192
> URL: https://issues.apache.org/jira/browse/KAFKA-7192
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.3, 1.0.2, 1.1.1, 2.0.0
>Reporter: Jon Bates
>Assignee: Guozhang Wang
>Priority: Critical
>  Labels: bugs
> Fix For: 0.11.0.4, 1.0.3, 2.0.1, 2.1.0
>
>
> n.b. this bug has been verified with exactly-once processing enabled
> Consider the following scenario:
>  * A record, N is read into a Kafka topology
>  * the state store is updated
>  * the topology crashes
> h3. *Expected behaviour:*
>  # Node is restarted
>  # Offset was never updated, so record N is reprocessed
>  # State-store is reset to position N-1
>  # Record is reprocessed
> h3. *Actual Behaviour*
>  # Node is restarted
>  # Record N is reprocessed (good)
>  # The state store has the state from the previous processing
> I'd consider this a corruption of the state-store, hence the critical 
> Priority, although High may be more appropriate.
> I wrote a proof-of-concept here, which demonstrates the problem on Linux:
> [https://github.com/spadger/kafka-streams-sad-state-store]





[jira] [Commented] (KAFKA-7192) State-store can desynchronise with changelog

2018-10-30 Thread Tobias Johansson (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16668859#comment-16668859
 ] 

Tobias Johansson commented on KAFKA-7192:
-

Will this be fixed in Kafka v1.1.2?



[jira] [Commented] (KAFKA-7192) State-store can desynchronise with changelog

2018-10-30 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16669064#comment-16669064
 ] 

Matthias J. Sax commented on KAFKA-7192:


I hope so. I have not had time yet to port the fix to the 1.1 branch.



[jira] [Commented] (KAFKA-7192) State-store can desynchronise with changelog

2018-11-01 Thread Tobias Johansson (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671223#comment-16671223
 ] 

Tobias Johansson commented on KAFKA-7192:
-

[~mjsax] thanks! Is the issue only on the client side (would an updated client 
library fix it)? And would disabling RocksDB mitigate the issue in the meantime? 
I haven't been able to reproduce the issue with RocksDB disabled.



[jira] [Commented] (KAFKA-7192) State-store can desynchronise with changelog

2018-11-01 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16672226#comment-16672226
 ] 

Matthias J. Sax commented on KAFKA-7192:


Yes, it's a client-side bug and related to RocksDB. If you run with an in-memory 
store, you won't see this issue.



[jira] [Commented] (KAFKA-7192) State-store can desynchronise with changelog

2018-11-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16687443#comment-16687443
 ] 

ASF GitHub Bot commented on KAFKA-7192:
---

mjsax opened a new pull request #5915: KAFKA-7192: Wipe out state store if EOS 
is turned on and checkpoint file does not exist
URL: https://github.com/apache/kafka/pull/5915
 
 
   Follow up PR to #5657. Fixes buggy back port.
   




[jira] [Commented] (KAFKA-7192) State-store can desynchronise with changelog

2018-11-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16689059#comment-16689059
 ] 

ASF GitHub Bot commented on KAFKA-7192:
---

guozhangwang closed pull request #5915: KAFKA-7192: Wipe out state store if EOS 
is turned on and checkpoint file does not exist
URL: https://github.com/apache/kafka/pull/5915
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 34350c17eb0..c03de2d4a2d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -59,9 +59,14 @@ public StoreChangelogReader(final Consumer consumer,
 
     @Override
     public void register(final StateRestorer restorer) {
-        restorer.setUserRestoreListener(userStateRestoreListener);
-        stateRestorers.put(restorer.partition(), restorer);
-        needsInitializing.put(restorer.partition(), restorer);
+        final StateRestorer existingRestorer = stateRestorers.get(restorer.partition());
+        if (existingRestorer == null) {
+            restorer.setUserRestoreListener(userStateRestoreListener);
+            stateRestorers.put(restorer.partition(), restorer);
+            needsInitializing.put(restorer.partition(), restorer);
+        } else {
+            needsInitializing.put(restorer.partition(), existingRestorer);
+        }
     }
 
     /**
@@ -188,7 +193,6 @@ private void startRestoration(final Map initializ
                 restorer.setCheckpointOffset(consumer.position(restoringPartition));
 
                 task.reinitializeStateStoresForPartitions(restoringPartition);
-                stateRestorers.get(restoringPartition).restoreStarted();
             } else {
                 log.info("Restoring task {}'s state store {} from beginning of the changelog {} ", task.id, restorer.storeName(), restoringPartition);
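
The change to `register()` in the diff above makes a repeated registration for the same partition reuse the restorer that is already known, while still scheduling it for initialization again. The following is a minimal, self-contained sketch of that pattern only. `Restorer` is a hypothetical stand-in for `StateRestorer`, partitions are plain strings, and the accessor methods exist purely for illustration; none of this is the actual Kafka Streams class.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the "keep the existing restorer" registration pattern.
final class RestorerRegistrySketch {

    // Stand-in for StateRestorer; only the partition and object identity matter here.
    static final class Restorer {
        final String partition;

        Restorer(String partition) {
            this.partition = partition;
        }
    }

    private final Map<String, Restorer> stateRestorers = new HashMap<>();
    private final Map<String, Restorer> needsInitializing = new HashMap<>();

    void register(Restorer restorer) {
        Restorer existing = stateRestorers.get(restorer.partition);
        if (existing == null) {
            // First registration: remember the restorer and schedule initialization.
            stateRestorers.put(restorer.partition, restorer);
            needsInitializing.put(restorer.partition, restorer);
        } else {
            // Repeated registration: keep the original instance, re-schedule it.
            needsInitializing.put(restorer.partition, existing);
        }
    }

    Restorer registered(String partition) {
        return stateRestorers.get(partition);
    }

    Restorer pending(String partition) {
        return needsInitializing.get(partition);
    }
}
```

Registering two different `Restorer` objects for the same partition leaves the first one in both maps, so any state already attached to it is not replaced by the second registration.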
 


 




[jira] [Commented] (KAFKA-7192) State-store can desynchronise with changelog

2018-11-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16692414#comment-16692414
 ] 

ASF GitHub Bot commented on KAFKA-7192:
---

mjsax closed pull request #5767: KAFKA-7192: Wipe out state store if EOS is 
turned on and checkpoint file does not exist
URL: https://github.com/apache/kafka/pull/5767
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java
index a99e45147b9..40ce79c289e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java
@@ -24,4 +24,8 @@
         super(logContext, "standby task");
     }
 
+    void addToRestoring(final StandbyTask task) {
+        throw new UnsupportedOperationException("Standby tasks cannot be restored actively.");
+    }
+
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
index 7b05f6488e7..98c2bbd2563 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
@@ -23,12 +23,21 @@
 import org.apache.kafka.streams.processor.TaskId;
 import org.slf4j.Logger;
 
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
 
 class AssignedStreamsTasks extends AssignedTasks implements RestoringTasks {
     private final Logger log;
+    private final Map restoring = new HashMap<>();
+    private final Set restoredPartitions = new HashSet<>();
+    private final Map restoringByPartition = new HashMap<>();
     private final TaskAction maybeCommitAction;
     private int committed = 0;
 
@@ -59,6 +68,52 @@ public StreamTask restoringTaskFor(final TopicPartition partition) {
         return restoringByPartition.get(partition);
     }
 
+    void updateRestored(final Collection restored) {
+        if (restored.isEmpty()) {
+            return;
+        }
+        log.trace("Stream task changelog partitions that have completed restoring so far: {}", restored);
+        restoredPartitions.addAll(restored);
+        for (final Iterator> it = restoring.entrySet().iterator(); it.hasNext(); ) {
+            final Map.Entry entry = it.next();
+            final StreamTask task = entry.getValue();
+            if (restoredPartitions.containsAll(task.changelogPartitions())) {
+                transitionToRunning(task);
+                it.remove();
+                log.trace("Stream task {} completed restoration as all its changelog partitions {} have been applied to restore state",
+                    task.id(),
+                    task.changelogPartitions());
+            } else {
+                if (log.isTraceEnabled()) {
+                    final HashSet outstandingPartitions = new HashSet<>(task.changelogPartitions());
+                    outstandingPartitions.removeAll(restoredPartitions);
+                    log.trace("Stream task {} cannot resume processing yet since some of its changelog partitions have not completed restoring: {}",
+                        task.id(),
+                        outstandingPartitions);
+                }
+            }
+        }
+        if (allTasksRunning()) {
+            restoredPartitions.clear();
+        }
+    }
+
+    void addToRestoring(final StreamTask task) {
+        restoring.put(task.id(), task);
+        for (final TopicPartition topicPartition : task.partitions()) {
+            restoringByPartition.put(topicPartition, task);
+        }
+        for (final TopicPartition topicPartition : task.changelogPartitions()) {
+            restoringByPartition.put(topicPartition, task);
+        }
+    }
+
+    boolean allTasksRunning() {
+        return created.isEmpty()
+            && suspended.isEmpty()
+            && restoring.isEmpty();
+    }
+
     /**
      * @throws TaskMigratedException if committing offsets failed (non-EOS)
      *   or if the task producer got fenced (EOS)
@@ -139,4 +194,43 @@ int punctuate() {
         return punctuated;
     }
 
+    RuntimeException suspend() {
+        fi

[jira] [Commented] (KAFKA-7192) State-store can desynchronise with changelog

2018-11-19 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16692417#comment-16692417
 ] 

Matthias J. Sax commented on KAFKA-7192:


[~tobiajo] Back ported to 1.1 branch :)



[jira] [Commented] (KAFKA-7192) State-store can desynchronise with changelog

2018-07-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16555045#comment-16555045
 ] 

ASF GitHub Bot commented on KAFKA-7192:
---

guozhangwang opened a new pull request #5421: KAFKA-7192: Wipe out if EOS is 
turned on and checkpoint file does not exist
URL: https://github.com/apache/kafka/pull/5421
 
 
   1. As titled and as described in comments.
   2. Modified the unit test slightly to insert new keys in the committed data 
to expose this issue.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   




> State-store can desynchronise with changelog
> 
>
> Key: KAFKA-7192
> URL: https://issues.apache.org/jira/browse/KAFKA-7192
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.1
>Reporter: Jon Bates
>Priority: Critical
>
> n.b. this bug has been verified with exactly-once processing enabled
> Consider the following scenario:
>  * A record, N is read into a Kafka topology
>  * the state store is updated
>  * the topology crashes
> h3. *Expected behaviour:*
>  # Node is restarted
>  # Offset was never updated, so record N is reprocessed
>  # State-store is reset to position N-1
>  # Record is reprocessed
> h3. *Actual Behaviour*
>  # Node is restarted
>  # Record N is reprocessed (good)
>  # The state store has the state from the previous processing
> I'd consider this a corruption of the state-store, hence the critical 
> Priority, although High may be more appropriate.
> I wrote a proof-of-concept here, which demonstrates the problem on Linux:
> [https://github.com/spadger/kafka-streams-sad-state-store]





[jira] [Commented] (KAFKA-7192) State-store can desynchronise with changelog

2018-07-24 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16555049#comment-16555049
 ] 

Guozhang Wang commented on KAFKA-7192:
--

Hello [~jonmbates], thanks for reporting this issue. I've successfully 
reproduced it and found the root cause.

I've submitted the PR and added a unit test that exposes the issue, and confirmed 
that the unit test fails without the PR.

> State-store can desynchronise with changelog
> 
>
> Key: KAFKA-7192
> URL: https://issues.apache.org/jira/browse/KAFKA-7192
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.1
>Reporter: Jon Bates
>Priority: Critical
>
> n.b. this bug has been verified with exactly-once processing enabled
> Consider the following scenario:
>  * A record, N is read into a Kafka topology
>  * the state store is updated
>  * the topology crashes
> h3. *Expected behaviour:*
>  # Node is restarted
>  # Offset was never updated, so record N is reprocessed
>  # State-store is reset to position N-1
>  # Record is reprocessed
> h3. *Actual Behaviour*
>  # Node is restarted
>  # Record N is reprocessed (good)
>  # The state store has the state from the previous processing
> I'd consider this a corruption of the state-store, hence the critical 
> Priority, although High may be more appropriate.
> I wrote a proof-of-concept here, which demonstrates the problem on Linux:
> [https://github.com/spadger/kafka-streams-sad-state-store]
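
The scenario in the description can be sketched as a toy model. This is not Kafka Streams code: `DesyncSketch` and all of its methods are hypothetical names chosen for illustration, and the "store" is a plain in-memory counter map. It only shows why reusing uncommitted local state and then reprocessing record N applies the update twice.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of the reported scenario; none of these names are Kafka Streams APIs. */
public class DesyncSketch {
    // Local store contents, assumed to survive on disk across a crash.
    private final Map<String, Long> localStore = new HashMap<>();
    // State as of the last commit, i.e. what replaying the changelog would rebuild.
    private final Map<String, Long> committedState = new HashMap<>();

    // Process one record by incrementing a per-key counter in the local store.
    void process(String key) {
        localStore.merge(key, 1L, Long::sum);
    }

    // Commit: the committed state now matches the local store.
    void commit() {
        committedState.clear();
        committedState.putAll(localStore);
    }

    // Restart after a crash. With wipe == true the local store is rebuilt from
    // committed state (expected behaviour); otherwise the stale local contents
    // are reused as-is (reported behaviour).
    void restartAfterCrash(boolean wipe) {
        if (wipe) {
            localStore.clear();
            localStore.putAll(committedState);
        }
    }

    // Runs the scenario and returns the counter value after record N is reprocessed.
    static long run(boolean wipe) {
        DesyncSketch topology = new DesyncSketch();
        topology.process("k");            // record N-1
        topology.commit();
        topology.process("k");            // record N updates the store ...
        topology.restartAfterCrash(wipe); // ... and the crash happens before the commit
        topology.process("k");            // record N is reprocessed
        return topology.localStore.get("k");
    }

    public static void main(String[] args) {
        System.out.println("store reset before reprocessing: " + run(true));
        System.out.println("stale store reused:              " + run(false));
    }
}
```

With the store reset, the counter ends at 2 (records N-1 and N each counted once); with the stale store reused, it ends at 3 because record N is counted twice.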





[jira] [Commented] (KAFKA-7192) State-store can desynchronise with changelog

2018-07-25 Thread Jon Bates (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16555650#comment-16555650
 ] 

Jon Bates commented on KAFKA-7192:
--

Great; thanks! We came up with a similar general solution (delete state stores 
that don't have a checkpoint file), but we don't have the insight into Kafka's 
internals.

Just as a sanity check: if the checkpoint file points to an offset behind the 
changelog tail, does Kafka Streams continue syncing the state store before 
consuming from the source topics? (I believe this happens already, but it's just 
to aid my understanding.)
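
A minimal sketch of the restore decision discussed here, assuming the behaviour described in this thread: with EOS enabled, a store without a checkpoint is wiped and rebuilt from the start of the changelog, and a checkpoint that lies behind the changelog end means restoration resumes from the checkpoint. `RestorePlanSketch`, `Action`, and `plan` are hypothetical names, not Kafka Streams APIs; only the `NO_CHECKPOINT = -1` sentinel mirrors the constant in `StateRestorer`.

```java
/** Hypothetical decision table for state-store restoration; not Kafka Streams code. */
public class RestorePlanSketch {
    // Sentinel for "no checkpoint file entry", mirroring StateRestorer.NO_CHECKPOINT.
    static final long NO_CHECKPOINT = -1L;

    enum Action {
        WIPE_AND_RESTORE_FROM_BEGINNING, // EOS on, no checkpoint: local state is untrusted
        RESTORE_FROM_BEGINNING,          // EOS off, no checkpoint
        RESTORE_FROM_CHECKPOINT,         // checkpoint is behind the changelog end
        NOTHING_TO_RESTORE               // checkpoint already at the changelog end
    }

    static Action plan(long checkpoint, long changelogEndOffset, boolean eosEnabled) {
        if (checkpoint == NO_CHECKPOINT) {
            return eosEnabled
                ? Action.WIPE_AND_RESTORE_FROM_BEGINNING
                : Action.RESTORE_FROM_BEGINNING;
        }
        return checkpoint < changelogEndOffset
            ? Action.RESTORE_FROM_CHECKPOINT
            : Action.NOTHING_TO_RESTORE;
    }

    public static void main(String[] args) {
        System.out.println(plan(NO_CHECKPOINT, 100L, true));
        System.out.println(plan(NO_CHECKPOINT, 100L, false));
        System.out.println(plan(40L, 100L, true));
        System.out.println(plan(100L, 100L, true));
    }
}
```

In this sketch the store is brought up to the changelog end in every branch before any source record would be consumed, which is the ordering the question above asks about.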

> State-store can desynchronise with changelog
> 
>
> Key: KAFKA-7192
> URL: https://issues.apache.org/jira/browse/KAFKA-7192
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.1
>Reporter: Jon Bates
>Priority: Critical
>


[jira] [Commented] (KAFKA-7192) State-store can desynchronise with changelog

2018-07-25 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16556021#comment-16556021
 ] 

Matthias J. Sax commented on KAFKA-7192:


It's expected that the checkpoint file is one lower, because the last "record" 
in the log is the commit marker, which eats up one offset. However, the checkpoint 
file always contains "last-restored-offset plus 1". Thus, the overall restore 
will not be corrupted by the offset being off by one.
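
As a worked example of the arithmetic above, assume a changelog partition holding a single transaction: five data records at offsets 0 to 4, followed by one commit marker at offset 5. The class and method names below are hypothetical and only illustrate the offsets; they are not Kafka APIs.

```java
/** Offset arithmetic for a changelog that ends with one commit marker (illustrative only). */
public class CheckpointOffsetSketch {
    // The checkpoint stores "last-restored-offset plus 1".
    static long checkpointFor(long lastRestoredOffset) {
        return lastRestoredOffset + 1;
    }

    // Log end offset: one past the last data record, plus one more if a
    // commit marker occupies the following offset.
    static long logEndOffset(long lastDataOffset, boolean commitMarkerFollows) {
        return lastDataOffset + 1 + (commitMarkerFollows ? 1 : 0);
    }

    public static void main(String[] args) {
        long lastDataOffset = 4L;                              // data records at offsets 0..4
        long checkpoint = checkpointFor(lastDataOffset);       // 5
        long endOffset = logEndOffset(lastDataOffset, true);   // 6, marker sits at offset 5
        System.out.println("checkpoint = " + checkpoint + ", log end offset = " + endOffset);
    }
}
```

Under these assumptions the checkpoint (5) is one lower than the log end offset (6), yet the only offset between them is the commit marker, so no data record is skipped or replayed.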

> State-store can desynchronise with changelog
> 
>
> Key: KAFKA-7192
> URL: https://issues.apache.org/jira/browse/KAFKA-7192
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.1
>Reporter: Jon Bates
>Priority: Critical
>


[jira] [Commented] (KAFKA-7192) State-store can desynchronise with changelog

2018-07-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16559096#comment-16559096
 ] 

ASF GitHub Bot commented on KAFKA-7192:
---

guozhangwang closed pull request #5421: KAFKA-7192: Wipe out if EOS is turned 
on and checkpoint file does not exist
URL: https://github.com/apache/kafka/pull/5421
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 188ff473038..94e4c71d9c2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -137,6 +137,10 @@ public String toString() {
         return toString("");
     }
 
+    public boolean isEosEnabled() {
+        return eosEnabled;
+    }
+
     /**
      * Produces a string representation containing useful information about a Task starting with the given indent.
      * This is useful in debugging scenarios.
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
index 33dce9e7558..c1a41cefc23 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
@@ -55,6 +55,10 @@ public TopicPartition partition() {
         return partition;
     }
 
+    public String storeName() {
+        return storeName;
+    }
+
     long checkpoint() {
         return checkpoint == null ? NO_CHECKPOINT : checkpoint;
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 07af8019aef..1927b5a7af7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -71,7 +71,7 @@ public void register(final StateRestorer restorer) {
 
     public Collection restore(final RestoringTasks active) {
         if (!needsInitializing.isEmpty()) {
-            initialize();
+            initialize(active);
         }
 
         if (needsRestoring.isEmpty()) {
@@ -111,7 +111,7 @@ public void register(final StateRestorer restorer) {
         return completed();
     }
 
-    private void initialize() {
+    private void initialize(final RestoringTasks active) {
         if (!restoreConsumer.subscription().isEmpty()) {
             throw new StreamsException("Restore consumer should not be subscribed to any topics (" + restoreConsumer.subscription() + ")");
         }
@@ -165,11 +165,12 @@ private void initialize() {
 
         // set up restorer for those initializable
         if (!initializable.isEmpty()) {
-            startRestoration(initializable);
+            startRestoration(initializable, active);
         }
     }
 
-    private void startRestoration(final Map initialized) {
+    private void startRestoration(final Map initialized,
+                                  final RestoringTasks active) {
         log.debug("Start restoring state stores from changelog topics {}", initialized.keySet());
 
         final Set assignment = new HashSet<>(restoreConsumer.assignment());
@@ -186,6 +187,18 @@ private void startRestoration(final Map initializ
                 restorer.setStartingOffset(restoreConsumer.position(restorer.partition()));
                 restorer.restoreStarted();
             } else {
+                final StreamTask task = active.restoringTaskFor(restorer.partition());
+
+                // If checkpoint does not exist it means the task was not shutdown gracefully before;
+                // and in this case if EOS is turned on we should wipe out the state and re-initialize the task
+                if (task.isEosEnabled()) {
+                    log.info("No checkpoint found for task {} state store {} changelog {} with EOS turned on. " +
+                            "Reinitializing the task and restore its state from the beginning.", task.id, restorer.storeName(), restorer.partition());
+
+                    task.reinitializeStateStoresForPartitions(Collections.singleton(restorer.partition()));
+                } else {
+                    log.info("Restoring task {}'s state store {} from beginning of the changelog {} ", task.id, r

[jira] [Commented] (KAFKA-7192) State-store can desynchronise with changelog

2018-07-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16560326#comment-16560326
 ] 

ASF GitHub Bot commented on KAFKA-7192:
---

guozhangwang opened a new pull request #5430: KAFKA-7192 Follow-up: update 
checkpoint to the reset beginning offset
URL: https://github.com/apache/kafka/pull/5430
 
 
   
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> State-store can desynchronise with changelog
> 
>
> Key: KAFKA-7192
> URL: https://issues.apache.org/jira/browse/KAFKA-7192
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.1
>Reporter: Jon Bates
>Assignee: Guozhang Wang
>Priority: Critical
>  Labels: bugs
> Fix For: 2.1.0
>
>


[jira] [Commented] (KAFKA-7192) State-store can desynchronise with changelog

2018-07-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16560622#comment-16560622
 ] 

ASF GitHub Bot commented on KAFKA-7192:
---

guozhangwang closed pull request #5430: KAFKA-7192 Follow-up: update checkpoint 
to the reset beginning offset
URL: https://github.com/apache/kafka/pull/5430
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
index c1a41cefc23..3bbf42ead27 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
@@ -26,13 +26,13 @@
 
     static final int NO_CHECKPOINT = -1;
 
-    private final Long checkpoint;
     private final long offsetLimit;
     private final boolean persistent;
     private final String storeName;
     private final TopicPartition partition;
     private final CompositeRestoreListener compositeRestoreListener;
 
+    private long checkpointOffset;
     private long restoredOffset;
     private long startingOffset;
     private long endingOffset;
@@ -45,7 +45,7 @@
                   final String storeName) {
         this.partition = partition;
         this.compositeRestoreListener = compositeRestoreListener;
-        this.checkpoint = checkpoint;
+        this.checkpointOffset = checkpoint == null ? NO_CHECKPOINT : checkpoint;
         this.offsetLimit = offsetLimit;
         this.persistent = persistent;
         this.storeName = storeName;
@@ -60,7 +60,11 @@ public String storeName() {
     }
 
     long checkpoint() {
-        return checkpoint == null ? NO_CHECKPOINT : checkpoint;
+        return checkpointOffset;
+    }
+
+    void setCheckpointOffset(final long checkpointOffset) {
+        this.checkpointOffset = checkpointOffset;
     }
 
     void restoreStarted() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 1927b5a7af7..9185920f242 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -48,8 +48,9 @@
     private final Map endOffsets = new HashMap<>();
     private final Map> partitionInfo = new HashMap<>();
     private final Map stateRestorers = new HashMap<>();
-    private final Map needsRestoring = new HashMap<>();
-    private final Map needsInitializing = new HashMap<>();
+    private final Set needsRestoring = new HashSet<>();
+    private final Set needsInitializing = new HashSet<>();
+    private final Set completedRestorers = new HashSet<>();
     private final Duration pollTime;
 
     public StoreChangelogReader(final Consumer restoreConsumer,
@@ -64,9 +65,14 @@ public StoreChangelogReader(final Consumer restoreConsumer,
 
     @Override
     public void register(final StateRestorer restorer) {
-        restorer.setUserRestoreListener(userStateRestoreListener);
-        stateRestorers.put(restorer.partition(), restorer);
-        needsInitializing.put(restorer.partition(), restorer);
+        if (!stateRestorers.containsKey(restorer.partition())) {
+            restorer.setUserRestoreListener(userStateRestoreListener);
+            stateRestorers.put(restorer.partition(), restorer);
+
+            log.trace("Added restorer for changelog {}", restorer.partition());
+        }
+
+        needsInitializing.add(restorer.partition());
     }
 
     public Collection restore(final RestoringTasks active) {
@@ -81,16 +87,15 @@ public void register(final StateRestorer restorer) {
 
         try {
             final ConsumerRecords records = restoreConsumer.poll(pollTime);
-            final Iterator iterator = needsRestoring.keySet().iterator();
-            while (iterator.hasNext()) {
-                final TopicPartition partition = iterator.next();
+
+            for (final TopicPartition partition : needsRestoring) {
                 final StateRestorer restorer = stateRestorers.get(partition);
                 final long pos = processNext(records.records(partition), restorer, endOffsets.get(partition));
                 restorer.setRestoredOffset(pos);
                 if (restorer.hasCompleted(pos, endOffsets.get(partition))) {
                     restorer.restoreDone();
                     endOffsets.remove(partition);
-                    iterator.remove(

[jira] [Commented] (KAFKA-7192) State-store can desynchronise with changelog

2018-09-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16611418#comment-16611418
 ] 

ASF GitHub Bot commented on KAFKA-7192:
---

mjsax opened a new pull request #5641: KAFKA-7192: Wipe out state store if EOS 
is turned on and checkpoint file does not exist
URL: https://github.com/apache/kafka/pull/5641
 
 
   Unified backport of PRs #5421 and #5430




> State-store can desynchronise with changelog
> 
>
> Key: KAFKA-7192
> URL: https://issues.apache.org/jira/browse/KAFKA-7192
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.1
>Reporter: Jon Bates
>Assignee: Guozhang Wang
>Priority: Critical
>  Labels: bugs
> Fix For: 2.0.1, 2.1.0
>
>


[jira] [Commented] (KAFKA-7192) State-store can desynchronise with changelog

2018-09-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16615257#comment-16615257
 ] 

ASF GitHub Bot commented on KAFKA-7192:
---

mjsax closed pull request #5641: KAFKA-7192: Wipe out state store if EOS is 
turned on and checkpoint file does not exist
URL: https://github.com/apache/kafka/pull/5641
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
index 04af9f269b7..c094b838a1c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
@@ -193,4 +193,10 @@ public ThreadCache getCache() {
     public void initialized() {
         initialized = true;
     }
+
+    @Override
+    public void uninitialize() {
+        initialized = false;
+    }
+
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 7f6ac7ca614..f8f6416b3e5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -226,6 +226,10 @@ void initStateStores() {
         }
     }
 
+    void reinitializeStateStoresForPartitions(final TopicPartition partitions) {
+        stateMgr.reinitializeStateStoresForPartitions(partitions, processorContext);
+    }
+
     /**
      * @throws ProcessorStateException if there is an error while closing the state manager
      * @param writeCheckpoint boolean indicating if a checkpoint file should be written
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index d59ec2b2c1b..ad4868f8fce 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -298,7 +298,7 @@ private void describe(final StringBuilder builder,
         return suspended.values();
     }
 
-    Collection restoringTasks() {
+    public Collection restoringTasks() {
         return Collections.unmodifiableCollection(restoring.values());
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java
index 5ebc34c4e92..e82ee2c354f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java
@@ -37,7 +37,7 @@
      * Restore all registered state stores by reading from their changelogs.
      * @return all topic partitions that have been restored
      */
-    Collection restore();
+    Collection restore(final Collection restoringTasks);
 
     /**
      * @return the restored offsets for all persistent stores.
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
index 57bb3ac81a6..b5719b111f0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
@@ -53,4 +53,9 @@
      * Mark this contex as being initialized
      */
     void initialized();
+
+    /**
+     * Mark this context as being uninitialized
+     */
+    void uninitialize();
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 34a87ce03be..93e7ffc5a06 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -18,6 +18,7 @@
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.errors.LockException;
 import org.

[jira] [Commented] (KAFKA-7192) State-store can desynchronise with changelog

2018-09-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16615537#comment-16615537
 ] 

ASF GitHub Bot commented on KAFKA-7192:
---

mjsax opened a new pull request #5657: KAFKA-7192: Wipe out state store if EOS 
is turned on and checkpoint file does not exist
URL: https://github.com/apache/kafka/pull/5657
 
 
   Unified backport of PRs #5421 and #5430
   
   Also compare backport to `0.11.0` #5641
   
   




> State-store can desynchronise with changelog
> 
>
> Key: KAFKA-7192
> URL: https://issues.apache.org/jira/browse/KAFKA-7192
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.3, 1.1.1
>Reporter: Jon Bates
>Assignee: Guozhang Wang
>Priority: Critical
>  Labels: bugs
> Fix For: 0.11.0.4, 2.0.1, 2.1.0
>
>