[jira] [Commented] (KAFKA-6267) Kafka Producer - initTransaction forever waiting

2017-11-23 Thread Silvio Papa (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265006#comment-16265006
 ] 

Silvio Papa commented on KAFKA-6267:


Thanks for the reply, Ismael.

After changing the broker, the issue seems resolved on Linux.
Do you have a suggestion for Mauro's issue?

> Kafka Producer - initTransaction forever waiting
> 
>
> Key: KAFKA-6267
> URL: https://issues.apache.org/jira/browse/KAFKA-6267
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 0.11.0.1, 0.11.0.2
>Reporter: Silvio Papa
> Attachments: controller.log, producer.JPG, server.log
>
>
> In the code in the attached image, the producer remains forever waiting in 
> initTransaction with the default configuration of the broker.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5413) Log cleaner fails due to large offset in segment file

2017-11-23 Thread Philippe Laflamme (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264945#comment-16264945
 ] 

Philippe Laflamme commented on KAFKA-5413:
--

We're hitting the same problem and would like to upgrade to a 0.10.x version to 
reduce the workload and, more importantly, risks. According to 
https://issues.apache.org/jira/projects/KAFKA/versions/12340570 there are 19 
bugs total, 2 of which are marked critical and 14 major.

As far as I can tell, there is no workaround for this problem. Is this enough 
reason to go through a release cycle? I'd like to help, but obviously I can't 
do the release itself. What are things that users like us can 
do to make a 0.10.2.2 release happen?

> Log cleaner fails due to large offset in segment file
> -
>
> Key: KAFKA-5413
> URL: https://issues.apache.org/jira/browse/KAFKA-5413
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.2.0, 0.10.2.1
> Environment: Ubuntu 14.04 LTS, Oracle Java 8u92, kafka_2.11-0.10.2.0
>Reporter: Nicholas Ngorok
>Assignee: Kelvin Rutt
>Priority: Critical
>  Labels: reliability
> Fix For: 0.10.2.2, 0.11.0.0
>
> Attachments: .index.cleaned, 
> .log, .log.cleaned, 
> .timeindex.cleaned, 002147422683.log, 
> kafka-5413.patch
>
>
> The log cleaner thread in our brokers is failing with the trace below
> {noformat}
> [2017-06-08 15:49:54,822] INFO {kafka-log-cleaner-thread-0} Cleaner 0: 
> Cleaning segment 0 in log __consumer_offsets-12 (largest timestamp Thu Jun 08 
> 15:48:59 PDT 2017) into 0, retaining deletes. (kafka.log.LogCleaner)
> [2017-06-08 15:49:54,822] INFO {kafka-log-cleaner-thread-0} Cleaner 0: 
> Cleaning segment 2147343575 in log __consumer_offsets-12 (largest timestamp 
> Thu Jun 08 15:49:06 PDT 2017) into 0, retaining deletes. 
> (kafka.log.LogCleaner)
> [2017-06-08 15:49:54,834] ERROR {kafka-log-cleaner-thread-0} 
> [kafka-log-cleaner-thread-0], Error due to  (kafka.log.LogCleaner)
> java.lang.IllegalArgumentException: requirement failed: largest offset in 
> message set can not be safely converted to relative offset.
> at scala.Predef$.require(Predef.scala:224)
> at kafka.log.LogSegment.append(LogSegment.scala:109)
> at kafka.log.Cleaner.cleanInto(LogCleaner.scala:478)
> at 
> kafka.log.Cleaner$$anonfun$cleanSegments$1.apply(LogCleaner.scala:405)
> at 
> kafka.log.Cleaner$$anonfun$cleanSegments$1.apply(LogCleaner.scala:401)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:401)
> at kafka.log.Cleaner$$anonfun$clean$4.apply(LogCleaner.scala:363)
> at kafka.log.Cleaner$$anonfun$clean$4.apply(LogCleaner.scala:362)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at kafka.log.Cleaner.clean(LogCleaner.scala:362)
> at 
> kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:241)
> at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:220)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2017-06-08 15:49:54,835] INFO {kafka-log-cleaner-thread-0} 
> [kafka-log-cleaner-thread-0], Stopped  (kafka.log.LogCleaner)
> {noformat}
> This seems to point at the specific line [here| 
> https://github.com/apache/kafka/blob/0.11.0/core/src/main/scala/kafka/log/LogSegment.scala#L92]
>  in the kafka src where the difference is actually larger than MAXINT as both 
> baseOffset and offset are of type long. It was introduced in this [pr| 
> https://github.com/apache/kafka/pull/2210/files/56d1f8196b77a47b176b7bbd1e4220a3be827631]
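> [Editorial illustration, not part of the original report: a minimal sketch of 
> that requirement, using the offsets from the segment dumps shown further down. 
> The helper name is made up; the actual check lives in {{LogSegment.append}}.]
> {code}
> // Simplified sketch of the relative-offset constraint (illustrative only, not the Kafka source)
> def canConvertToRelativeOffset(offset: Long, baseOffset: Long): Boolean =
>   (offset - baseOffset) >= 0 && (offset - baseOffset) <= Int.MaxValue
>
> canConvertToRelativeOffset(2147539884L, 2147343575L) // true:  196309 fits in an Int
> canConvertToRelativeOffset(2147539884L, 0L)          // false: 2147539884 > Int.MaxValue (2147483647)
> // Cleaning segment 2147343575 "into 0" (see the cleaner log above) forces the second case,
> // which triggers the "largest offset in message set can not be safely converted to
> // relative offset" failure.
> {code}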
> These were the outputs of dumping the first two log segments
> {noformat}
> :~$ /usr/bin/kafka-run-class kafka.tools.DumpLogSegments --deep-iteration 
> --files /kafka-logs/__consumer_offsets-12/000
> 0.log
> Dumping /kafka-logs/__consumer_offsets-12/.log
> Starting offset: 0
> offset: 1810054758 position: 0 NoTimestampType: -1 isvalid: true payloadsize: 
> -1 magic: 0 compresscodec: NONE crc: 3127861909 keysize: 34
> :~$ /usr/bin/kafka-run-class kafka.tools.DumpLogSegments --deep-iteration 
> --files /kafka-logs/__consumer_offsets-12/000
> 0002147343575.log
> Dumping /kafka-logs/__consumer_offsets-12/002147343575.log
> Starting offset: 2147343575
> offset: 2147539884 position: 0 NoTimestampType: -1 isvalid: true payloadsize: 
> -1 magic: 0 compresscodec: NONE crc: 2282192097 keysize: 34
> {noformat}
> My guess is that since 2147539884 is larger than MAXINT, we are hitting this 
> exception. Was there a specific reason this check was added in

[jira] [Comment Edited] (KAFKA-6269) KTable state restore fails after rebalance

2017-11-23 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264926#comment-16264926
 ] 

Matthias J. Sax edited comment on KAFKA-6269 at 11/24/17 5:14 AM:
--

Had a look into the code, and I think I understand the issue. The guard is not 
working as expected (which was my suspicion).

The only workaround I can think of atm would be to read the table source 
topics as {{KStream}} and do a dummy aggregation that only keeps the latest 
value to transform the {{KStream}} into a {{KTable}} -- this will create a 
proper changelog topic and avoid hitting this issue.

For your special case, as you write the topic with {{to()}}, you could also 
replace the {{to()}} with a {{groupByKey().aggregate()}} and you wouldn't need the 
{{entity-X-exists}} topics anymore. This should result in a no-overhead 
solution for you, as there will be no repartitioning topic since you only call a 
{{mapValues}} before the {{groupByKey().aggregate()}}.

It would be great if you could confirm whether this workaround works. Thx.
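
To make the suggested workaround concrete, here is a minimal, untested sketch in 
the style of the topology code quoted below in the issue description; the helper 
name, the store-name parameter, and the {{Serialized}}/{{Materialized}} wiring are 
illustrative assumptions based on the Kafka 1.0.0 API, not something taken from 
this ticket:

{code}
// Sketch: instead of `transformed.to(existsTopic)` followed by `builder.table(existsTopic, ...)`,
// run a dummy aggregation that always keeps the latest value. Streams then backs the store
// with its own changelog topic rather than re-using the source topic for restoration.
// Requires org.apache.kafka.streams.kstream.Serialized in addition to the imports already used.
def buildTableViaAggregation[V](builder: StreamsBuilder,
                                sourceTopic: String,
                                storeName: String,
                                valueSerde: Serde[V],
                                valueMapper: ValueMapper[String, V]): KTable[String, V] = {
  builder.stream[String, String](sourceTopic)
    .mapValues(valueMapper)
    .groupByKey(Serialized.`with`(Serdes.String(), valueSerde))
    .reduce(
      (_: V, newValue: V) => newValue, // keep only the latest value per key
      Materialized.as(Stores.inMemoryKeyValueStore(storeName))
        .withKeySerde(Serdes.String())
        .withValueSerde(valueSerde))
}
{code}

Because only {{mapValues}} runs before the grouping, the stream is still 
partitioned by the original key, so no repartition topic should be created.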


was (Author: mjsax):
Had a look into the code, and I think I understand the issue. The guard is not 
working as expected (which was my suspicion).

The only workaround I can think of atm would be to read the table source 
topics as {{KStream}} and do a dummy aggregation that only keeps the latest 
value to transform the {{KStream}} into a {{KTable}} -- this will create a 
proper changelog topic and avoid hitting this issue.

For your special case, as you write the topic with {{to()}}, you could also 
replace the {{to()}} with a {{groupByKey().aggregate()}} and you wouldn't need the 
{{entity-X-exists}} topics anymore. This should result in a no-overhead 
solution for you, as there will be no repartitioning topic since you only call a 
{{mapValues}} before the {{groupByKey().aggregate()}}.

> KTable state restore fails after rebalance
> --
>
> Key: KAFKA-6269
> URL: https://issues.apache.org/jira/browse/KAFKA-6269
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Andreas Schroeder
>Priority: Blocker
> Fix For: 1.1.0, 1.0.1
>
>
> I have the following kafka streams topology:
> entity-B -> map step -> entity-B-exists (with state store)
> entity-A   -> map step -> entity-A-exists (with state store)
> (entity-B-exists, entity-A-exists) -> outer join with state store.
> The topology building code looks like this (some data type, serde, 
> valuemapper, and joiner code omitted):
> {code}
> def buildTable[V](builder: StreamsBuilder,
>   sourceTopic: String,
>   existsTopic: String,
>   valueSerde: Serde[V],
>   valueMapper: ValueMapper[String, V]): 
> KTable[String, V] = {
>   val stream: KStream[String, String] = builder.stream[String, 
> String](sourceTopic)
>   val transformed: KStream[String, V] = stream.mapValues(valueMapper)
>   transformed.to(existsTopic, Produced.`with`(Serdes.String(), valueSerde))
>   val inMemoryStoreName = s"$existsTopic-persisted"
>   val materialized = 
> Materialized.as(Stores.inMemoryKeyValueStore(inMemoryStoreName))
>   .withKeySerde(Serdes.String())
>   .withValueSerde(valueSerde)
>   .withLoggingDisabled()
>   builder.table(existsTopic, materialized)
> }
> val builder = new StreamsBuilder
> val mapToEmptyString: ValueMapper[String, String] = (value: String) => if 
> (value != null) "" else null
> val entitiesB: KTable[String, EntityBInfo] =
>   buildTable(builder,
>  "entity-B",
>  "entity-B-exists",
>  EntityBInfoSerde,
>  ListingImagesToEntityBInfo)
> val entitiesA: KTable[String, String] =
>   buildTable(builder, "entity-A", "entity-A-exists", Serdes.String(), 
> mapToEmptyString)
> val joiner: ValueJoiner[String, EntityBInfo, EntityDiff] = (a, b) => 
> EntityDiff.fromJoin(a, b)
> val materialized = 
> Materialized.as(Stores.inMemoryKeyValueStore("entity-A-joined-with-entity-B"))
>   .withKeySerde(Serdes.String())
>   .withValueSerde(EntityDiffSerde)
>   .withLoggingEnabled(new java.util.HashMap[String, String]())
> val joined: KTable[String, EntityDiff] = entitiesA.outerJoin(entitiesB, 
> joiner, materialized)
> {code}
> We run 4 processor machines with 30 stream threads each; each topic has 30 
> partitions so that there is a total of 4 x 30 = 120 partitions to consume. 
> The initial launch of the processor works fine, but killing one processor and 
> letting it re-join leads to some faulty behaviour in the stream threads.
> First, the total number of assigned partitions over all processor machines is 
> larger than 120 (sometimes 157, sometimes just 132), so the partition / task 
> assignment seems to assign the same job to 

[jira] [Commented] (KAFKA-6269) KTable state restore fails after rebalance

2017-11-23 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264926#comment-16264926
 ] 

Matthias J. Sax commented on KAFKA-6269:


Had a look into the code, and I think I understand the issue. The guard is not 
working as expected (which was my suspicion).

The only workaround I can think of atm would be to read the table source 
topics as {{KStream}} and do a dummy aggregation that only keeps the latest 
value to transform the {{KStream}} into a {{KTable}} -- this will create a 
proper changelog topic and avoid hitting this issue.

For your special case, as you write the topic with {{to()}}, you could also 
replace the {{to()}} with a {{groupByKey().aggregate()}} and you wouldn't need the 
{{entity-X-exists}} topics anymore. This should result in a no-overhead 
solution for you, as there will be no repartitioning topic since you only call a 
{{mapValues}} before the {{groupByKey().aggregate()}}.

> KTable state restore fails after rebalance
> --
>
> Key: KAFKA-6269
> URL: https://issues.apache.org/jira/browse/KAFKA-6269
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Andreas Schroeder
>Priority: Blocker
> Fix For: 1.1.0, 1.0.1
>
>
> I have the following kafka streams topology:
> entity-B -> map step -> entity-B-exists (with state store)
> entity-A   -> map step -> entity-A-exists (with state store)
> (entity-B-exists, entity-A-exists) -> outer join with state store.
> The topology building code looks like this (some data type, serde, 
> valuemapper, and joiner code omitted):
> {code}
> def buildTable[V](builder: StreamsBuilder,
>   sourceTopic: String,
>   existsTopic: String,
>   valueSerde: Serde[V],
>   valueMapper: ValueMapper[String, V]): 
> KTable[String, V] = {
>   val stream: KStream[String, String] = builder.stream[String, 
> String](sourceTopic)
>   val transformed: KStream[String, V] = stream.mapValues(valueMapper)
>   transformed.to(existsTopic, Produced.`with`(Serdes.String(), valueSerde))
>   val inMemoryStoreName = s"$existsTopic-persisted"
>   val materialized = 
> Materialized.as(Stores.inMemoryKeyValueStore(inMemoryStoreName))
>   .withKeySerde(Serdes.String())
>   .withValueSerde(valueSerde)
>   .withLoggingDisabled()
>   builder.table(existsTopic, materialized)
> }
> val builder = new StreamsBuilder
> val mapToEmptyString: ValueMapper[String, String] = (value: String) => if 
> (value != null) "" else null
> val entitiesB: KTable[String, EntityBInfo] =
>   buildTable(builder,
>  "entity-B",
>  "entity-B-exists",
>  EntityBInfoSerde,
>  ListingImagesToEntityBInfo)
> val entitiesA: KTable[String, String] =
>   buildTable(builder, "entity-A", "entity-A-exists", Serdes.String(), 
> mapToEmptyString)
> val joiner: ValueJoiner[String, EntityBInfo, EntityDiff] = (a, b) => 
> EntityDiff.fromJoin(a, b)
> val materialized = 
> Materialized.as(Stores.inMemoryKeyValueStore("entity-A-joined-with-entity-B"))
>   .withKeySerde(Serdes.String())
>   .withValueSerde(EntityDiffSerde)
>   .withLoggingEnabled(new java.util.HashMap[String, String]())
> val joined: KTable[String, EntityDiff] = entitiesA.outerJoin(entitiesB, 
> joiner, materialized)
> {code}
> We run 4 processor machines with 30 stream threads each; each topic has 30 
> partitions so that there is a total of 4 x 30 = 120 partitions to consume. 
> The initial launch of the processor works fine, but killing one processor and 
> letting it re-join leads to some faulty behaviour in the stream threads.
> First, the total number of assigned partitions over all processor machines is 
> larger than 120 (sometimes 157, sometimes just 132), so the partition / task 
> assignment seems to assign the same job to different stream threads.
> The processor machines trying to re-join the consumer group fail constantly 
> with the error message of 'Detected a task that got migrated to another 
> thread.' We gave the processor half an hour to recover; usually, rebuilding 
> the KTable states takes around 20 seconds (with Kafka 0.11.0.1).
> Here are the details of the errors we see:
> stream-thread [kafka-processor-6-StreamThread-9] Detected a task that got 
> migrated to another thread. This implies that this thread missed a rebalance 
> and dropped out of the consumer group. Trying to rejoin the consumer group 
> now.
> {code}
> org.apache.kafka.streams.errors.TaskMigratedException: Log end offset of 
> entity-B-exists-0 should not change while restoring: old end offset 4750539, 
> current offset 4751388
> > StreamsTask taskId: 1_0
> > >   ProcessorTopology:
> > KSTREAM-SOURCE-08:
> >

[jira] [Updated] (KAFKA-6269) KTable state restore fails after rebalance

2017-11-23 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6269?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6269:
---
Priority: Blocker  (was: Major)

> KTable state restore fails after rebalance
> --
>
> Key: KAFKA-6269
> URL: https://issues.apache.org/jira/browse/KAFKA-6269
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Andreas Schroeder
>Priority: Blocker
> Fix For: 1.1.0, 1.0.1
>
>
> I have the following kafka streams topology:
> entity-B -> map step -> entity-B-exists (with state store)
> entity-A   -> map step -> entity-A-exists (with state store)
> (entity-B-exists, entity-A-exists) -> outer join with state store.
> The topology building code looks like this (some data type, serde, 
> valuemapper, and joiner code omitted):
> {code}
> def buildTable[V](builder: StreamsBuilder,
>   sourceTopic: String,
>   existsTopic: String,
>   valueSerde: Serde[V],
>   valueMapper: ValueMapper[String, V]): 
> KTable[String, V] = {
>   val stream: KStream[String, String] = builder.stream[String, 
> String](sourceTopic)
>   val transformed: KStream[String, V] = stream.mapValues(valueMapper)
>   transformed.to(existsTopic, Produced.`with`(Serdes.String(), valueSerde))
>   val inMemoryStoreName = s"$existsTopic-persisted"
>   val materialized = 
> Materialized.as(Stores.inMemoryKeyValueStore(inMemoryStoreName))
>   .withKeySerde(Serdes.String())
>   .withValueSerde(valueSerde)
>   .withLoggingDisabled()
>   builder.table(existsTopic, materialized)
> }
> val builder = new StreamsBuilder
> val mapToEmptyString: ValueMapper[String, String] = (value: String) => if 
> (value != null) "" else null
> val entitiesB: KTable[String, EntityBInfo] =
>   buildTable(builder,
>  "entity-B",
>  "entity-B-exists",
>  EntityBInfoSerde,
>  ListingImagesToEntityBInfo)
> val entitiesA: KTable[String, String] =
>   buildTable(builder, "entity-A", "entity-A-exists", Serdes.String(), 
> mapToEmptyString)
> val joiner: ValueJoiner[String, EntityBInfo, EntityDiff] = (a, b) => 
> EntityDiff.fromJoin(a, b)
> val materialized = 
> Materialized.as(Stores.inMemoryKeyValueStore("entity-A-joined-with-entity-B"))
>   .withKeySerde(Serdes.String())
>   .withValueSerde(EntityDiffSerde)
>   .withLoggingEnabled(new java.util.HashMap[String, String]())
> val joined: KTable[String, EntityDiff] = entitiesA.outerJoin(entitiesB, 
> joiner, materialized)
> {code}
> We run 4 processor machines with 30 stream threads each; each topic has 30 
> partitions so that there is a total of 4 x 30 = 120 partitions to consume. 
> The initial launch of the processor works fine, but killing one processor and 
> letting it re-join leads to some faulty behaviour in the stream threads.
> First, the total number of assigned partitions over all processor machines is 
> larger than 120 (sometimes 157, sometimes just 132), so the partition / task 
> assignment seems to assign the same job to different stream threads.
> The processor machines trying to re-join the consumer group fail constantly 
> with the error message of 'Detected a task that got migrated to another 
> thread.' We gave the processor half an hour to recover; usually, rebuilding 
> the KTable states takes around 20 seconds (with Kafka 0.11.0.1).
> Here are the details of the errors we see:
> stream-thread [kafka-processor-6-StreamThread-9] Detected a task that got 
> migrated to another thread. This implies that this thread missed a rebalance 
> and dropped out of the consumer group. Trying to rejoin the consumer group 
> now.
> {code}
> org.apache.kafka.streams.errors.TaskMigratedException: Log end offset of 
> entity-B-exists-0 should not change while restoring: old end offset 4750539, 
> current offset 4751388
> > StreamsTask taskId: 1_0
> > >   ProcessorTopology:
> > KSTREAM-SOURCE-08:
> > topics: [entity-A-exists]
> > children:   [KTABLE-SOURCE-09]
> > KTABLE-SOURCE-09:
> > states: [entity-A-exists-persisted]
> > children:   [KTABLE-JOINTHIS-11]
> > KTABLE-JOINTHIS-11:
> > states: [entity-B-exists-persisted]
> > children:   [KTABLE-MERGE-10]
> > KTABLE-MERGE-10:
> > states: [entity-A-joined-with-entity-B]
> > KSTREAM-SOURCE-03:
> > topics: [entity-B-exists]
> > children:   [KTABLE-SOURCE-04]
> > 

[jira] [Updated] (KAFKA-6269) KTable state restore fails after rebalance

2017-11-23 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6269?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6269:
---
Fix Version/s: 1.0.1
   1.1.0

> KTable state restore fails after rebalance
> --
>
> Key: KAFKA-6269
> URL: https://issues.apache.org/jira/browse/KAFKA-6269
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Andreas Schroeder
>Priority: Blocker
> Fix For: 1.1.0, 1.0.1
>
>
> I have the following kafka streams topology:
> entity-B -> map step -> entity-B-exists (with state store)
> entity-A   -> map step -> entity-A-exists (with state store)
> (entity-B-exists, entity-A-exists) -> outer join with state store.
> The topology building code looks like this (some data type, serde, 
> valuemapper, and joiner code omitted):
> {code}
> def buildTable[V](builder: StreamsBuilder,
>   sourceTopic: String,
>   existsTopic: String,
>   valueSerde: Serde[V],
>   valueMapper: ValueMapper[String, V]): 
> KTable[String, V] = {
>   val stream: KStream[String, String] = builder.stream[String, 
> String](sourceTopic)
>   val transformed: KStream[String, V] = stream.mapValues(valueMapper)
>   transformed.to(existsTopic, Produced.`with`(Serdes.String(), valueSerde))
>   val inMemoryStoreName = s"$existsTopic-persisted"
>   val materialized = 
> Materialized.as(Stores.inMemoryKeyValueStore(inMemoryStoreName))
>   .withKeySerde(Serdes.String())
>   .withValueSerde(valueSerde)
>   .withLoggingDisabled()
>   builder.table(existsTopic, materialized)
> }
> val builder = new StreamsBuilder
> val mapToEmptyString: ValueMapper[String, String] = (value: String) => if 
> (value != null) "" else null
> val entitiesB: KTable[String, EntityBInfo] =
>   buildTable(builder,
>  "entity-B",
>  "entity-B-exists",
>  EntityBInfoSerde,
>  ListingImagesToEntityBInfo)
> val entitiesA: KTable[String, String] =
>   buildTable(builder, "entity-A", "entity-A-exists", Serdes.String(), 
> mapToEmptyString)
> val joiner: ValueJoiner[String, EntityBInfo, EntityDiff] = (a, b) => 
> EntityDiff.fromJoin(a, b)
> val materialized = 
> Materialized.as(Stores.inMemoryKeyValueStore("entity-A-joined-with-entity-B"))
>   .withKeySerde(Serdes.String())
>   .withValueSerde(EntityDiffSerde)
>   .withLoggingEnabled(new java.util.HashMap[String, String]())
> val joined: KTable[String, EntityDiff] = entitiesA.outerJoin(entitiesB, 
> joiner, materialized)
> {code}
> We run 4 processor machines with 30 stream threads each; each topic has 30 
> partitions so that there is a total of 4 x 30 = 120 partitions to consume. 
> The initial launch of the processor works fine, but killing one processor and 
> letting it re-join leads to some faulty behaviour in the stream threads.
> First, the total number of assigned partitions over all processor machines is 
> larger than 120 (sometimes 157, sometimes just 132), so the partition / task 
> assignment seems to assign the same job to different stream threads.
> The processor machines trying to re-join the consumer group fail constantly 
> with the error message of 'Detected a task that got migrated to another 
> thread.' We gave the processor half an hour to recover; usually, rebuilding 
> the KTable states takes around 20 seconds (with Kafka 0.11.0.1).
> Here are the details of the errors we see:
> stream-thread [kafka-processor-6-StreamThread-9] Detected a task that got 
> migrated to another thread. This implies that this thread missed a rebalance 
> and dropped out of the consumer group. Trying to rejoin the consumer group 
> now.
> {code}
> org.apache.kafka.streams.errors.TaskMigratedException: Log end offset of 
> entity-B-exists-0 should not change while restoring: old end offset 4750539, 
> current offset 4751388
> > StreamsTask taskId: 1_0
> > >   ProcessorTopology:
> > KSTREAM-SOURCE-08:
> > topics: [entity-A-exists]
> > children:   [KTABLE-SOURCE-09]
> > KTABLE-SOURCE-09:
> > states: [entity-A-exists-persisted]
> > children:   [KTABLE-JOINTHIS-11]
> > KTABLE-JOINTHIS-11:
> > states: [entity-B-exists-persisted]
> > children:   [KTABLE-MERGE-10]
> > KTABLE-MERGE-10:
> > states: [entity-A-joined-with-entity-B]
> > KSTREAM-SOURCE-03:
> > topics: [entity-B-exists]
> > children:   [KTABLE-SOURCE-04]
> > 

[jira] [Commented] (KAFKA-6269) KTable state restore fails after rebalance

2017-11-23 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264921#comment-16264921
 ] 

Matthias J. Sax commented on KAFKA-6269:


Thanks for reporting this. If you get the exception {{Log end offset of 
entity-B-exists-0 should not change while restoring: old end offset 4750539, 
current offset 4751388}}, this means that a thread started to recover a state 
from a changelog topic and the endOffset moved during the process -- this 
should never happen (only if the task was migrated to another thread and this 
other thread writes into the changelog topic). If the state is not migrated, 
the thread that restores would be the only one that is "allowed" to write into 
the changelog; but as long as it restores, it does not write.

What I could think of is a bug that relates to an optimization we do: as you 
read KTables from source topics, there is no additional changelog topic, because 
the source topic can be used to recreate the state (the error message shows that 
the source topic is used for store recovery). Thus, an upstream processor that 
writes to the source topic can of course append data to this topic -- we 
actually have a guard for this case, but we changed some code here, maybe 
introducing a bug so that this guard no longer works properly: thus, the 
restoring thread "thinks" it is reading a changelog topic (while it actually 
reads a source topic) for recovery and fails even though it shouldn't. I'll try 
to reproduce this locally and circle back to you.

> KTable state restore fails after rebalance
> --
>
> Key: KAFKA-6269
> URL: https://issues.apache.org/jira/browse/KAFKA-6269
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Andreas Schroeder
> Fix For: 1.1.0, 1.0.1
>
>
> I have the following kafka streams topology:
> entity-B -> map step -> entity-B-exists (with state store)
> entity-A   -> map step -> entity-A-exists (with state store)
> (entity-B-exists, entity-A-exists) -> outer join with state store.
> The topology building code looks like this (some data type, serde, 
> valuemapper, and joiner code omitted):
> {code}
> def buildTable[V](builder: StreamsBuilder,
>   sourceTopic: String,
>   existsTopic: String,
>   valueSerde: Serde[V],
>   valueMapper: ValueMapper[String, V]): 
> KTable[String, V] = {
>   val stream: KStream[String, String] = builder.stream[String, 
> String](sourceTopic)
>   val transformed: KStream[String, V] = stream.mapValues(valueMapper)
>   transformed.to(existsTopic, Produced.`with`(Serdes.String(), valueSerde))
>   val inMemoryStoreName = s"$existsTopic-persisted"
>   val materialized = 
> Materialized.as(Stores.inMemoryKeyValueStore(inMemoryStoreName))
>   .withKeySerde(Serdes.String())
>   .withValueSerde(valueSerde)
>   .withLoggingDisabled()
>   builder.table(existsTopic, materialized)
> }
> val builder = new StreamsBuilder
> val mapToEmptyString: ValueMapper[String, String] = (value: String) => if 
> (value != null) "" else null
> val entitiesB: KTable[String, EntityBInfo] =
>   buildTable(builder,
>  "entity-B",
>  "entity-B-exists",
>  EntityBInfoSerde,
>  ListingImagesToEntityBInfo)
> val entitiesA: KTable[String, String] =
>   buildTable(builder, "entity-A", "entity-A-exists", Serdes.String(), 
> mapToEmptyString)
> val joiner: ValueJoiner[String, EntityBInfo, EntityDiff] = (a, b) => 
> EntityDiff.fromJoin(a, b)
> val materialized = 
> Materialized.as(Stores.inMemoryKeyValueStore("entity-A-joined-with-entity-B"))
>   .withKeySerde(Serdes.String())
>   .withValueSerde(EntityDiffSerde)
>   .withLoggingEnabled(new java.util.HashMap[String, String]())
> val joined: KTable[String, EntityDiff] = entitiesA.outerJoin(entitiesB, 
> joiner, materialized)
> {code}
> We run 4 processor machines with 30 stream threads each; each topic has 30 
> partitions so that there is a total of 4 x 30 = 120 partitions to consume. 
> The initial launch of the processor works fine, but killing one processor and 
> letting it re-join leads to some faulty behaviour in the stream threads.
> First, the total number of assigned partitions over all processor machines is 
> larger than 120 (sometimes 157, sometimes just 132), so the partition / task 
> assignment seems to assign the same job to different stream threads.
> The processor machines trying to re-join the consumer group fail constantly 
> with the error message of 'Detected a task that got migrated to another 
> thread.' We gave the processor half an hour to recover; usually, rebuilding 
> the KTable states takes around 20 seconds (with Kafka 0.11.0.1).
> Here are the details of the errors we see:
> stream-thread 

[jira] [Commented] (KAFKA-6248) Enable configuration of internal topics of Kafka Streams applications

2017-11-23 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264806#comment-16264806
 ] 

Matthias J. Sax commented on KAFKA-6248:


It is compatible: 
https://docs.confluent.io/current/streams/upgrade-guide.html#compatibility

If this does what you need, would you mind closing this as "not a problem"?

> Enable configuration of internal topics of Kafka Streams applications
> -
>
> Key: KAFKA-6248
> URL: https://issues.apache.org/jira/browse/KAFKA-6248
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Tim Van Laer
>Priority: Minor
>
> In the current implementation of Kafka Streams, it is not possible to set 
> custom configuration on internal topics (e.g. max.message.bytes, 
> retention.ms...). It would be nice if a developer could set some specific 
> configuration. 
> E.g. if you want to store messages bigger than 1MiB in a state store, you 
> have to alter the corresponding changelog topic with a max.message.bytes 
> setting. 
> The workaround is to create the 'internal' topics upfront using the correct 
> naming convention so Kafka Streams will use the explicitly defined topics as 
> if they are internal. 
> An alternative is to alter the internal topics after the Kafka Streams 
> application is started and has created its internal topics. 
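
As an illustration of the "create the internal topics upfront" workaround 
described above: a minimal, untested sketch (added here for illustration, not 
from the ticket) using the Java {{AdminClient}} from Scala, available since 
0.11. The application id, store name, partition/replica counts, and size are 
made-up examples; Kafka Streams names changelog topics 
{{<application.id>-<store name>-changelog}}, so a pre-created topic has to 
follow that convention:

{code}
import java.util.{Collections, Properties}

import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}

object PreCreateChangelogTopic extends App {
  val props = new Properties()
  props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  val admin = AdminClient.create(props)

  // Configs Streams would not set on its own; compaction matches what Streams uses for changelogs.
  val configs = new java.util.HashMap[String, String]()
  configs.put("cleanup.policy", "compact")
  configs.put("max.message.bytes", "2097152") // allow records up to 2 MiB

  // "<application.id>-<store name>-changelog", e.g. application.id = my-app, store = my-store
  val changelog = new NewTopic("my-app-my-store-changelog", 30, 3.toShort).configs(configs)
  admin.createTopics(Collections.singleton(changelog)).all().get()
  admin.close()
}
{code}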



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6266) Kafka 1.0.0 : Repeated occurrence of WARN Resetting first dirty offset of __consumer_offsets-xx to log start offset 203569 since the checkpointed offset 120955 is inval

2017-11-23 Thread Michal Klempa (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264728#comment-16264728
 ] 

Michal Klempa commented on KAFKA-6266:
--

I am experiencing this with Kafka 0.10.2.1 (Confluent Platform) on CentOS 6.

> Kafka 1.0.0 : Repeated occurrence of WARN Resetting first dirty offset of 
> __consumer_offsets-xx to log start offset 203569 since the checkpointed 
> offset 120955 is invalid. (kafka.log.LogCleanerManager$)
> --
>
> Key: KAFKA-6266
> URL: https://issues.apache.org/jira/browse/KAFKA-6266
> Project: Kafka
>  Issue Type: Bug
>  Components: offset manager
>Affects Versions: 1.0.0
> Environment: CentOS 7, Apache kafka_2.12-1.0.0
>Reporter: VinayKumar
>
> I upgraded Kafka from 0.10.2.1 to 1.0.0. Since then, I see the warnings below 
> in the log.
> They appear continuously, and I want them fixed so that they won't repeat. Can 
> someone please help me fix the warnings below?
> WARN Resetting first dirty offset of __consumer_offsets-17 to log start 
> offset 3346 since the checkpointed offset 3332 is invalid. 
> (kafka.log.LogCleanerManager$)
> WARN Resetting first dirty offset of __consumer_offsets-23 to log start 
> offset 4 since the checkpointed offset 1 is invalid. 
> (kafka.log.LogCleanerManager$)
> WARN Resetting first dirty offset of __consumer_offsets-19 to log start 
> offset 203569 since the checkpointed offset 120955 is invalid. 
> (kafka.log.LogCleanerManager$)
> WARN Resetting first dirty offset of __consumer_offsets-35 to log start 
> offset 16957 since the checkpointed offset 7 is invalid. 
> (kafka.log.LogCleanerManager$)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6266) Kafka 1.0.0 : Repeated occurrence of WARN Resetting first dirty offset of __consumer_offsets-xx to log start offset 203569 since the checkpointed offset 120955 is inval

2017-11-23 Thread VinayKumar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264653#comment-16264653
 ] 

VinayKumar commented on KAFKA-6266:
---

Thank you for the reply.
The errors don't go away; they keep repeating in the log continuously.

> Kafka 1.0.0 : Repeated occurrence of WARN Resetting first dirty offset of 
> __consumer_offsets-xx to log start offset 203569 since the checkpointed 
> offset 120955 is invalid. (kafka.log.LogCleanerManager$)
> --
>
> Key: KAFKA-6266
> URL: https://issues.apache.org/jira/browse/KAFKA-6266
> Project: Kafka
>  Issue Type: Bug
>  Components: offset manager
>Affects Versions: 1.0.0
> Environment: CentOS 7, Apache kafka_2.12-1.0.0
>Reporter: VinayKumar
>
> I upgraded Kafka from 0.10.2.1 to 1.0.0. Since then, I see the warnings below 
> in the log.
> They appear continuously, and I want them fixed so that they won't repeat. Can 
> someone please help me fix the warnings below?
> WARN Resetting first dirty offset of __consumer_offsets-17 to log start 
> offset 3346 since the checkpointed offset 3332 is invalid. 
> (kafka.log.LogCleanerManager$)
> WARN Resetting first dirty offset of __consumer_offsets-23 to log start 
> offset 4 since the checkpointed offset 1 is invalid. 
> (kafka.log.LogCleanerManager$)
> WARN Resetting first dirty offset of __consumer_offsets-19 to log start 
> offset 203569 since the checkpointed offset 120955 is invalid. 
> (kafka.log.LogCleanerManager$)
> WARN Resetting first dirty offset of __consumer_offsets-35 to log start 
> offset 16957 since the checkpointed offset 7 is invalid. 
> (kafka.log.LogCleanerManager$)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5647) Use async ZookeeperClient for Admin operations

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264621#comment-16264621
 ] 

ASF GitHub Bot commented on KAFKA-5647:
---

GitHub user omkreddy opened a pull request:

https://github.com/apache/kafka/pull/4260

KAFKA-5647: Use KafkaZkClient in ReassignPartitionsCommand and 
PreferredReplicaLeaderElectionCommand

*  Use KafkaZkClient in ReassignPartitionsCommand
*  Use KafkaZkClient in PreferredReplicaLeaderElectionCommand
*  Updated test classes to use new methods
*  All existing tests should pass

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation 
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/omkreddy/kafka KAFKA-5647-ADMINCOMMANDS

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/4260.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4260


commit b0dcf9fde0754f17e576186c47889b621b3b9769
Author: Manikumar Reddy 
Date:   2017-11-17T15:38:52Z

KAFKA-5647: Use KafkaZkClient in ReassignPartitionsCommand and 
PreferredReplicaLeaderElectionCommand




> Use async ZookeeperClient for Admin operations
> --
>
> Key: KAFKA-5647
> URL: https://issues.apache.org/jira/browse/KAFKA-5647
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ismael Juma
>Assignee: Manikumar
> Fix For: 1.1.0
>
>
> Since we will be removing the ZK dependency in most of the admin clients, we 
> only need to change the admin operations used on the server side. This 
> includes converting AdminManager and the remaining usage of zkUtils in 
> KafkaApis to use ZookeeperClient/KafkaZkClient. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (KAFKA-6267) Kafka Producer - initTransaction forever waiting

2017-11-23 Thread Mauro Giacometti (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264561#comment-16264561
 ] 

Mauro Giacometti edited comment on KAFKA-6267 at 11/23/17 4:46 PM:
---

Hi Ismael,
I'm facing the same issue on Linux. Let me describe the whole scenario. 
Silvio is my colleague and he's working with me on my project.

I'm trying to chain a JPA transaction with a Kafka transaction by using a 
ChainedTransactionManager provided by Spring. This choice deals with the need 
to implement a one-phase commit. We need to open a "unique transaction" 
(managed by the ChainedTransactionManager) which first opens a transaction to a 
Kafka broker, then a transaction to an Oracle DB, and a write operation is 
performed on the DB. A final commit on the DB is performed, immediately followed 
by a commit of the Kafka transaction. Every rollback in the JPA transaction 
will lead to a Kafka transaction rollback.
Moreover, we're using spring-integration-kafka and Apache Avro for 
manipulating/serializing/deserializing the messages we're publishing to the 
Kafka broker.

What we have now is:
0. Client rest service invocation for the creation of a product.
1. Kafka transaction is opened -> Message is delivered.
2. Data are persisted on the database
3. Data are correctly committed on database
4. Kafka transaction is not committed but is in an IN_TRANSITION state, and I 
don't know why.
5. A second call to the rest service immediately fails, as a new Kafka transaction 
is required but the old one is still open. So the abortTransaction method is 
invoked.

What we need to have is:
0. Client rest service invocation for the creation of a product.
1. Kafka transaction is opened -> New Product request Message is delivered.
2. Data are persisted on the database
3. Data are correctly committed on database
4. Kafka transaction committed and available for the consumers.


Here are my configurations for the factory, template, etc.:

  public PlatformTransactionManager transactionManager(
  EntityManagerFactory emf,
  DefaultKafkaProducerFactory producerFactory) {

final JpaTransactionManager jpaTransactionManager = new 
JpaTransactionManager();
jpaTransactionManager.setEntityManagerFactory(emf);
final KafkaTransactionManager kafkaTransactionManager = new 
KafkaTransactionManager<>(producerFactory);
kafkaTransactionManager.setFailEarlyOnGlobalRollbackOnly(true);
kafkaTransactionManager.setNestedTransactionAllowed(true);
kafkaTransactionManager.setValidateExistingTransaction(true);
kafkaTransactionManager.setRollbackOnCommitFailure(true);

kafkaTransactionManager.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ALWAYS);
   return new ChainedTransactionManager(new PlatformTransactionManager[] { 
jpaTransactionManager, kafkaTransactionManager });

  }

@Bean
  public DefaultKafkaProducerFactory producerFactory() {

final Map configMap = new HashMap<>();
configMap.put("bootstrap.servers", bootstrapServers);
configMap.put("key.serializer", keySerializer);
configMap.put("value.serializer", valueSerializer);
configMap.put("schema.registry.url", schemaRegistryEndpoint);
configMap.put("enable.idempotence", enableIdempotenceConfig );
configMap.put("transactional.id", transactionalIdConfig);
configMap.put("transaction.timeout.ms", transactionTimeoutMs);

final DefaultKafkaProducerFactory producerFactory = new 
DefaultKafkaProducerFactory(configMap);

producerFactory.setTransactionIdPrefix(transactionalIdConfig);
return producerFactory;
  }

 public KafkaTemplate templateCreationCustomer(DefaultKafkaProducerFactory 
producerFactory, RecordMessageConverter messageConverter) {

KafkaTemplate template = new KafkaTemplate<>(producerFactory);
template.setMessageConverter(messageConverter);
return template;
  }

Versions used:
spring-integration-kafka is 2.3.0
kafka-client is 0.11.0.0
kafka-avro-serializer is 3.3.0

EDIT//
spring-kafka is 1.3.0


was (Author: mauro.giacometti):
Hi Ismael,
I'm facing the same issue on Linux. Let me describe the whole scenario. 
Silvio is my colleague and he's working with me on my project.

I'm trying to chain a JPA transaction with a Kafka transaction by using a 
ChainedTransactionManager provided by Spring. This choice deals with the need 
to implement a one-phase commit. We need to open a "unique transaction" 
(managed by the ChainedTransactionManager) which first opens a transaction to a 
Kafka broker, then a transaction to an Oracle DB, and a write operation is 
performed on the DB. A final commit on the DB is performed, immediately followed 
by a commit of the Kafka transaction. Every rollback in the JPA transaction 
will lead to a Kafka transaction rollback.
Moreover, we're using spring-integration-kafka and Apache Avro for 

[jira] [Updated] (KAFKA-6267) Kafka Producer - initTransaction forever waiting

2017-11-23 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-6267:
---
Priority: Major  (was: Blocker)

> Kafka Producer - initTransaction forever waiting
> 
>
> Key: KAFKA-6267
> URL: https://issues.apache.org/jira/browse/KAFKA-6267
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 0.11.0.1, 0.11.0.2
>Reporter: Silvio Papa
> Attachments: controller.log, producer.JPG, server.log
>
>
> In the code in the attached image, the producer remains forever waiting in 
> initTransaction with the default configuration of the broker.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6267) Kafka Producer - initTransaction forever waiting

2017-11-23 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264580#comment-16264580
 ] 

Ismael Juma commented on KAFKA-6267:


The broker version is older than 0.11.0.0, so transactions won't work. I can 
see from the stacktraces that classes that no longer exist in 0.11.0.0 are being 
used.

> Kafka Producer - initTransaction forever waiting
> 
>
> Key: KAFKA-6267
> URL: https://issues.apache.org/jira/browse/KAFKA-6267
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 0.11.0.1, 0.11.0.2
>Reporter: Silvio Papa
>Priority: Blocker
> Attachments: controller.log, producer.JPG, server.log
>
>
> In the code in the attached image, the producer remains forever waiting in 
> initTransaction with the default configuration of the broker.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (KAFKA-6267) Kafka Producer - initTransaction forever waiting

2017-11-23 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264580#comment-16264580
 ] 

Ismael Juma edited comment on KAFKA-6267 at 11/23/17 4:43 PM:
--

The broker version is older than 0.11.0.0, so transactions won't work. I can 
see from the stacktraces that classes that no longer exist in 0.11.0.0 are 
being used.


was (Author: ijuma):
The broker version is older than 0.11.0.0, so transactions won't work. I can 
from the stacktraces that classes that no longer exist in 0.11.0.0 are being 
used.

> Kafka Producer - initTransaction forever waiting
> 
>
> Key: KAFKA-6267
> URL: https://issues.apache.org/jira/browse/KAFKA-6267
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 0.11.0.1, 0.11.0.2
>Reporter: Silvio Papa
> Attachments: controller.log, producer.JPG, server.log
>
>
> In the code in the attached image, the producer remains forever waiting in 
> initTransaction with the default configuration of the broker.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-6267) Kafka Producer - initTransaction forever waiting

2017-11-23 Thread Silvio Papa (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Silvio Papa updated KAFKA-6267:
---
Component/s: clients

> Kafka Producer - initTransaction forever waiting
> 
>
> Key: KAFKA-6267
> URL: https://issues.apache.org/jira/browse/KAFKA-6267
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 0.11.0.1, 0.11.0.2
>Reporter: Silvio Papa
>Priority: Blocker
> Attachments: controller.log, producer.JPG, server.log
>
>
> In the code in the attached image, the producer remains forever waiting in 
> initTransaction with the default configuration of the broker.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6267) Kafka Producer - initTransaction forever waiting

2017-11-23 Thread Silvio Papa (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264578#comment-16264578
 ] 

Silvio Papa commented on KAFKA-6267:


Done, i added server.log and controller.log

> Kafka Producer - initTransaction forever waiting
> 
>
> Key: KAFKA-6267
> URL: https://issues.apache.org/jira/browse/KAFKA-6267
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.11.0.1, 0.11.0.2
>Reporter: Silvio Papa
>Priority: Blocker
> Attachments: controller.log, producer.JPG, server.log
>
>
> In the code in the attached image, the producer remains forever waiting in 
> initTransaction with the default configuration of the broker.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-6267) Kafka Producer - initTransaction forever waiting

2017-11-23 Thread Silvio Papa (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Silvio Papa updated KAFKA-6267:
---
Attachment: server.log

> Kafka Producer - initTransaction forever waiting
> 
>
> Key: KAFKA-6267
> URL: https://issues.apache.org/jira/browse/KAFKA-6267
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.11.0.1, 0.11.0.2
>Reporter: Silvio Papa
>Priority: Blocker
> Attachments: controller.log, producer.JPG, server.log
>
>
> In the code in the attached image, the producer remains forever waiting in 
> initTransaction with the default configuration of the broker.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-6267) Kafka Producer - initTransaction forever waiting

2017-11-23 Thread Silvio Papa (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Silvio Papa updated KAFKA-6267:
---
Attachment: controller.log

> Kafka Producer - initTransaction forever waiting
> 
>
> Key: KAFKA-6267
> URL: https://issues.apache.org/jira/browse/KAFKA-6267
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.11.0.1, 0.11.0.2
>Reporter: Silvio Papa
>Priority: Blocker
> Attachments: controller.log, producer.JPG, server.log
>
>
> In the code in the attached image, the producer remains forever waiting in 
> initTransaction with the default configuration of the broker.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (KAFKA-6267) Kafka Producer - initTransaction forever waiting

2017-11-23 Thread Mauro Giacometti (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264561#comment-16264561
 ] 

Mauro Giacometti edited comment on KAFKA-6267 at 11/23/17 4:31 PM:
---

Hi Ismael,
I'm facing the same issue on Linux. Let me describe the whole scenario. 
Silvio is my colleague and he's working with me on my project.

I'm trying to chain a JPA transaction with a Kafka transaction by using a 
ChainedTransactionManager provided by Spring. This choice deals with the need 
to implement a one-phase commit. We need to open a "unique transaction" 
(managed by the ChainedTransactionManager) which first opens a transaction to a 
Kafka broker, then a transaction to an Oracle DB, and a write operation is 
performed on the DB. A final commit on the DB is performed, immediately followed 
by a commit of the Kafka transaction. Every rollback in the JPA transaction 
will lead to a Kafka transaction rollback.
Moreover, we're using spring-integration-kafka and Apache Avro for 
manipulating/serializing/deserializing the messages we're publishing to the 
Kafka broker.

What we have now is:
0. Client rest service invocation for the creation of a product.
1. Kafka transaction is opened -> Message is delivered.
2. Data are persisted on the database
3. Data are correctly committed on database
4. Kafka transaction is not committed but is in an IN_TRANSITION state, and I 
don't know why.
5. A second call to the rest service immediately fails, as a new Kafka transaction 
is required but the old one is still open. So the abortTransaction method is 
invoked.

What we need to have is:
0. Client rest service invocation for the creation of a product.
1. Kafka transaction is opened -> New Product request Message is delivered.
2. Data are persisted on the database
3. Data are correctly committed on database
4. Kafka transaction committed and available for the consumers.


Here are my configurations for the factory, template, etc.:

  public PlatformTransactionManager transactionManager(
  EntityManagerFactory emf,
  DefaultKafkaProducerFactory producerFactory) {

final JpaTransactionManager jpaTransactionManager = new 
JpaTransactionManager();
jpaTransactionManager.setEntityManagerFactory(emf);
final KafkaTransactionManager kafkaTransactionManager = new 
KafkaTransactionManager<>(producerFactory);
kafkaTransactionManager.setFailEarlyOnGlobalRollbackOnly(true);
kafkaTransactionManager.setNestedTransactionAllowed(true);
kafkaTransactionManager.setValidateExistingTransaction(true);
kafkaTransactionManager.setRollbackOnCommitFailure(true);

kafkaTransactionManager.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ALWAYS);
   return new ChainedTransactionManager(new PlatformTransactionManager[] { 
jpaTransactionManager, kafkaTransactionManager });

  }

@Bean
  public DefaultKafkaProducerFactory producerFactory() {

final Map configMap = new HashMap<>();
configMap.put("bootstrap.servers", bootstrapServers);
configMap.put("key.serializer", keySerializer);
configMap.put("value.serializer", valueSerializer);
configMap.put("schema.registry.url", schemaRegistryEndpoint);
configMap.put("enable.idempotence", enableIdempotenceConfig );
configMap.put("transactional.id", transactionalIdConfig);
configMap.put("transaction.timeout.ms", transactionTimeoutMs);

final DefaultKafkaProducerFactory producerFactory = new 
DefaultKafkaProducerFactory(configMap);

producerFactory.setTransactionIdPrefix(transactionalIdConfig);
return producerFactory;
  }

 public KafkaTemplate templateCreationCustomer(DefaultKafkaProducerFactory 
producerFactory, RecordMessageConverter messageConverter) {

KafkaTemplate template = new KafkaTemplate<>(producerFactory);
template.setMessageConverter(messageConverter);
return template;
  }

Versions used:
spring-integration-kafka is 2.3.0
kafka-client is 0.11.0.1
kafka-avro-serializer is 3.3.0

EDIT//
spring-kafka is 1.3.0


was (Author: mauro.giacometti):
Hi Ismael,
I'm facing the same issue on Linux. Let me describe the whole scenario. 
Silvio is my colleague and he's working with me on my project.

I'm trying to chain a JPA transaction with a Kafka transaction by using a 
ChainedTransactionManager provided by Spring. This choice deals with the need 
to implement a one-phase commit. We need to open a "unique transaction" 
(managed by the ChainedTransactionManager) which first opens a transaction to a 
Kafka broker, then a transaction to an Oracle DB, and a write operation is 
performed on the DB. A final commit on the DB is performed, immediately followed 
by a commit of the Kafka transaction. Every rollback in the JPA transaction 
will lead to a Kafka transaction rollback.
Moreover, we're using spring-integration-kafka and Apache Avro for 

[jira] [Comment Edited] (KAFKA-6267) Kafka Producer - initTransaction forever waiting

2017-11-23 Thread Mauro Giacometti (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264561#comment-16264561
 ] 

Mauro Giacometti edited comment on KAFKA-6267 at 11/23/17 4:28 PM:
---

Hi Ismael,
I'm facing the same issue on Linux. Let me describe the whole scenario. 
Silvio is my colleague and he's working with me on my project.

I'm trying to chain a JPA transaction with a Kafka transaction by using a 
ChainedTransactionManager provided by Spring. This choice deals with the need 
to implement a one-phase commit. We need to open a "unique transaction" 
(managed by the ChainedTransactionManager) which first opens a transaction to a 
Kafka broker, then a transaction to an Oracle DB, and a write operation is 
performed on the DB. A final commit on the DB is performed, immediately followed 
by a commit of the Kafka transaction. Every rollback in the JPA transaction 
will lead to a Kafka transaction rollback.
Moreover, we're using spring-integration-kafka and Apache Avro for 
manipulating/serializing/deserializing the messages we're publishing to the 
Kafka broker.

What we have now is:
0. Client rest service invocation for the creation of a product.
1. Kafka transaction is opened -> Message is delivered.
2. Data are persisted on the database
3. Data are correctly committed on database
4. Kafka transaction is not committed but is in an IN_TRANSITION state, and I 
don't know why.
5. A second call to the rest service immediately fails, as a new Kafka transaction 
is required but the old one is still open. So the abortTransaction method is 
invoked.

What we need to have is:
0. Client rest service invocation for the creation of a product.
1. Kafka transaction is opened -> New Product request Message is delivered.
2. Data are persisted on the database
3. Data are correctly committed on database
4. Kafka transaction committed and available for the consumers.


Here are my configurations for the factory, template, etc.:

  public PlatformTransactionManager transactionManager(
  EntityManagerFactory emf,
  DefaultKafkaProducerFactory producerFactory) {

final JpaTransactionManager jpaTransactionManager = new 
JpaTransactionManager();
jpaTransactionManager.setEntityManagerFactory(emf);
final KafkaTransactionManager kafkaTransactionManager = new 
KafkaTransactionManager<>(producerFactory);
kafkaTransactionManager.setFailEarlyOnGlobalRollbackOnly(true);
kafkaTransactionManager.setNestedTransactionAllowed(true);
kafkaTransactionManager.setValidateExistingTransaction(true);
kafkaTransactionManager.setRollbackOnCommitFailure(true);

kafkaTransactionManager.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ALWAYS);
   return new ChainedTransactionManager(new PlatformTransactionManager[] { 
jpaTransactionManager, kafkaTransactionManager });

  }

@Bean
  public DefaultKafkaProducerFactory producerFactory() {

final Map configMap = new HashMap<>();
configMap.put("bootstrap.servers", bootstrapServers);
configMap.put("key.serializer", keySerializer);
configMap.put("value.serializer", valueSerializer);
configMap.put("schema.registry.url", schemaRegistryEndpoint);
configMap.put("enable.idempotence", enableIdempotenceConfig );
configMap.put("transactional.id", transactionalIdConfig);
configMap.put("transaction.timeout.ms", transactionTimeoutMs);

final DefaultKafkaProducerFactory producerFactory = new 
DefaultKafkaProducerFactory(configMap);

producerFactory.setTransactionIdPrefix(transactionalIdConfig);
return producerFactory;
  }

 public KafkaTemplate templateCreationCustomer(DefaultKafkaProducerFactory 
producerFactory, RecordMessageConverter messageConverter) {

KafkaTemplate template = new KafkaTemplate<>(producerFactory);
template.setMessageConverter(messageConverter);
return template;
  }

Versions used:
spring-integration-kafka is 2.3.0
kafka-client is 0.11.0.1
kafka-avro-serializer is 3.3.0


was (Author: mauro.giacometti):
Hi Ismael,
I'm facing the same issue on Linux. Let me describe the whole scenario. 
Silvio is my colleague and he's working with me on my project.

I'm trying to chain a JPA transaction with a Kafka transaction by using a 
ChainedTransactionManager provided by Spring. This choice deals with the need 
to implement a one-phase commit. We need to open a "unique transaction" 
(managed by the ChainedTransactionManager) which first opens a transaction to a 
Kafka broker, then a transaction to an Oracle DB, and a write operation is 
performed on the DB. A final commit on the DB is performed, immediately followed 
by a commit of the Kafka transaction. Every rollback in the JPA transaction 
will lead to a Kafka transaction rollback.
Moreover, we're using spring-integration-kafka and Apache Avro for 
manipulating/serializing/deserializing the messages we're 

[jira] [Commented] (KAFKA-6267) Kafka Producer - initTransaction forever waiting

2017-11-23 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264568#comment-16264568
 ] 

Ismael Juma commented on KAFKA-6267:


Please include full broker and client logs. The error you pasted seems to 
indicate a versioning issue.

> Kafka Producer - initTransaction forever waiting
> 
>
> Key: KAFKA-6267
> URL: https://issues.apache.org/jira/browse/KAFKA-6267
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.11.0.1, 0.11.0.2
>Reporter: Silvio Papa
>Priority: Blocker
> Attachments: producer.JPG
>
>
> In code of attached image, the producer remains forever awaiting in 
> initTransaction with default configuration of broker



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6267) Kafka Producer - initTransaction forever waiting

2017-11-23 Thread Mauro Giacometti (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264561#comment-16264561
 ] 

Mauro Giacometti commented on KAFKA-6267:
-

Hi Ismael,
I'm facing the same issue on Linux. Let me describe the whole scenario. 
Silvio is my colleague and he's working with me on my project.

I'm trying to chain a JPA transaction with a Kafka transaction by using a 
ChainedTransactionManager provided by Spring. This choice deals with the need 
to implement a one-phase commit. We need to open a "unique transaction" 
(managed by the ChainedTransactionManager) which opens first a transaction to a 
Kafka broker, then a transaction to an Oracle DB; a write operation is 
performed on the DB. A final commit on the DB is performed, immediately followed by 
a commit of the Kafka transaction. Every rollback in the JPA transaction will 
lead to a Kafka transaction rollback.
Moreover, we're using spring-integration-kafka and Apache Avro for 
manipulating/serializing/deserializing the messages we're publishing on the 
Kafka broker.

What we have now is:
0. Client rest service invocation for the creation of a product.
1. Kafka transaction is opened -> Message is delivered.
2. Data are persisted on the database
3. Data are correctly committed on database
4. Kafka transaction is not committed but stays in an IN_TRANSITION state, and I 
don't know why
5. A second call to the rest service immediately fails, as a new Kafka transaction 
is required but the old one is still open, so the abortTransaction method is 
invoked.

What we need to have is:
0. Client rest service invocation for the creation of a product.
1. Kafka transaction is opened -> New Product request Message is delivered.
2. Data are persisted on the database
3. Data are correctly committed on database
4. Kafka transaction committed and available for the consumers.


Here is my configuration for the factory, template, etc.:

  public PlatformTransactionManager transactionManager(
      EntityManagerFactory emf,
      DefaultKafkaProducerFactory producerFactory) {

    final JpaTransactionManager jpaTransactionManager = new JpaTransactionManager();
    jpaTransactionManager.setEntityManagerFactory(emf);

    final KafkaTransactionManager kafkaTransactionManager =
        new KafkaTransactionManager<>(producerFactory);
    kafkaTransactionManager.setFailEarlyOnGlobalRollbackOnly(true);
    kafkaTransactionManager.setNestedTransactionAllowed(true);
    kafkaTransactionManager.setValidateExistingTransaction(true);
    kafkaTransactionManager.setRollbackOnCommitFailure(true);
    kafkaTransactionManager.setTransactionSynchronization(
        AbstractPlatformTransactionManager.SYNCHRONIZATION_ALWAYS);

    return new ChainedTransactionManager(new PlatformTransactionManager[] {
        jpaTransactionManager, kafkaTransactionManager });
  }

  @Bean
  public DefaultKafkaProducerFactory producerFactory() {

    final Map configMap = new HashMap<>();
    configMap.put("bootstrap.servers", bootstrapServers);
    configMap.put("key.serializer", keySerializer);
    configMap.put("value.serializer", valueSerializer);
    configMap.put("schema.registry.url", schemaRegistryEndpoint);
    configMap.put("enable.idempotence", enableIdempotenceConfig);
    configMap.put("transactional.id", transactionalIdConfig);
    configMap.put("transaction.timeout.ms", transactionTimeoutMs);

    final DefaultKafkaProducerFactory producerFactory =
        new DefaultKafkaProducerFactory(configMap);
    producerFactory.setTransactionIdPrefix(transactionalIdConfig);
    return producerFactory;
  }

  public KafkaTemplate templateCreationCustomer(DefaultKafkaProducerFactory producerFactory,
      RecordMessageConverter messageConverter) {

    KafkaTemplate template = new KafkaTemplate<>(producerFactory);
    template.setMessageConverter(messageConverter);
    return template;
  }
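
A minimal sketch of a service method that could drive the chained configuration 
above (ProductService, ProductRepository, ProductEntity and the topic name are 
illustrative assumptions, not taken from the project):

{code}
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class ProductService {

    private final KafkaTemplate<String, Object> template;
    private final ProductRepository repository; // illustrative Spring Data JPA repository

    public ProductService(KafkaTemplate<String, Object> template,
                          ProductRepository repository) {
        this.template = template;
        this.repository = repository;
    }

    // Joins the chained JPA + Kafka transaction started by the
    // ChainedTransactionManager configured above.
    @Transactional
    public void createProduct(String id, ProductEntity product) {
        // 1. Publish inside the Kafka transaction opened by the chained manager
        //    ("product-creation-requests" is a placeholder topic name).
        template.send("product-creation-requests", id, product);
        // 2. Persist inside the JPA transaction.
        repository.save(product);
        // 3. On return, the chained manager commits both transactions;
        //    any exception thrown here rolls back both.
    }
}
{code}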

Versions used:
spring-integration-kafka is 2.3.0
kafka-client is 0.11.0.1
kafka-avro-serializer is 3.3.0

> Kafka Producer - initTransaction forever waiting
> 
>
> Key: KAFKA-6267
> URL: https://issues.apache.org/jira/browse/KAFKA-6267
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.11.0.1, 0.11.0.2
>Reporter: Silvio Papa
>Priority: Blocker
> Attachments: producer.JPG
>
>
> In code of attached image, the producer remains forever awaiting in 
> initTransaction with default configuration of broker



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (KAFKA-6267) Kafka Producer - initTransaction forever waiting

2017-11-23 Thread Silvio Papa (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264550#comment-16264550
 ] 

Silvio Papa edited comment on KAFKA-6267 at 11/23/17 4:18 PM:
--

On Lubuntu I obtain the following log:

ERROR Closing socket for /127.0.0.1 because of error (kafka.network.Processor)
kafka.common.KafkaException: Wrong request type 18

and the topic obviously isn't created.


was (Author: papas):
On Lubuntu I obtain the following log:

ERROR Closing socket for /127.0.0.1 because of error (kafka.network.Processor)
kafka.common.KafkaException: Wrong request type 18

and the topic isn't created.

> Kafka Producer - initTransaction forever waiting
> 
>
> Key: KAFKA-6267
> URL: https://issues.apache.org/jira/browse/KAFKA-6267
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.11.0.1, 0.11.0.2
>Reporter: Silvio Papa
>Priority: Blocker
> Attachments: producer.JPG
>
>
> In code of attached image, the producer remains forever awaiting in 
> initTransaction with default configuration of broker



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6267) Kafka Producer - initTransaction forever waiting

2017-11-23 Thread Silvio Papa (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264550#comment-16264550
 ] 

Silvio Papa commented on KAFKA-6267:


On Lubuntu I obtain the following log:

ERROR Closing socket for /127.0.0.1 because of error (kafka.network.Processor)
kafka.common.KafkaException: Wrong request type 18

and the topic isn't created.

> Kafka Producer - initTransaction forever waiting
> 
>
> Key: KAFKA-6267
> URL: https://issues.apache.org/jira/browse/KAFKA-6267
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.11.0.1, 0.11.0.2
>Reporter: Silvio Papa
>Priority: Blocker
> Attachments: producer.JPG
>
>
> In code of attached image, the producer remains forever awaiting in 
> initTransaction with default configuration of broker



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-6269) KTable state restore fails after rebalance

2017-11-23 Thread Andreas Schroeder (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6269?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andreas Schroeder updated KAFKA-6269:
-
Description: 
I have the following kafka streams topology:

entity-B -> map step -> entity-B-exists (with state store)
entity-A   -> map step -> entity-A-exists (with state store)

(entity-B-exists, entity-A-exists) -> outer join with state store.

The topology building code looks like this (some data type, serde, valuemapper, 
and joiner code omitted):

{code}
def buildTable[V](builder: StreamsBuilder,
  sourceTopic: String,
  existsTopic: String,
  valueSerde: Serde[V],
  valueMapper: ValueMapper[String, V]): KTable[String, 
V] = {

  val stream: KStream[String, String] = builder.stream[String, 
String](sourceTopic)
  val transformed: KStream[String, V] = stream.mapValues(valueMapper)
  transformed.to(existsTopic, Produced.`with`(Serdes.String(), valueSerde))

  val inMemoryStoreName = s"$existsTopic-persisted"

  val materialized = 
Materialized.as(Stores.inMemoryKeyValueStore(inMemoryStoreName))
  .withKeySerde(Serdes.String())
  .withValueSerde(valueSerde)
  .withLoggingDisabled()

  builder.table(existsTopic, materialized)
}


val builder = new StreamsBuilder
val mapToEmptyString: ValueMapper[String, String] = (value: String) => if 
(value != null) "" else null

val entitiesB: KTable[String, EntityBInfo] =
  buildTable(builder,
 "entity-B",
 "entity-B-exists",
 EntityBInfoSerde,
 ListingImagesToEntityBInfo)

val entitiesA: KTable[String, String] =
  buildTable(builder, "entity-A", "entity-A-exists", Serdes.String(), 
mapToEmptyString)

val joiner: ValueJoiner[String, EntityBInfo, EntityDiff] = (a, b) => 
EntityDiff.fromJoin(a, b)

val materialized = 
Materialized.as(Stores.inMemoryKeyValueStore("entity-A-joined-with-entity-B"))
  .withKeySerde(Serdes.String())
  .withValueSerde(EntityDiffSerde)
  .withLoggingEnabled(new java.util.HashMap[String, String]())

val joined: KTable[String, EntityDiff] = entitiesA.outerJoin(entitiesB, joiner, 
materialized)
{code}

We run 4 processor machines with 30 stream threads each; each topic has 30 
partitions, so there is a total of 4 x 30 = 120 partitions to consume. The 
initial launch of the processor works fine, but killing one processor and 
letting it re-join leads to some faulty behaviour in the stream threads.

First, the total number of assigned partitions over all processor machines is 
larger than 120 (sometimes 157, sometimes just 132), so the partition / task 
assignment seems to assign the same job to different stream threads.

The processor machines trying to re-join the consumer group fail constantly 
with the error message 'Detected a task that got migrated to another 
thread.' We gave the processor half an hour to recover; usually, rebuilding the 
KTable states takes around 20 seconds (with Kafka 0.11.0.1).

Here are the details of the errors we see:

stream-thread [kafka-processor-6-StreamThread-9] Detected a task that got 
migrated to another thread. This implies that this thread missed a rebalance 
and dropped out of the consumer group. Trying to rejoin the consumer group now.

{code}
org.apache.kafka.streams.errors.TaskMigratedException: Log end offset of 
entity-B-exists-0 should not change while restoring: old end offset 4750539, 
current offset 4751388
> StreamsTask taskId: 1_0
> > ProcessorTopology:
>   KSTREAM-SOURCE-08:
>   topics: [entity-A-exists]
>   children:   [KTABLE-SOURCE-09]
>   KTABLE-SOURCE-09:
>   states: [entity-A-exists-persisted]
>   children:   [KTABLE-JOINTHIS-11]
>   KTABLE-JOINTHIS-11:
>   states: [entity-B-exists-persisted]
>   children:   [KTABLE-MERGE-10]
>   KTABLE-MERGE-10:
>   states: [entity-A-joined-with-entity-B]
>   KSTREAM-SOURCE-03:
>   topics: [entity-B-exists]
>   children:   [KTABLE-SOURCE-04]
>   KTABLE-SOURCE-04:
>   states: [entity-B-exists-persisted]
>   children:   [KTABLE-JOINOTHER-12]
>   KTABLE-JOINOTHER-12:
>   states: [entity-A-exists-persisted]
>   children:   [KTABLE-MERGE-10]
>   KTABLE-MERGE-10:
>   states: [entity-A-joined-with-entity-B]
> Partitions [entity-A-exists-0, entity-B-exists-0]

at 

[jira] [Commented] (KAFKA-6267) Kafka Producer - initTransaction forever waiting

2017-11-23 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264500#comment-16264500
 ] 

Ismael Juma commented on KAFKA-6267:


There are known issues affecting Windows. Are you saying that you've seen the 
same issue when running on Linux?

> Kafka Producer - initTransaction forever waiting
> 
>
> Key: KAFKA-6267
> URL: https://issues.apache.org/jira/browse/KAFKA-6267
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.11.0.1, 0.11.0.2
>Reporter: Silvio Papa
>Priority: Blocker
> Attachments: producer.JPG
>
>
> In code of attached image, the producer remains forever awaiting in 
> initTransaction with default configuration of broker



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6269) KTable state restore fails after rebalance

2017-11-23 Thread Andreas Schroeder (JIRA)
Andreas Schroeder created KAFKA-6269:


 Summary: KTable state restore fails after rebalance
 Key: KAFKA-6269
 URL: https://issues.apache.org/jira/browse/KAFKA-6269
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 1.0.0
Reporter: Andreas Schroeder


I have the following kafka streams topology:

entity-B -> map step -> entity-B-exists (with state store)
entity-A   -> map step -> entity-A-exists (with state store)

(entity-B-exists, entity-A-exists) -> outer join with state store.

The topology building code looks like this (some data type, serde, valuemapper, 
and joiner code omitted):

def buildTable[V](builder: StreamsBuilder,
  sourceTopic: String,
  existsTopic: String,
  valueSerde: Serde[V],
  valueMapper: ValueMapper[String, V]): KTable[String, 
V] = {

  val stream: KStream[String, String] = builder.stream[String, 
String](sourceTopic)
  val transformed: KStream[String, V] = stream.mapValues(valueMapper)
  transformed.to(existsTopic, Produced.`with`(Serdes.String(), valueSerde))

  val inMemoryStoreName = s"$existsTopic-persisted"

  val materialized = 
Materialized.as(Stores.inMemoryKeyValueStore(inMemoryStoreName))
  .withKeySerde(Serdes.String())
  .withValueSerde(valueSerde)
  .withLoggingDisabled()

  builder.table(existsTopic, materialized)
}


val builder = new StreamsBuilder
val mapToEmptyString: ValueMapper[String, String] = (value: String) => if 
(value != null) "" else null

val entitiesB: KTable[String, EntityBInfo] =
  buildTable(builder,
 "entity-B",
 "entity-B-exists",
 EntityBInfoSerde,
 ListingImagesToEntityBInfo)

val entitiesA: KTable[String, String] =
  buildTable(builder, "entity-A", "entity-A-exists", Serdes.String(), 
mapToEmptyString)

val joiner: ValueJoiner[String, EntityBInfo, EntityDiff] = (a, b) => 
EntityDiff.fromJoin(a, b)

val materialized = 
Materialized.as(Stores.inMemoryKeyValueStore("entity-A-joined-with-entity-B"))
  .withKeySerde(Serdes.String())
  .withValueSerde(EntityDiffSerde)
  .withLoggingEnabled(new java.util.HashMap[String, String]())

val joined: KTable[String, EntityDiff] = entitiesA.outerJoin(entitiesB, joiner, 
materialized)


We run 4 processor machines with 30 stream threads each; each topic has 30 
partitions, so there is a total of 4 x 30 = 120 partitions to consume. The 
initial launch of the processor works fine, but killing one processor and 
letting it re-join leads to some faulty behaviour in the stream threads.

First, the total number of assigned partitions over all processor machines is 
larger than 120 (sometimes 157, sometimes just 132), so the partition / task 
assignment seems to assign the same job to different stream threads.

The processor machines trying to re-join the consumer group fail constantly 
with the error message 'Detected a task that got migrated to another thread.'
We gave the processor half an hour to recover; usually, rebuilding the 
KTable states takes around 20 seconds (with Kafka 0.11.0.1).

Here are the details of the errors we see:

stream-thread [kafka-processor-6-StreamThread-9] Detected a task that got 
migrated to another thread. This implies that this thread missed a rebalance 
and dropped out of the consumer group. Trying to rejoin the consumer group now.

org.apache.kafka.streams.errors.TaskMigratedException: Log end offset of 
entity-B-exists-0 should not change while restoring: old end offset 4750539, 
current offset 4751388
> StreamsTask taskId: 1_0
> > ProcessorTopology:
>   KSTREAM-SOURCE-08:
>   topics: [entity-A-exists]
>   children:   [KTABLE-SOURCE-09]
>   KTABLE-SOURCE-09:
>   states: [entity-A-exists-persisted]
>   children:   [KTABLE-JOINTHIS-11]
>   KTABLE-JOINTHIS-11:
>   states: [entity-B-exists-persisted]
>   children:   [KTABLE-MERGE-10]
>   KTABLE-MERGE-10:
>   states: [entity-A-joined-with-entity-B]
>   KSTREAM-SOURCE-03:
>   topics: [entity-B-exists]
>   children:   [KTABLE-SOURCE-04]
>   KTABLE-SOURCE-04:
>   states: [entity-B-exists-persisted]
>   children:   [KTABLE-JOINOTHER-12]
>   KTABLE-JOINOTHER-12:
>   states: [entity-A-exists-persisted]
>   children:   [KTABLE-MERGE-10]
>   KTABLE-MERGE-10:
>   states: 

[jira] [Commented] (KAFKA-6267) Kafka Producer - initTransaction forever waiting

2017-11-23 Thread Silvio Papa (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264495#comment-16264495
 ] 

Silvio Papa commented on KAFKA-6267:


- Windows 10 Pro 64bit
- Lubuntu 32bit 

> Kafka Producer - initTransaction forever waiting
> 
>
> Key: KAFKA-6267
> URL: https://issues.apache.org/jira/browse/KAFKA-6267
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.11.0.1, 0.11.0.2
>Reporter: Silvio Papa
>Priority: Blocker
> Attachments: producer.JPG
>
>
> In code of attached image, the producer remains forever awaiting in 
> initTransaction with default configuration of broker



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6267) Kafka Producer - initTransaction forever waiting

2017-11-23 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264493#comment-16264493
 ] 

Ismael Juma commented on KAFKA-6267:


What is the operating system that the broker is running on?

> Kafka Producer - initTransaction forever waiting
> 
>
> Key: KAFKA-6267
> URL: https://issues.apache.org/jira/browse/KAFKA-6267
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.11.0.1, 0.11.0.2
>Reporter: Silvio Papa
>Priority: Blocker
> Attachments: producer.JPG
>
>
> In code of attached image, the producer remains forever awaiting in 
> initTransaction with default configuration of broker



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6267) Kafka Producer - initTransaction forever waiting

2017-11-23 Thread Silvio Papa (JIRA)
Silvio Papa created KAFKA-6267:
--

 Summary: Kafka Producer - initTransaction forever waiting
 Key: KAFKA-6267
 URL: https://issues.apache.org/jira/browse/KAFKA-6267
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 0.11.0.2, 0.11.0.1
Reporter: Silvio Papa
Priority: Blocker
 Attachments: producer.JPG

In code of attached image, the producer remains forever awaiting in 
initTransaction with default configuration of broker



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6238) Issues with protocol version when applying a rolling upgrade to 1.0.0

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6238?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264421#comment-16264421
 ] 

ASF GitHub Bot commented on KAFKA-6238:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/4256


> Issues with protocol version when applying a rolling upgrade to 1.0.0
> -
>
> Key: KAFKA-6238
> URL: https://issues.apache.org/jira/browse/KAFKA-6238
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation
>Affects Versions: 1.0.0
>Reporter: Diego Louzán
>Assignee: Jason Gustafson
>
> Hello,
> I am trying to perform a rolling upgrade from 0.10.0.1 to 1.0.0, and 
> according to the instructions in the documentation, I should only have to 
> upgrade the "inter.broker.protocol.version" parameter in the first step. But 
> after setting the value to "0.10.0" or "0.10.0.1" (tried both), the broker 
> refuses to start with the following error:
> {code}
> [2017-11-20 08:28:46,620] FATAL  (kafka.Kafka$)
> java.lang.IllegalArgumentException: requirement failed: 
> log.message.format.version 1.0-IV0 cannot be used when 
> inter.broker.protocol.version is set to 0.10.0.1
> at scala.Predef$.require(Predef.scala:224)
> at kafka.server.KafkaConfig.validateValues(KafkaConfig.scala:1205)
> at kafka.server.KafkaConfig.(KafkaConfig.scala:1170)
> at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:881)
> at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:878)
> at 
> kafka.server.KafkaServerStartable$.fromProps(KafkaServerStartable.scala:28)
> at kafka.Kafka$.main(Kafka.scala:82)
> at kafka.Kafka.main(Kafka.scala)
> {code}
> I checked the instructions for rolling upgrades to previous versions (namely 
> 0.11.0.0), and there it's stated that it is also necessary to upgrade the 
> "log.message.format.version" parameter in two stages. I have tried that and 
> the upgrade worked. It seems it still applies to version 1.0.0, so I'm not 
> sure if this is wrong documentation or an actual issue with Kafka, since it 
> should work as stated in the docs.
> Regards,
> Diego Louzán
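
For reference, a sketch of the two-stage broker configuration this implies for an 
upgrade from 0.10.0.x to 1.0.0 (the version strings here are illustrative; the 
exact values should follow the upgrade notes for the actual starting version):

{code}
# Stage 1: upgrade the broker binaries with a rolling restart, but keep both
# settings pinned to the old version
inter.broker.protocol.version=0.10.0
log.message.format.version=0.10.0

# Stage 2: once every broker runs the 1.0.0 binaries, bump the protocol and do
# another rolling restart (message format still pinned)
inter.broker.protocol.version=1.0
log.message.format.version=0.10.0

# Stage 3: after clients are upgraded, bump the message format and do a final
# rolling restart
inter.broker.protocol.version=1.0
log.message.format.version=1.0
{code}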



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6266) Kafka 1.0.0 : Repeated occurrence of WARN Resetting first dirty offset of __consumer_offsets-xx to log start offset 203569 since the checkpointed offset 120955 is inval

2017-11-23 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264420#comment-16264420
 ] 

Ismael Juma commented on KAFKA-6266:


The errors don't go away after a while?

> Kafka 1.0.0 : Repeated occurrence of WARN Resetting first dirty offset of 
> __consumer_offsets-xx to log start offset 203569 since the checkpointed 
> offset 120955 is invalid. (kafka.log.LogCleanerManager$)
> --
>
> Key: KAFKA-6266
> URL: https://issues.apache.org/jira/browse/KAFKA-6266
> Project: Kafka
>  Issue Type: Bug
>  Components: offset manager
>Affects Versions: 1.0.0
> Environment: CentOS 7, Apache kafka_2.12-1.0.0
>Reporter: VinayKumar
>
> I upgraded Kafka from 0.10.2.1 to version 1.0.0. Since then, I see the below 
> warnings in the log.
> I'm seeing these continuously in the log, and want these to be fixed so that 
> they won't repeat. Can someone please help me in fixing the below warnings?
> WARN Resetting first dirty offset of __consumer_offsets-17 to log start 
> offset 3346 since the checkpointed offset 3332 is invalid. 
> (kafka.log.LogCleanerManager$)
> WARN Resetting first dirty offset of __consumer_offsets-23 to log start 
> offset 4 since the checkpointed offset 1 is invalid. 
> (kafka.log.LogCleanerManager$)
> WARN Resetting first dirty offset of __consumer_offsets-19 to log start 
> offset 203569 since the checkpointed offset 120955 is invalid. 
> (kafka.log.LogCleanerManager$)
> WARN Resetting first dirty offset of __consumer_offsets-35 to log start 
> offset 16957 since the checkpointed offset 7 is invalid. 
> (kafka.log.LogCleanerManager$)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6266) Kafka 1.0.0 : Repeated occurrence of WARN Resetting first dirty offset of __consumer_offsets-xx to log start offset 203569 since the checkpointed offset 120955 is invalid

2017-11-23 Thread VinayKumar (JIRA)
VinayKumar created KAFKA-6266:
-

 Summary: Kafka 1.0.0 : Repeated occurrence of WARN Resetting first 
dirty offset of __consumer_offsets-xx to log start offset 203569 since the 
checkpointed offset 120955 is invalid. (kafka.log.LogCleanerManager$)
 Key: KAFKA-6266
 URL: https://issues.apache.org/jira/browse/KAFKA-6266
 Project: Kafka
  Issue Type: Bug
  Components: offset manager
Affects Versions: 1.0.0
 Environment: CentOS 7, Apache kafka_2.12-1.0.0
Reporter: VinayKumar


I upgraded Kafka from 0.10.2.1 to version 1.0.0. Since then, I see the below 
warnings in the log.
I'm seeing these continuously in the log, and want these to be fixed so that 
they won't repeat. Can someone please help me in fixing the below warnings?

WARN Resetting first dirty offset of __consumer_offsets-17 to log start offset 
3346 since the checkpointed offset 3332 is invalid. 
(kafka.log.LogCleanerManager$)
WARN Resetting first dirty offset of __consumer_offsets-23 to log start offset 
4 since the checkpointed offset 1 is invalid. (kafka.log.LogCleanerManager$)
WARN Resetting first dirty offset of __consumer_offsets-19 to log start offset 
203569 since the checkpointed offset 120955 is invalid. 
(kafka.log.LogCleanerManager$)
WARN Resetting first dirty offset of __consumer_offsets-35 to log start offset 
16957 since the checkpointed offset 7 is invalid. (kafka.log.LogCleanerManager$)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-2647) Migrate System Tools to work with SSL

2017-11-23 Thread Pablo Panero (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264300#comment-16264300
 ] 

Pablo Panero commented on KAFKA-2647:
-

[~benstopford] Hello, sorry for the noise on an old ticket. What are the 
alternatives at the moment? I urgently need GetOffsetShell or something 
similar, but nothing is being implemented and no one accepts PRs. See: 
https://issues.apache.org/jira/browse/KAFKA-3355

> Migrate System Tools to work with SSL
> -
>
> Key: KAFKA-2647
> URL: https://issues.apache.org/jira/browse/KAFKA-2647
> Project: Kafka
>  Issue Type: Improvement
>  Components: security
>Reporter: Ben Stopford
>Assignee: Ben Stopford
>
> The following system tools won't work with SSL enabled brokers. They should 
> either be directly ported over or we should provide some equivalent 
> functionality. 
> - ReplicaVerificationTool (SimpleConsumer + Producer (via ClientUtils))
> - ReplayLogProducer (HL Consumer / New Producer)
> - ConsumerOffsetChecker (Broker Channel)
> - GetOffsetShell (SimpleConsumer, Old Producer used for TopicMetadata fetch)
> - SimpleConsumerShell (SimpleConsumer)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6248) Enable configuration of internal topics of Kafka Streams applications

2017-11-23 Thread Tim Van Laer (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264106#comment-16264106
 ] 

Tim Van Laer commented on KAFKA-6248:
-

Hi Bill, 

Thank you for the information, I must have overlooked it while browsing the 
docs. The solution you propose suits my needs for now. Thanks!

Now I just need to find out if my application running Kafka Streams 1.0.0 will 
be compatible with our 0.10.2 cluster. 

Regards,
Tim


> Enable configuration of internal topics of Kafka Streams applications
> -
>
> Key: KAFKA-6248
> URL: https://issues.apache.org/jira/browse/KAFKA-6248
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Tim Van Laer
>Priority: Minor
>
> In the current implementation of Kafka Streams, it is not possible to set 
> custom configuration on internal topics (e.g. max.message.bytes, 
> retention.ms...). It would be nice if a developer could set some specific 
> configuration. 
> E.g. if you want to store messages bigger than 1MiB in a state store, you 
> have to alter the corresponding changelog topic with a max.message.bytes 
> setting. 
> The workaround is to create the 'internal' topics upfront using the correct 
> naming convention so Kafka Streams will use the explicitly defined topics as 
> if they are internal. 
> An alternative is to alter the internal topics after the Kafka Streams 
> application is started and has created its internal topics. 
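
A rough sketch of that pre-creation workaround using the Java AdminClient 
(application id "my-app", store name "my-store", partition count, replication 
factor and sizes are placeholder assumptions; Kafka Streams derives changelog 
names as <application.id>-<storeName>-changelog):

{code}
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class PrecreateChangelogTopic {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // Changelog topics are compacted; raise max.message.bytes so the
            // state store can hold records larger than the ~1 MB default.
            Map<String, String> configs = new HashMap<>();
            configs.put("cleanup.policy", "compact");
            configs.put("max.message.bytes", "2097152");

            // "my-app" = application.id, "my-store" = state store name (placeholders)
            NewTopic changelog =
                new NewTopic("my-app-my-store-changelog", 30, (short) 3).configs(configs);

            admin.createTopics(Collections.singletonList(changelog)).all().get();
        }
    }
}
{code}

Because the topic already exists under the name Streams would use, Streams 
treats it as its own changelog instead of creating one with default settings; 
the partition count has to match what Streams expects for that store.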



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)