[jira] [Commented] (KAFKA-6269) KTable state restore fails after rebalance

2017-11-30 Thread Andreas Schroeder (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16272972#comment-16272972
 ] 

Andreas Schroeder commented on KAFKA-6269:
--

[~guozhang] it's okay for my team to wait for the 1.0.1 release. Until then, 
we'll stick with the 0.11.0.1 version we are currently using. The reason for 
migrating to 1.0.0 was that we are experiencing unfair task assignment across 
our stream processor nodes, which leads to some nodes crashing (and immediately 
being recreated). So our current system runs, and we can wait for 1.0.1. Thanks 
for the suggestions on how to proceed! I'll try [~mjsax]'s suggestion of hiding 
the null value :) 
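
Roughly what I plan to try for hiding the null value, as a sketch only (the 
sentinel constant and names below are placeholders, not the exact suggestion):

{code}
import org.apache.kafka.streams.kstream.ValueMapper

// Hypothetical sentinel standing in for a delete; not part of the suggested
// workaround itself.
val Tombstone = "__deleted__"

// Map deletes to the sentinel instead of forwarding null, so the record is not
// dropped downstream; the joiner would then treat the sentinel as "absent".
val hideNullValue: ValueMapper[String, String] =
  (value: String) => if (value != null) "" else Tombstone
{code}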

[jira] [Commented] (KAFKA-6269) KTable state restore fails after rebalance

2017-11-28 Thread Andreas Schroeder (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16269058#comment-16269058
 ] 

Andreas Schroeder commented on KAFKA-6269:
--

Hi again [~mjsax], I'm back with some news (finally): the issue we are having 
is that records with a null value are ignored, so deletes don't propagate to 
the outer join and our business logic no longer works.

See the [KGroupedStream API 
docs|http://apache.mirror.digionline.de/kafka/1.0.0/javadoc/org/apache/kafka/streams/kstream/KGroupedStream.html#aggregate(org.apache.kafka.streams.kstream.Initializer,%20org.apache.kafka.streams.kstream.Aggregator,%20org.apache.kafka.streams.kstream.Materialized)]
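
For illustration, the aggregation-based variant I tried looks roughly like this 
(a sketch with placeholder topic, store, and variable names, not the exact 
code):

{code}
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.{Aggregator, Initializer, KStream, KTable, Materialized, Serialized}
import org.apache.kafka.streams.state.Stores

val builder = new StreamsBuilder
val exists: KStream[String, String] = builder.stream[String, String]("entity-A-exists")

val initializer: Initializer[String] = () => ""
val aggregator: Aggregator[String, String, String] =
  (_: String, value: String, _: String) => value

// Records with a null key or value are ignored by aggregate(), so a delete
// (null value) on entity-A-exists never removes the key from this store.
val table: KTable[String, String] = exists
  .groupByKey(Serialized.`with`(Serdes.String(), Serdes.String()))
  .aggregate(initializer, aggregator,
    Materialized.as(Stores.inMemoryKeyValueStore("entity-A-exists-aggregated"))
      .withKeySerde(Serdes.String())
      .withValueSerde(Serdes.String()))
{code}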

Any other ideas? :)

[jira] [Commented] (KAFKA-6269) KTable state restore fails after rebalance

2017-11-27 Thread Andreas Schroeder (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16266491#comment-16266491
 ] 

Andreas Schroeder commented on KAFKA-6269:
--

Hi [~mjsax], thanks for the swift response and your workaround proposal. I 
tried the workaround, but it breaks some of my integration tests. I'm still 
investigating my tests to determine whether the workaround is feasible. 
I'll keep you posted.

[jira] [Updated] (KAFKA-6269) KTable state restore fails after rebalance

2017-11-23 Thread Andreas Schroeder (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6269?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andreas Schroeder updated KAFKA-6269:
-
Description: 
I have the following Kafka Streams topology:

entity-B -> map step -> entity-B-exists (with state store)
entity-A -> map step -> entity-A-exists (with state store)

(entity-B-exists, entity-A-exists) -> outer join with state store.

The topology-building code looks like this (some data type, serde, ValueMapper, 
and joiner code omitted):

{code}
def buildTable[V](builder: StreamsBuilder,
                  sourceTopic: String,
                  existsTopic: String,
                  valueSerde: Serde[V],
                  valueMapper: ValueMapper[String, V]): KTable[String, V] = {

  val stream: KStream[String, String] = builder.stream[String, String](sourceTopic)
  val transformed: KStream[String, V] = stream.mapValues(valueMapper)
  transformed.to(existsTopic, Produced.`with`(Serdes.String(), valueSerde))

  val inMemoryStoreName = s"$existsTopic-persisted"

  val materialized = Materialized.as(Stores.inMemoryKeyValueStore(inMemoryStoreName))
    .withKeySerde(Serdes.String())
    .withValueSerde(valueSerde)
    .withLoggingDisabled()

  builder.table(existsTopic, materialized)
}

val builder = new StreamsBuilder

// Non-null values are mapped to "", null values (deletes) stay null.
val mapToEmptyString: ValueMapper[String, String] =
  (value: String) => if (value != null) "" else null

val entitiesB: KTable[String, EntityBInfo] =
  buildTable(builder,
             "entity-B",
             "entity-B-exists",
             EntityBInfoSerde,
             ListingImagesToEntityBInfo)

val entitiesA: KTable[String, String] =
  buildTable(builder, "entity-A", "entity-A-exists", Serdes.String(), mapToEmptyString)

val joiner: ValueJoiner[String, EntityBInfo, EntityDiff] =
  (a, b) => EntityDiff.fromJoin(a, b)

val materialized = Materialized.as(Stores.inMemoryKeyValueStore("entity-A-joined-with-entity-B"))
  .withKeySerde(Serdes.String())
  .withValueSerde(EntityDiffSerde)
  .withLoggingEnabled(new java.util.HashMap[String, String]())

val joined: KTable[String, EntityDiff] = entitiesA.outerJoin(entitiesB, joiner, materialized)
{code}

We run 4 processor machines with 30 stream threads each; each topic has 30 
partitions, so there is a total of 4 x 30 = 120 partitions to consume. The 
initial launch of the processor works fine, but killing one processor and 
letting it re-join leads to some faulty behaviour among the stream threads.
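
For completeness, here is a sketch of how each processor instance is configured 
and started; the application id and bootstrap servers below are placeholders, 
and only the thread count reflects the setup described above:

{code}
import java.util.Properties
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}

val props = new Properties()
// Placeholder values, except for the thread count.
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "entity-diff-processor")
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-1:9092")
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "30")

// 'builder' is the StreamsBuilder from the snippet above.
val streams = new KafkaStreams(builder.build(), props)
streams.start()
sys.addShutdownHook(streams.close())
{code}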

First, the total number of assigned partitions over all processor machines is 
larger than 120 (sometimes 157, sometimes just 132), so the partition/task 
assignment seems to assign the same job to different stream threads.

The processor machines trying to re-join the consumer group fail constantly 
with the error message 'Detected a task that got migrated to another thread.' 
We gave the processor half an hour to recover; usually, rebuilding the 
KTable states takes around 20 seconds (with Kafka 0.11.0.1).

Here are the details of the errors we see:

stream-thread [kafka-processor-6-StreamThread-9] Detected a task that got 
migrated to another thread. This implies that this thread missed a rebalance 
and dropped out of the consumer group. Trying to rejoin the consumer group now.

{code}
org.apache.kafka.streams.errors.TaskMigratedException: Log end offset of entity-B-exists-0 should not change while restoring: old end offset 4750539, current offset 4751388
> StreamsTask taskId: 1_0
>   ProcessorTopology:
>     KSTREAM-SOURCE-08:
>       topics:    [entity-A-exists]
>       children:  [KTABLE-SOURCE-09]
>     KTABLE-SOURCE-09:
>       states:    [entity-A-exists-persisted]
>       children:  [KTABLE-JOINTHIS-11]
>     KTABLE-JOINTHIS-11:
>       states:    [entity-B-exists-persisted]
>       children:  [KTABLE-MERGE-10]
>     KTABLE-MERGE-10:
>       states:    [entity-A-joined-with-entity-B]
>     KSTREAM-SOURCE-03:
>       topics:    [entity-B-exists]
>       children:  [KTABLE-SOURCE-04]
>     KTABLE-SOURCE-04:
>       states:    [entity-B-exists-persisted]
>       children:  [KTABLE-JOINOTHER-12]
>     KTABLE-JOINOTHER-12:
>       states:    [entity-A-exists-persisted]
>       children:  [KTABLE-MERGE-10]
>     KTABLE-MERGE-10:
>       states:    [entity-A-joined-with-entity-B]
> Partitions [entity-A-exists-0, entity-B-exists-0]
{code}

[jira] [Created] (KAFKA-6269) KTable state restore fails after rebalance

2017-11-23 Thread Andreas Schroeder (JIRA)
Andreas Schroeder created KAFKA-6269:


 Summary: KTable state restore fails after rebalance
 Key: KAFKA-6269
 URL: https://issues.apache.org/jira/browse/KAFKA-6269
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 1.0.0
Reporter: Andreas Schroeder

