[jira] [Commented] (KAFKA-6269) KTable state restore fails after rebalance

2018-01-16 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16327992#comment-16327992
 ] 

Matthias J. Sax commented on KAFKA-6269:


Release plan for v1.0.1 was proposed today: 
https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+1.0.1

> KTable state restore fails after rebalance
> --
>
>              Key: KAFKA-6269
>              URL: https://issues.apache.org/jira/browse/KAFKA-6269
>          Project: Kafka
>       Issue Type: Bug
>       Components: streams
> Affects Versions: 1.0.0
>         Reporter: Andreas Schroeder
>         Assignee: Bill Bejeck
>         Priority: Blocker
>          Fix For: 1.1.0, 1.0.1
>
>
> I have the following kafka streams topology:
> entity-B -> map step -> entity-B-exists (with state store)
> entity-A   -> map step -> entity-A-exists (with state store)
> (entity-B-exists, entity-A-exists) -> outer join with state store.
> The topology building code looks like this (some data type, serde, 
> valuemapper, and joiner code omitted):
> {code}
> def buildTable[V](builder: StreamsBuilder,
>                   sourceTopic: String,
>                   existsTopic: String,
>                   valueSerde: Serde[V],
>                   valueMapper: ValueMapper[String, V]): KTable[String, V] = {
>   val stream: KStream[String, String] = builder.stream[String, String](sourceTopic)
>   val transformed: KStream[String, V] = stream.mapValues(valueMapper)
>   transformed.to(existsTopic, Produced.`with`(Serdes.String(), valueSerde))
>   val inMemoryStoreName = s"$existsTopic-persisted"
>   val materialized = Materialized.as(Stores.inMemoryKeyValueStore(inMemoryStoreName))
>     .withKeySerde(Serdes.String())
>     .withValueSerde(valueSerde)
>     .withLoggingDisabled()
>   builder.table(existsTopic, materialized)
> }
> val builder = new StreamsBuilder
> val mapToEmptyString: ValueMapper[String, String] = (value: String) => if (value != null) "" else null
> val entitiesB: KTable[String, EntityBInfo] =
>   buildTable(builder,
>              "entity-B",
>              "entity-B-exists",
>              EntityBInfoSerde,
>              ListingImagesToEntityBInfo)
> val entitiesA: KTable[String, String] =
>   buildTable(builder, "entity-A", "entity-A-exists", Serdes.String(), mapToEmptyString)
> val joiner: ValueJoiner[String, EntityBInfo, EntityDiff] = (a, b) => EntityDiff.fromJoin(a, b)
> val materialized = Materialized.as(Stores.inMemoryKeyValueStore("entity-A-joined-with-entity-B"))
>   .withKeySerde(Serdes.String())
>   .withValueSerde(EntityDiffSerde)
>   .withLoggingEnabled(new java.util.HashMap[String, String]())
> val joined: KTable[String, EntityDiff] = entitiesA.outerJoin(entitiesB, joiner, materialized)
> {code}
> We run 4 processor machines with 30 stream threads each; each topic has 30 
> partitions so that there is a total of 4 x 30 = 120 partitions to consume. 
> The initial launch of the processor works fine, but killing one 
> processor and letting it re-join the stream threads leads to some faulty 
> behaviour.
> First, the total number of assigned partitions over all processor machines is 
> larger than 120 (sometimes 157, sometimes just 132), so the partition / task 
> assignment seems to assign the same job to different stream threads.
> The processor machines trying to re-join the consumer group fail constantly 
> with the error message of 'Detected a task that got migrated to another 
> thread.' We gave the processor half an hour to recover; usually, rebuilding 
> the KTable states takes around 20 seconds (with Kafka 0.11.0.1).
> Here are the details of the errors we see:
> stream-thread [kafka-processor-6-StreamThread-9] Detected a task that got 
> migrated to another thread. This implies that this thread missed a rebalance 
> and dropped out of the consumer group. Trying to rejoin the consumer group 
> now.
> {code}
> org.apache.kafka.streams.errors.TaskMigratedException: Log end offset of 
> entity-B-exists-0 should not change while restoring: old end offset 4750539, 
> current offset 4751388
> > StreamsTask taskId: 1_0
> > >   ProcessorTopology:
> > KSTREAM-SOURCE-08:
> > topics: [entity-A-exists]
> > children:   [KTABLE-SOURCE-09]
> > KTABLE-SOURCE-09:
> > states: [entity-A-exists-persisted]
> > children:   [KTABLE-JOINTHIS-11]
> > KTABLE-JOINTHIS-11:
> > states: [entity-B-exists-persisted]
> > children:   [KTABLE-MERGE-10]
> > KTABLE-MERGE-10:
> > states: [entity-A-joined-with-entity-B]
> > KSTREAM-SOURCE-03:
> > topics: [entity-B-exists]

[jira] [Commented] (KAFKA-6269) KTable state restore fails after rebalance

2018-01-16 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16327592#comment-16327592
 ] 

Matthias J. Sax commented on KAFKA-6269:


I would assume that branch `1.0` (https://github.com/apache/kafka/tree/1.0) 
should be quite stable – it only contains bug fixes for `1.0.1`. As an 
alternative, you can also check out the `1.0.0` commit 
(https://github.com/apache/kafka/tree/1.0.0) and cherry-pick the fix.

Btw: Brokers are always backward compatible, so yes, KS 0.11.0.1 will work with 
`1.0.0` brokers.


[jira] [Commented] (KAFKA-6269) KTable state restore fails after rebalance

2018-01-16 Thread Bill Bejeck (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16327439#comment-16327439
 ] 

Bill Bejeck commented on KAFKA-6269:


[~dminkovsky], are you good now with the 1.1.0-SNAPSHOT build?


[jira] [Commented] (KAFKA-6269) KTable state restore fails after rebalance

2018-01-16 Thread Dmitry Minkovsky (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16327259#comment-16327259
 ] 

Dmitry Minkovsky commented on KAFKA-6269:
-

I'm being hit by this and wondering what to do.
 
* I upgraded my brokers to 1.0.0. Will downgrading Kafka Streams to 0.11.0.1 
work with a 1.0.0 broker?
* I'm fine building from trunk. Could you recommend a "stable" commit for me 
to use?

With regard to a workaround for 1.0.0, I am confused about the null masking 
option and about why groupByKey().aggregate() will work.


[jira] [Commented] (KAFKA-6269) KTable state restore fails after rebalance

2018-01-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308416#comment-16308416
 ] 

ASF GitHub Bot commented on KAFKA-6269:
---

guozhangwang closed pull request #4300: KAFKA-6269: KTable restore fails after 
rebalance
URL: https://github.com/apache/kafka/pull/4300
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index 10aedbb93c4..d9085b0697f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -197,7 +197,7 @@ public synchronized void addRecord(ConsumerRecord<K, V> record) {
 throw new IllegalStateException("Cannot add records for a partition that is not assigned to the consumer");
 List<ConsumerRecord<K, V>> recs = this.records.get(tp);
 if (recs == null) {
-recs = new ArrayList<ConsumerRecord<K, V>>();
+recs = new ArrayList<>();
 this.records.put(tp, recs);
 }
 recs.add(record);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 178d2bb96d0..ba17ce95ede 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -96,7 +96,6 @@ public void register(final StateRestorer restorer) {
 restoreConsumer.seekToBeginning(partitions);
 }
 
-
 if (needsRestoring.isEmpty()) {
 restoreConsumer.unsubscribe();
 }
@@ -174,8 +173,8 @@ private void startRestoration(final Map initializ
 if (restorer.checkpoint() != StateRestorer.NO_CHECKPOINT) {
 restoreConsumer.seek(restorer.partition(), 
restorer.checkpoint());
 logRestoreOffsets(restorer.partition(),
-restorer.checkpoint(),
-endOffsets.get(restorer.partition()));
+  restorer.checkpoint(),
+  endOffsets.get(restorer.partition()));
 
restorer.setStartingOffset(restoreConsumer.position(restorer.partition()));
 restorer.restoreStarted();
 } else {
@@ -187,8 +186,8 @@ private void startRestoration(final Map initializ
 for (final StateRestorer restorer : needsPositionUpdate) {
 final long position = 
restoreConsumer.position(restorer.partition());
 logRestoreOffsets(restorer.partition(),
-position,
-endOffsets.get(restorer.partition()));
+  position,
+  endOffsets.get(restorer.partition()));
 restorer.setStartingOffset(position);
 restorer.restoreStarted();
 }
@@ -252,7 +251,7 @@ private void restorePartition(final ConsumerRecords allRecords,
 final long pos = processNext(allRecords.records(topicPartition), 
restorer, endOffset);
 restorer.setRestoredOffset(pos);
 if (restorer.hasCompleted(pos, endOffset)) {
-if (pos > endOffset + 1) {
+if (pos > endOffset) {
 throw new TaskMigratedException(task, topicPartition, 
endOffset, pos);
 }
 
@@ -271,30 +270,40 @@ private long processNext(final List<ConsumerRecord<byte[], byte[]>> records,
  final StateRestorer restorer,
  final Long endOffset) {
 final List<KeyValue<byte[], byte[]>> restoreRecords = new ArrayList<>();
-long offset = -1;
-
+long nextPosition = -1;
+int numberRecords = records.size();
+int numberRestored = 0;
 for (final ConsumerRecord<byte[], byte[]> record : records) {
-offset = record.offset();
+final long offset = record.offset();
 if (restorer.hasCompleted(offset, endOffset)) {
+nextPosition = record.offset();
 break;
 }
+numberRestored++;
 if (record.key() != null) {
 restoreRecords.add(KeyValue.pair(record.key(), 
record.value()));
 }
 }
 
-if (offset == -1) {
-offset = 
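
The boundary check changed in `restorePartition` above (from `pos > endOffset + 1` to `pos > endOffset`) can be illustrated with a minimal, Kafka-free sketch. Everything in it is hypothetical: the class `RestoreBoundarySketch`, its method names, and the plain `long` offsets stand in for the real `StoreChangelogReader` and restore consumer, and the offsets mirror the scenarios in the PR's tests rather than real data.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, Kafka-free model of the end-offset check; not the actual StoreChangelogReader code.
final class RestoreBoundarySketch {

    // The check as it read before the patch: one offset past the end was tolerated.
    static boolean exceededOld(long position, long endOffset) {
        return position > endOffset + 1;
    }

    // The check as it reads after the patch: any position past the end offset counts as exceeded.
    static boolean exceededNew(long position, long endOffset) {
        return position > endOffset;
    }

    // Counts the fetched records that lie below the end offset, i.e. the ones a restore would apply.
    static int countRestorable(List<Long> fetchedOffsets, long endOffset) {
        int restorable = 0;
        for (long offset : fetchedOffsets) {
            if (offset >= endOffset) {
                break; // reached the end offset captured when the restore started
            }
            restorable++;
        }
        return restorable;
    }

    public static void main(String[] args) {
        // Records 0..9 followed by a commit marker at offset 10, so the end offset is 11.
        List<Long> withMarker = Arrays.asList(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L);
        System.out.println(countRestorable(withMarker, 11L)); // prints 10
        System.out.println(exceededNew(11L, 11L));            // prints false

        // A position one past the end offset: tolerated by the old check, rejected by the new one.
        System.out.println(exceededOld(11L, 10L));            // prints false
        System.out.println(exceededNew(11L, 10L));            // prints true
    }
}
```

In this model a position equal to the end offset is the normal completion case, which is why the commit-marker scenario (end offset 11, position 11) passes under the stricter check.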

[jira] [Commented] (KAFKA-6269) KTable state restore fails after rebalance

2017-12-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16300940#comment-16300940
 ] 

ASF GitHub Bot commented on KAFKA-6269:
---

mjsax commented on a change in pull request #4300: KAFKA-6269: KTable restore 
fails after rebalance
URL: https://github.com/apache/kafka/pull/4300#discussion_r158426654
 
 

 ##
 File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
 ##
 @@ -385,6 +379,130 @@ public void 
shouldThrowTaskMigratedExceptionIfEndOffsetGetsExceededDuringRestore
 } catch (final TaskMigratedException expected) { /* ignore */ }
 }
 
+@Test
+public void 
shouldThrowTaskMigratedExceptionIfEndOffsetGetsExceededDuringRestoreForChangelogTopicEOSEnabled()
 {
+final int totalMessages = 10;
+assignPartition(totalMessages, topicPartition);
+// records 0..4
+addRecords(5, topicPartition, 0);
+//EOS enabled commit marker at offset 5 so rest of records 6..10
+addRecords(5, topicPartition, 6);
+consumer.assign(Collections.emptyList());
+
+// end offsets should start after commit marker of 5 from above
+consumer.updateEndOffsets(Collections.singletonMap(topicPartition, 
6L));
+changelogReader.register(new StateRestorer(topicPartition, 
restoreListener, null, Long.MAX_VALUE, true, "storeName"));
+
+expect(active.restoringTaskFor(topicPartition)).andReturn(task);
+replay(active);
+try {
+changelogReader.restore(active);
+fail("Should have thrown task migrated exception");
+} catch (final TaskMigratedException expected) {
+/* ignore */
+}
+}
+
+@Test
+public void 
shouldNotThrowTaskMigratedExceptionDuringRestoreForChangelogTopicWhenEndOffsetNotExceededEOSEnabled()
 {
+final int totalMessages = 10;
+setupConsumer(totalMessages, topicPartition);
+// records have offsets of 0..9 10 is commit marker so 11 is end offset
+consumer.updateEndOffsets(Collections.singletonMap(topicPartition, 
11L));
+
+consumer.assign(Collections.emptyList());
+
+changelogReader.register(new StateRestorer(topicPartition, 
restoreListener, null, Long.MAX_VALUE, true, "storeName"));
+
+expect(active.restoringTaskFor(topicPartition)).andReturn(task);
+replay(active);
+
+changelogReader.restore(active);
+assertThat(callback.restored.size(), equalTo(10));
+}
+
+
+@Test
+public void 
shouldNotThrowTaskMigratedExceptionDuringRestoreForChangelogTopicWhenEndOffsetNotExceededEOSDisabled()
 {
+final int totalMessages = 10;
+setupConsumer(totalMessages, topicPartition);
+
+consumer.assign(Collections.emptyList());
+
+changelogReader.register(new StateRestorer(topicPartition, 
restoreListener, null, Long.MAX_VALUE, true, "storeName"));
+
+expect(active.restoringTaskFor(topicPartition)).andReturn(task);
+replay(active);
+
+changelogReader.restore(active);
+assertThat(callback.restored.size(), equalTo(10));
+}
+
+@Test
+public void 
shouldNotThrowTaskMigratedExceptionIfEndOffsetGetsExceededDuringRestoreForSourceTopic()
 {
+final int messages = 10;
+setupConsumer(messages, topicPartition);
+consumer.updateEndOffsets(Collections.singletonMap(topicPartition, 
5L));
 
 Review comment:
   I just thought about this. I think `endOffset` should be actual endOffset, 
ie, `11` for this test -- we pass in the `offsetLimit` as 5 in `StateRestorer` 
below.
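
The distinction drawn in this review comment, between the partition's actual end offset and the `offsetLimit` handed to the `StateRestorer`, can be sketched as follows. This is a simplified illustration, not the real `StateRestorer`: the class `OffsetLimitSketch` and its method names are hypothetical, and it assumes that a restore stops at whichever of the two bounds is smaller. The values 11 and 5 are the ones mentioned in the comment.

```java
// Hypothetical illustration of restoring up to min(endOffset, offsetLimit); not Kafka's StateRestorer.
final class OffsetLimitSketch {

    private final long offsetLimit;

    OffsetLimitSketch(long offsetLimit) {
        this.offsetLimit = offsetLimit;
    }

    // First offset that must not be restored: the smaller of the log end offset and the limit.
    long restoreBound(long endOffset) {
        return Math.min(endOffset, offsetLimit);
    }

    // A record at recordOffset ends the restore once it reaches the bound.
    boolean hasCompleted(long recordOffset, long endOffset) {
        return recordOffset >= restoreBound(endOffset);
    }

    public static void main(String[] args) {
        long actualEndOffset = 11L;
        OffsetLimitSketch limited = new OffsetLimitSketch(5L);
        OffsetLimitSketch unlimited = new OffsetLimitSketch(Long.MAX_VALUE);

        System.out.println(limited.restoreBound(actualEndOffset));     // prints 5
        System.out.println(limited.hasCompleted(4L, actualEndOffset)); // prints false
        System.out.println(limited.hasCompleted(5L, actualEndOffset)); // prints true
        System.out.println(unlimited.restoreBound(actualEndOffset));   // prints 11
    }
}
```

Keeping the two values separate means a test can report the true end of the log while still limiting how far the restore reads.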




[jira] [Commented] (KAFKA-6269) KTable state restore fails after rebalance

2017-12-12 Thread Matthias J. Sax (JIRATEST)

[ 
https://issues-test.apache.org/jira/browse/KAFKA-6269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265588#comment-16265588
 ] 

Matthias J. Sax commented on KAFKA-6269:


[~stevenschlansker] Thanks for your comment, but this is not the official Jira 
board. Please comment on https://issues.apache.org/jira/browse/KAFKA-6269. 
Thx.


[jira] [Commented] (KAFKA-6269) KTable state restore fails after rebalance

2017-12-11 Thread Steven Schlansker (JIRATEST)

[ 
https://issues-test.apache.org/jira/browse/KAFKA-6269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265581#comment-16265581
 ] 

Steven Schlansker commented on KAFKA-6269:
--

We just hit a similar issue during a rollout where our application repeatedly 
refused to start, giving error messages like the ones above.  Looking forward 
to a fix in the next release.


[jira] [Commented] (KAFKA-6269) KTable state restore fails after rebalance

2017-12-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16280993#comment-16280993
 ] 

ASF GitHub Bot commented on KAFKA-6269:
---

GitHub user bbejeck opened a pull request:

https://github.com/apache/kafka/pull/4300

KAFKA-6269[WIP] DO NOT MERGE attempts to recreate original error


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bbejeck/kafka KAFKA_6269_ktable_restore_fails_after_rebalance

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/4300.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4300


commit 107a213af4f1106857bbc0b60dabb0bf5ecd3cc0
Author: Bill Bejeck 
Date:   2017-12-06T21:57:58Z

KAFKA-6269[WIP] attempts to recreate original error





[jira] [Commented] (KAFKA-6269) KTable state restore fails after rebalance

2017-11-30 Thread Andreas Schroeder (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16272972#comment-16272972
 ] 

Andreas Schroeder commented on KAFKA-6269:
--

[~guozhang] it's okay for my team to wait for the 1.0.1 release. Until then, 
we'll stick to the 0.11.0.1 version we are currently using. The reason to 
migrate to 1.0.0 was that we are experiencing some unfair task assignment 
across our stream processor nodes, which leads to some nodes crashing (and 
immediately being recreated). So our current system runs and we can wait for 
1.0.1. Thanks, however, for the suggestions on how to proceed! I'll try 
[~mjsax]'s suggestion on hiding the null value :) 


[jira] [Commented] (KAFKA-6269) KTable state restore fails after rebalance

2017-11-29 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16272013#comment-16272013
 ] 

Matthias J. Sax commented on KAFKA-6269:


About the workaround: you could mask {{null}} with a dummy value: 
{{stream.mapValues(/* replace null with custom NULL */).groupByKey().aggregate()}}.
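
The masking idea can be illustrated without Kafka. What follows is a minimal plain-Java model, not Kafka Streams code: {{aggregateLatest}} is a hypothetical stand-in for an aggregation that, as reported in this thread, ignores records with a null value; {{maskNulls}} stands in for the {{mapValues}} step; and {{NULL_MARKER}} is an assumed sentinel that must never occur as a real value.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class NullMaskingSketch {
    // Assumed sentinel; it must never occur as a real value.
    static final String NULL_MARKER = "__NULL__";

    // Minimal key/value record; a null value represents a delete.
    static final class Rec {
        final String key;
        final String value;
        Rec(String key, String value) { this.key = key; this.value = value; }
    }

    // Hypothetical stand-in for an aggregation that keeps the latest value
    // per key and skips records whose value is null.
    static Map<String, String> aggregateLatest(List<Rec> records) {
        Map<String, String> table = new LinkedHashMap<>();
        for (Rec r : records) {
            if (r.value == null) {
                continue; // null-valued record is ignored, so the delete is lost
            }
            table.put(r.key, r.value);
        }
        return table;
    }

    // Stand-in for the mapValues step: replace null with the sentinel.
    static List<Rec> maskNulls(List<Rec> records) {
        List<Rec> masked = new ArrayList<>();
        for (Rec r : records) {
            masked.add(new Rec(r.key, r.value == null ? NULL_MARKER : r.value));
        }
        return masked;
    }

    // Downstream logic has to translate the sentinel back into "deleted".
    static boolean isDeleted(Map<String, String> table, String key) {
        return !table.containsKey(key) || NULL_MARKER.equals(table.get(key));
    }

    public static void main(String[] args) {
        List<Rec> input = Arrays.asList(new Rec("a", "v1"), new Rec("a", null));
        // Without masking the delete never reaches the table.
        System.out.println(isDeleted(aggregateLatest(input), "a"));            // false
        // With masking the sentinel survives and can be treated as a delete.
        System.out.println(isDeleted(aggregateLatest(maskNulls(input)), "a")); // true
    }
}
```

The cost of this approach is that every consumer of the aggregated table (for example the joiner) has to know about the sentinel and map it back to "no value".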


[jira] [Commented] (KAFKA-6269) KTable state restore fails after rebalance

2017-11-29 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16271399#comment-16271399
 ] 

Guozhang Wang commented on KAFKA-6269:
--

[~andreas-schroeder] We have spotted the regression bug introduced in 1.0.0 and 
we are working on a fix that will be included in the coming 1.0.1 release. But 
the bug-fix release may only come in early December, and I'm wondering how much 
this issue is blocking your team's progress. Would you be OK with compiling 
from the {{1.0}} branch directly once we have the bug fix in?


[jira] [Commented] (KAFKA-6269) KTable state restore fails after rebalance

2017-11-28 Thread Andreas Schroeder (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16269058#comment-16269058
 ] 

Andreas Schroeder commented on KAFKA-6269:
--

Hi again [~mjsax], I'm back with some news (finally): The issue we are having 
is that records with a null value are ignored, so deletes won't propagate to the 
outer join and our business logic doesn't work any more.

See the [KGroupedStream API 
docs|http://apache.mirror.digionline.de/kafka/1.0.0/javadoc/org/apache/kafka/streams/kstream/KGroupedStream.html#aggregate(org.apache.kafka.streams.kstream.Initializer,%20org.apache.kafka.streams.kstream.Aggregator,%20org.apache.kafka.streams.kstream.Materialized)]

Any other ideas? :)


[jira] [Commented] (KAFKA-6269) KTable state restore fails after rebalance

2017-11-27 Thread Andreas Schroeder (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266491#comment-16266491
 ] 

Andreas Schroeder commented on KAFKA-6269:
--

Hi [~mjsax], thanks for the swift response and your workaround proposal. I 
tried to make that workaround work, but it breaks some of my integration tests. 
I'm still investigating my tests to determine if this workaround is feasible. 
I'll keep you posted.


[jira] [Commented] (KAFKA-6269) KTable state restore fails after rebalance

2017-11-26 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266321#comment-16266321
 ] 

Matthias J. Sax commented on KAFKA-6269:


In the PR linked below, we changed the {{StoreChangelogReader}}. The change in 
the PR was a fix for the regular restore case if EOS is enabled. Because of 
commit markers in the topic, using the record offset as "end offset" to check 
if restore is finished is not safe and might end up in an infinite loop (I can 
dig out the JIRA if you wish). Thus, we return {{restoreConsumer.position()}} 
instead. However, this is wrong for the source-KTable case when the guard fires 
(originally, we set {{offset = record.offset}} if the guard fires, but we 
removed this line). Thus, even if we stop the restore correctly, we return the 
wrong offset, which is larger than the expected end offset, and thus we get the 
exception.

https://github.com/apache/kafka/commit/eaabb6cd0173c4f6854eb5da39194a7e3fc0162c#diff-46ed6d177221c8778965ecb1b6657be3R264

I was thinking about a potential fix. A straightforward fix would not work: 
returning eagerly if the guard condition is met and returning 
{{record.offset()}} would yield the EOS issue again. But I think we could 
return the offset of the next record (the first one that we do not restore 
anymore), or {{restoreConsumer.position()}} if no next record exists (i.e., if 
nobody actually added records to the source-KTable topic).
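
To make the offsets concrete, here is a minimal sketch of the idea in plain Java. It is a simplified model under stated assumptions, not the actual {{StoreChangelogReader}} code: the class name, the method names, and the use of a bare array of record offsets are all hypothetical, and the guard is reduced to a single comparison against the end offset.

```java
// Simplified, hypothetical model of the restore loop; not the actual
// StoreChangelogReader code. Records are represented only by their offsets.
final class RestoreOffsetSketch {

    // Behaviour described as buggy: always report the consumer position,
    // even when the guard stopped the restore before the end of the batch.
    static long reportedOffsetBuggy(long[] polledOffsets, long endOffset, long consumerPosition) {
        return consumerPosition;
    }

    // Proposed behaviour: report the offset of the first record that is NOT
    // restored, or the consumer position if every polled record was restored.
    static long reportedOffsetProposed(long[] polledOffsets, long endOffset, long consumerPosition) {
        for (long offset : polledOffsets) {
            if (offset >= endOffset) { // guard: record lies beyond the restore range
                return offset;
            }
            // a real implementation would apply the record to the state store here
        }
        return consumerPosition;
    }

    public static void main(String[] args) {
        // Source-KTable case: end offset was 5 when restore started, but an
        // upstream processor appended offsets 5..7 in the meantime.
        long[] polled = {3, 4, 5, 6, 7};
        System.out.println(reportedOffsetBuggy(polled, 5L, 8L));    // 8, larger than end offset 5
        System.out.println(reportedOffsetProposed(polled, 5L, 8L)); // 5, equal to end offset 5

        // EOS case: offset 5 holds a commit marker, so the last record has
        // offset 4 while the consumer position is already 6.
        System.out.println(reportedOffsetProposed(new long[] {3, 4}, 6L, 6L)); // 6
    }
}
```

In the first case the buggy variant reports 8 although restore was only supposed to run up to 5, which is the situation that trips the end-offset check. In the EOS case no polled record reaches the end offset, so the sketch falls back to the consumer position and still reports 6.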

WDYT?


[jira] [Commented] (KAFKA-6269) KTable state restore fails after rebalance

2017-11-26 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266157#comment-16266157
 ] 

Guozhang Wang commented on KAFKA-6269:
--

[~mjsax] What bug have you found that relates to the guard not being effective?


[jira] [Commented] (KAFKA-6269) KTable state restore fails after rebalance

2017-11-23 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264926#comment-16264926
 ] 

Matthias J. Sax commented on KAFKA-6269:


Had a look into the code, and I think I understand the issue. The guard is not 
working as expected (which was my suspicion).

The only workaround I can think of atm would be to read the table source 
topics as {{KStream}} and do a dummy aggregation that only keeps the latest 
value to transform the {{KStream}} into a {{KTable}} -- this will create a 
proper changelog topic and avoid hitting this issue.

For your special case, as you write the topic with {{to()}}, you could also 
replace the {{to()}} with a {{groupByKey().aggregate()}}, and you don't need the 
{{entity-X-exists}} topics anymore. This should result in a no-overhead 
solution for you, as there will be no repartitioning topic because you only 
call {{mapValues}} before the {{groupByKey().aggregate()}}.
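
As an illustration of what such a "dummy aggregation" computes, here is a small sketch in plain Java. It deliberately does not use the Kafka Streams API: a {{LinkedHashMap}} stands in for the resulting {{KTable}}, an array of key/value pairs stands in for the {{KStream}}, and the class and method names are hypothetical. How null values are treated is left out of the sketch.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a "keep the latest value per key" aggregation.
// Plain collections are used instead of KStream/KTable so the sketch stays
// self-contained.
final class LatestValueTableSketch {

    // Folds a sequence of {key, value} records into a table that holds only
    // the most recent value seen for each key.
    static Map<String, String> latestPerKey(String[][] records) {
        Map<String, String> table = new LinkedHashMap<>();
        for (String[] record : records) {
            // the "dummy aggregation": the new value simply replaces the old one
            table.put(record[0], record[1]);
        }
        return table;
    }

    public static void main(String[] args) {
        String[][] records = {{"a", "1"}, {"b", "2"}, {"a", "3"}};
        System.out.println(latestPerKey(records)); // {a=3, b=2}
    }
}
```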


[jira] [Commented] (KAFKA-6269) KTable state restore fails after rebalance

2017-11-23 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264921#comment-16264921
 ] 

Matthias J. Sax commented on KAFKA-6269:


Thanks for reporting this. If you get the exception {{Log end offset of 
entity-B-exists-0 should not change while restoring: old end offset 4750539, 
current offset 4751388}}, this means that a thread started to recover a state 
from a changelog topic and the endOffset moved during the process -- this 
should never happen (only if the task was migrated to another thread and this 
other thread writes into the changelog topic). If the state is not migrated, 
the thread that restores would be the only one that is "allowed" to write into 
the changelog; but as long as it restores, it does not write.
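
A minimal sketch of this kind of check, in plain Java, might look as follows. It is an assumption-laden simplification rather than the actual Kafka Streams code: the class and method names are hypothetical, and a JDK {{IllegalStateException}} stands in for {{TaskMigratedException}} so that the example has no dependencies. Only the message text is taken from the report above.

```java
// Hypothetical, simplified stand-in for the end-offset check performed during
// restore; not the actual Kafka Streams implementation.
final class EndOffsetCheckSketch {

    // Fails if the offset reached during restore lies beyond the end offset
    // that was recorded when the restore started.
    static void verifyEndOffsetUnchanged(String partition, long endOffsetAtStart, long currentOffset) {
        if (currentOffset > endOffsetAtStart) {
            // Kafka Streams reports this as a TaskMigratedException; a JDK
            // exception is used here to keep the sketch self-contained.
            throw new IllegalStateException("Log end offset of " + partition
                + " should not change while restoring: old end offset " + endOffsetAtStart
                + ", current offset " + currentOffset);
        }
    }

    public static void main(String[] args) {
        // Restore reached exactly the recorded end offset: no error.
        verifyEndOffsetUnchanged("entity-B-exists-0", 4750539L, 4750539L);

        // Somebody appended to the topic while restoring: the check fails.
        try {
            verifyEndOffsetUnchanged("entity-B-exists-0", 4750539L, 4751388L);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```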

What I could think of is a bug that relates to an optimization we do: As you 
read KTables from source topics, there is no additional changelog topic as the 
source topic can be used to recreate the state (the error message shows that 
the source topic is used for store recovery). Thus, an upstream processor that 
writes to the source topic can of course append data to this topic -- we 
actually have a guard for this case, but we changed some code here, maybe 
introducing a bug so that this guard does not work properly anymore: thus, the 
restoring thread "thinks" it reads a changelog topic (while it does read a 
source topic) for recovery and fails even if it shouldn't. I'll try to 
reproduce this locally and cycle back to you.

> KTable state restore fails after rebalance
> --
>
> Key: KAFKA-6269
> URL: https://issues.apache.org/jira/browse/KAFKA-6269
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Andreas Schroeder
> Fix For: 1.1.0, 1.0.1
>
>
> I have the following kafka streams topology:
> entity-B -> map step -> entity-B-exists (with state store)
> entity-A   -> map step -> entity-A-exists (with state store)
> (entity-B-exists, entity-A-exists) -> outer join with state store.
> The topology building code looks like this (some data type, serde, 
> valuemapper, and joiner code omitted):
> {code}
> def buildTable[V](builder: StreamsBuilder,
>   sourceTopic: String,
>   existsTopic: String,
>   valueSerde: Serde[V],
>   valueMapper: ValueMapper[String, V]): 
> KTable[String, V] = {
>   val stream: KStream[String, String] = builder.stream[String, 
> String](sourceTopic)
>   val transformed: KStream[String, V] = stream.mapValues(valueMapper)
>   transformed.to(existsTopic, Produced.`with`(Serdes.String(), valueSerde))
>   val inMemoryStoreName = s"$existsTopic-persisted"
>   val materialized = 
> Materialized.as(Stores.inMemoryKeyValueStore(inMemoryStoreName))
>   .withKeySerde(Serdes.String())
>   .withValueSerde(valueSerde)
>   .withLoggingDisabled()
>   builder.table(existsTopic, materialized)
> }
> val builder = new StreamsBuilder
> val mapToEmptyString: ValueMapper[String, String] = (value: String) => if 
> (value != null) "" else null
> val entitiesB: KTable[String, EntityBInfo] =
>   buildTable(builder,
>  "entity-B",
>  "entity-B-exists",
>  EntityBInfoSerde,
>  ListingImagesToEntityBInfo)
> val entitiesA: KTable[String, String] =
>   buildTable(builder, "entity-A", "entity-A-exists", Serdes.String(), 
> mapToEmptyString)
> val joiner: ValueJoiner[String, EntityBInfo, EntityDiff] = (a, b) => 
> EntityDiff.fromJoin(a, b)
> val materialized = 
> Materialized.as(Stores.inMemoryKeyValueStore("entity-A-joined-with-entity-B"))
>   .withKeySerde(Serdes.String())
>   .withValueSerde(EntityDiffSerde)
>   .withLoggingEnabled(new java.util.HashMap[String, String]())
> val joined: KTable[String, EntityDiff] = entitiesA.outerJoin(entitiesB, 
> joiner, materialized)
> {code}
> We run 4 processor machines with 30 stream threads each; each topic has 30 
> partitions so that there is a total of 4 x 30 = 120 partitions to consume. 
> The initial launch of the processor works fine, but when killing one 
> processor and letting him re-join the stream threads leads to some faulty 
> behaviour.
> Fist, the total number of assigned partitions over all processor machines is 
> larger than 120 (sometimes 157, sometimes just 132), so the partition / task 
> assignment seems to assign the same job to different stream threads.
> The processor machines trying to re-join the consumer group fail constantly 
> with the error message of 'Detected a task that got migrated to another 
> thread.' We gave the processor half an hour to recover; usually, rebuilding 
> the KTable states take around 20 seconds (with Kafka 0.11.0.1).
> Here are the details of the errors we see:
> stream-thread