[jira] [Commented] (KAFKA-6267) Kafka Producer - initTransaction forever waiting
[ https://issues.apache.org/jira/browse/KAFKA-6267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265006#comment-16265006 ]

Silvio Papa commented on KAFKA-6267:

Thanks for the reply, Ismael.
After changing the broker, the issue seems to be resolved on Linux.
Do you have a suggestion for Mauro's issue?

> Kafka Producer - initTransaction forever waiting
>
> Key: KAFKA-6267
> URL: https://issues.apache.org/jira/browse/KAFKA-6267
> Project: Kafka
> Issue Type: Bug
> Components: clients, producer
> Affects Versions: 0.11.0.1, 0.11.0.2
> Reporter: Silvio Papa
> Attachments: controller.log, producer.JPG, server.log
>
> In the code shown in the attached image, the producer waits forever in
> initTransaction with the default broker configuration.

-- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5413) Log cleaner fails due to large offset in segment file
[ https://issues.apache.org/jira/browse/KAFKA-5413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264945#comment-16264945 ]

Philippe Laflamme commented on KAFKA-5413:

We're hitting the same problem and would like to upgrade to a 0.10.x version to reduce the workload and, more importantly, risks.
According to https://issues.apache.org/jira/projects/KAFKA/versions/12340570 there are 19 bugs total, 2 of which are marked critical and 14 major. As far as I can tell, there is no workaround for this problem. Is this enough reason to go through a release cycle?
I'd like to provide some help, but obviously I can't do that for releasing. What are things that users like us can do to make a 0.10.2.2 release happen?

> Log cleaner fails due to large offset in segment file
>
> Key: KAFKA-5413
> URL: https://issues.apache.org/jira/browse/KAFKA-5413
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 0.10.2.0, 0.10.2.1
> Environment: Ubuntu 14.04 LTS, Oracle Java 8u92, kafka_2.11-0.10.2.0
> Reporter: Nicholas Ngorok
> Assignee: Kelvin Rutt
> Priority: Critical
> Labels: reliability
> Fix For: 0.10.2.2, 0.11.0.0
>
> Attachments: .index.cleaned, .log, .log.cleaned, .timeindex.cleaned, 002147422683.log, kafka-5413.patch
>
> The log cleaner thread in our brokers is failing with the trace below
> {noformat}
> [2017-06-08 15:49:54,822] INFO {kafka-log-cleaner-thread-0} Cleaner 0: Cleaning segment 0 in log __consumer_offsets-12 (largest timestamp Thu Jun 08 15:48:59 PDT 2017) into 0, retaining deletes. (kafka.log.LogCleaner)
> [2017-06-08 15:49:54,822] INFO {kafka-log-cleaner-thread-0} Cleaner 0: Cleaning segment 2147343575 in log __consumer_offsets-12 (largest timestamp Thu Jun 08 15:49:06 PDT 2017) into 0, retaining deletes. (kafka.log.LogCleaner)
> [2017-06-08 15:49:54,834] ERROR {kafka-log-cleaner-thread-0} [kafka-log-cleaner-thread-0], Error due to (kafka.log.LogCleaner)
> java.lang.IllegalArgumentException: requirement failed: largest offset in message set can not be safely converted to relative offset.
>     at scala.Predef$.require(Predef.scala:224)
>     at kafka.log.LogSegment.append(LogSegment.scala:109)
>     at kafka.log.Cleaner.cleanInto(LogCleaner.scala:478)
>     at kafka.log.Cleaner$$anonfun$cleanSegments$1.apply(LogCleaner.scala:405)
>     at kafka.log.Cleaner$$anonfun$cleanSegments$1.apply(LogCleaner.scala:401)
>     at scala.collection.immutable.List.foreach(List.scala:381)
>     at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:401)
>     at kafka.log.Cleaner$$anonfun$clean$4.apply(LogCleaner.scala:363)
>     at kafka.log.Cleaner$$anonfun$clean$4.apply(LogCleaner.scala:362)
>     at scala.collection.immutable.List.foreach(List.scala:381)
>     at kafka.log.Cleaner.clean(LogCleaner.scala:362)
>     at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:241)
>     at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:220)
>     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2017-06-08 15:49:54,835] INFO {kafka-log-cleaner-thread-0} [kafka-log-cleaner-thread-0], Stopped (kafka.log.LogCleaner)
> {noformat}
> This seems to point at the specific line [here|https://github.com/apache/kafka/blob/0.11.0/core/src/main/scala/kafka/log/LogSegment.scala#L92] in the kafka src where the difference is actually larger than MAXINT as both baseOffset and offset are of type long. It was introduced in this [pr|https://github.com/apache/kafka/pull/2210/files/56d1f8196b77a47b176b7bbd1e4220a3be827631]
> These were the outputs of dumping the first two log segments
> {noformat}
> :~$ /usr/bin/kafka-run-class kafka.tools.DumpLogSegments --deep-iteration
> --files /kafka-logs/__consumer_offsets-12/000
> 0.log
> Dumping /kafka-logs/__consumer_offsets-12/.log
> Starting offset: 0
> offset: 1810054758 position: 0 NoTimestampType: -1 isvalid: true payloadsize: -1 magic: 0 compresscodec: NONE crc: 3127861909 keysize: 34
> :~$ /usr/bin/kafka-run-class kafka.tools.DumpLogSegments --deep-iteration
> --files /kafka-logs/__consumer_offsets-12/000
> 0002147343575.log
> Dumping /kafka-logs/__consumer_offsets-12/002147343575.log
> Starting offset: 2147343575
> offset: 2147539884 position: 0 NoTimestampType: -1 isvalid: true payloadsize: -1 magic: 0 compresscodec: NONE crc: 2282192097 keysize: 34
> {noformat}
> My guess is that since 2147539884 is larger than MAXINT, we are hitting this exception. Was there a specific reason, this check was added in
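The requirement that fails in the trace can be looked at in isolation. Offsets inside a segment are stored relative to the segment's base offset in a signed 32-bit field, so `offset - baseOffset` has to fit in an int. The sketch below is only an illustration of that arithmetic using the offsets from the dump output above; the class and method names are hypothetical and the code is not taken from the Kafka source tree.

```java
// Hypothetical illustration of the relative-offset requirement; not Kafka's code.
final class RelativeOffsetCheck {

    // True when `offset` can be stored relative to `baseOffset` in a signed 32-bit int.
    static boolean fitsRelativeOffset(long baseOffset, long offset) {
        long relative = offset - baseOffset;
        return relative >= 0 && relative <= Integer.MAX_VALUE;
    }

    public static void main(String[] args) {
        long cleanedBase = 0L;                    // both segments are cleaned "into 0"
        long firstSegmentOffset = 1810054758L;    // offset seen in the first segment
        long secondSegmentBase = 2147343575L;     // base offset of the second segment
        long secondSegmentOffset = 2147539884L;   // offset seen in the second segment

        // Fits: 1810054758 is below Integer.MAX_VALUE (2147483647).
        System.out.println(fitsRelativeOffset(cleanedBase, firstSegmentOffset));        // true
        // Fits in its own segment: 2147539884 - 2147343575 = 196309.
        System.out.println(fitsRelativeOffset(secondSegmentBase, secondSegmentOffset)); // true
        // Does not fit once merged into the segment with base offset 0.
        System.out.println(fitsRelativeOffset(cleanedBase, secondSegmentOffset));       // false
    }
}
```

Under these assumptions, each offset is representable in its original segment, and only the merged target segment with base offset 0 violates the limit, which matches the reporter's guess.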
[jira] [Comment Edited] (KAFKA-6269) KTable state restore fails after rebalance
[ https://issues.apache.org/jira/browse/KAFKA-6269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264926#comment-16264926 ]

Matthias J. Sax edited comment on KAFKA-6269 at 11/24/17 5:14 AM:

Had a look into the code, and I think I understand the issue. The guard is not working as expected (which was my suspicion).

The only workaround I can think of atm would be to read the table source topics as {{KStream}} and do a dummy aggregation that only keeps the latest value to transform the {{KStream}} into a {{KTable}} -- this will create a proper changelog topic and avoid hitting this issue.

For your special case, as you write the topic with {{to()}}, you could also replace the {{to()}} with a {{groupByKey().aggregate()}}, and you won't need the {{entity-X-exists}} topics anymore. This should result in a no-overhead solution for you, as there will be no repartitioning topic because you only call {{mapValues}} before the {{groupByKey().aggregate()}}.

It would be great if you could confirm whether this workaround works. Thx.

was (Author: mjsax):
Had a look into the code, and I think I understand the issue. The guard is not working as expected (which was my suspicion).

The only workaround I can think of atm would be to read the table source topics as {{KStream}} and do a dummy aggregation that only keeps the latest value to transform the {{KStream}} into a {{KTable}} -- this will create a proper changelog topic and avoid hitting this issue.

For your special case, as you write the topic with {{to()}}, you could also replace the {{to()}} with a {{groupByKey().aggregate()}}, and you won't need the {{entity-X-exists}} topics anymore. This should result in a no-overhead solution for you, as there will be no repartitioning topic because you only call {{mapValues}} before the {{groupByKey().aggregate()}}.
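The "dummy aggregation that only keeps the latest value" can be pictured without any Kafka dependency. The following is a plain-Java model of the idea, not Kafka Streams code: the class, the `keepLatest` function, and the record layout are all hypothetical, and the model assumes non-null keys and values. In a real topology, the equivalent of `keepLatest` would be the aggregator passed to the aggregation step mentioned in the comment.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-alone model of a "keep the latest value" aggregation.
final class LatestValueAggregation {

    // The dummy aggregator: ignore the previous aggregate, keep the newest value.
    static <V> V keepLatest(V previous, V next) {
        return next;
    }

    // Folds a sequence of {key, value} records into a table of the latest value per key.
    // Assumes keys and values are non-null.
    static Map<String, String> toTable(String[][] records) {
        Map<String, String> table = new LinkedHashMap<>();
        for (String[] record : records) {
            table.merge(record[0], record[1], LatestValueAggregation::keepLatest);
        }
        return table;
    }

    public static void main(String[] args) {
        String[][] records = { {"a", "1"}, {"b", "2"}, {"a", "3"} };
        System.out.println(toTable(records)); // {a=3, b=2}
    }
}
```

The point of the model is only that the resulting table holds one entry per key with the most recent value, which is what turns a record stream into table-like state.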
> KTable state restore fails after rebalance > -- > > Key: KAFKA-6269 > URL: https://issues.apache.org/jira/browse/KAFKA-6269 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.0.0 >Reporter: Andreas Schroeder >Priority: Blocker > Fix For: 1.1.0, 1.0.1 > > > I have the following kafka streams topology: > entity-B -> map step -> entity-B-exists (with state store) > entity-A -> map step -> entity-A-exists (with state store) > (entity-B-exists, entity-A-exists) -> outer join with state store. > The topology building code looks like this (some data type, serde, > valuemapper, and joiner code omitted): > {code} > def buildTable[V](builder: StreamsBuilder, > sourceTopic: String, > existsTopic: String, > valueSerde: Serde[V], > valueMapper: ValueMapper[String, V]): > KTable[String, V] = { > val stream: KStream[String, String] = builder.stream[String, > String](sourceTopic) > val transformed: KStream[String, V] = stream.mapValues(valueMapper) > transformed.to(existsTopic, Produced.`with`(Serdes.String(), valueSerde)) > val inMemoryStoreName = s"$existsTopic-persisted" > val materialized = > Materialized.as(Stores.inMemoryKeyValueStore(inMemoryStoreName)) > .withKeySerde(Serdes.String()) > .withValueSerde(valueSerde) > .withLoggingDisabled() > builder.table(existsTopic, materialized) > } > val builder = new StreamsBuilder > val mapToEmptyString: ValueMapper[String, String] = (value: String) => if > (value != null) "" else null > val entitiesB: KTable[String, EntityBInfo] = > buildTable(builder, > "entity-B", > "entity-B-exists", > EntityBInfoSerde, > ListingImagesToEntityBInfo) > val entitiesA: KTable[String, String] = > buildTable(builder, "entity-A", "entity-A-exists", Serdes.String(), > mapToEmptyString) > val joiner: ValueJoiner[String, EntityBInfo, EntityDiff] = (a, b) => > EntityDiff.fromJoin(a, b) > val materialized = > Materialized.as(Stores.inMemoryKeyValueStore("entity-A-joined-with-entity-B")) > .withKeySerde(Serdes.String()) > 
.withValueSerde(EntityDiffSerde)
> .withLoggingEnabled(new java.util.HashMap[String, String]())
> val joined: KTable[String, EntityDiff] = entitiesA.outerJoin(entitiesB, joiner, materialized)
> {code}
> We run 4 processor machines with 30 stream threads each; each topic has 30 partitions so that there is a total of 4 x 30 = 120 partitions to consume.
> The initial launch of the processor works fine, but after killing one processor and letting it re-join, the stream threads show some faulty behaviour.
> First, the total number of assigned partitions over all processor machines is larger than 120 (sometimes 157, sometimes just 132), so the partition / task assignment seems to assign the same job to
[jira] [Commented] (KAFKA-6269) KTable state restore fails after rebalance
[ https://issues.apache.org/jira/browse/KAFKA-6269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264926#comment-16264926 ]

Matthias J. Sax commented on KAFKA-6269:

Had a look into the code, and I think I understand the issue. The guard is not working as expected (which was my suspicion).

The only workaround I can think of atm would be to read the table source topics as {{KStream}} and do a dummy aggregation that only keeps the latest value to transform the {{KStream}} into a {{KTable}} -- this will create a proper changelog topic and avoid hitting this issue.

For your special case, as you write the topic with {{to()}}, you could also replace the {{to()}} with a {{groupByKey().aggregate()}}, and you won't need the {{entity-X-exists}} topics anymore. This should result in a no-overhead solution for you, as there will be no repartitioning topic because you only call {{mapValues}} before the {{groupByKey().aggregate()}}.

> KTable state restore fails after rebalance
>
> Key: KAFKA-6269
> URL: https://issues.apache.org/jira/browse/KAFKA-6269
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 1.0.0
> Reporter: Andreas Schroeder
> Priority: Blocker
> Fix For: 1.1.0, 1.0.1
>
> I have the following kafka streams topology:
> entity-B -> map step -> entity-B-exists (with state store)
> entity-A -> map step -> entity-A-exists (with state store)
> (entity-B-exists, entity-A-exists) -> outer join with state store.
> The topology building code looks like this (some data type, serde, > valuemapper, and joiner code omitted): > {code} > def buildTable[V](builder: StreamsBuilder, > sourceTopic: String, > existsTopic: String, > valueSerde: Serde[V], > valueMapper: ValueMapper[String, V]): > KTable[String, V] = { > val stream: KStream[String, String] = builder.stream[String, > String](sourceTopic) > val transformed: KStream[String, V] = stream.mapValues(valueMapper) > transformed.to(existsTopic, Produced.`with`(Serdes.String(), valueSerde)) > val inMemoryStoreName = s"$existsTopic-persisted" > val materialized = > Materialized.as(Stores.inMemoryKeyValueStore(inMemoryStoreName)) > .withKeySerde(Serdes.String()) > .withValueSerde(valueSerde) > .withLoggingDisabled() > builder.table(existsTopic, materialized) > } > val builder = new StreamsBuilder > val mapToEmptyString: ValueMapper[String, String] = (value: String) => if > (value != null) "" else null > val entitiesB: KTable[String, EntityBInfo] = > buildTable(builder, > "entity-B", > "entity-B-exists", > EntityBInfoSerde, > ListingImagesToEntityBInfo) > val entitiesA: KTable[String, String] = > buildTable(builder, "entity-A", "entity-A-exists", Serdes.String(), > mapToEmptyString) > val joiner: ValueJoiner[String, EntityBInfo, EntityDiff] = (a, b) => > EntityDiff.fromJoin(a, b) > val materialized = > Materialized.as(Stores.inMemoryKeyValueStore("entity-A-joined-with-entity-B")) > .withKeySerde(Serdes.String()) > .withValueSerde(EntityDiffSerde) > .withLoggingEnabled(new java.util.HashMap[String, String]()) > val joined: KTable[String, EntityDiff] = entitiesA.outerJoin(entitiesB, > joiner, materialized) > {code} > We run 4 processor machines with 30 stream threads each; each topic has 30 > partitions so that there is a total of 4 x 30 = 120 partitions to consume. > The initial launch of the processor works fine, but when killing one > processor and letting him re-join the stream threads leads to some faulty > behaviour. 
> First, the total number of assigned partitions over all processor machines is larger than 120 (sometimes 157, sometimes just 132), so the partition / task assignment seems to assign the same job to different stream threads.
> The processor machines trying to re-join the consumer group fail constantly with the error message 'Detected a task that got migrated to another thread.' We gave the processor half an hour to recover; usually, rebuilding the KTable states takes around 20 seconds (with Kafka 0.11.0.1).
> Here are the details of the errors we see:
> stream-thread [kafka-processor-6-StreamThread-9] Detected a task that got migrated to another thread. This implies that this thread missed a rebalance and dropped out of the consumer group. Trying to rejoin the consumer group now.
> {code}
> org.apache.kafka.streams.errors.TaskMigratedException: Log end offset of entity-B-exists-0 should not change while restoring: old end offset 4750539, current offset 4751388
> > StreamsTask taskId: 1_0
> > > ProcessorTopology:
> > KSTREAM-SOURCE-08:
> >
[jira] [Updated] (KAFKA-6269) KTable state restore fails after rebalance
[ https://issues.apache.org/jira/browse/KAFKA-6269?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-6269: --- Priority: Blocker (was: Major) > KTable state restore fails after rebalance > -- > > Key: KAFKA-6269 > URL: https://issues.apache.org/jira/browse/KAFKA-6269 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.0.0 >Reporter: Andreas Schroeder >Priority: Blocker > Fix For: 1.1.0, 1.0.1 > > > I have the following kafka streams topology: > entity-B -> map step -> entity-B-exists (with state store) > entity-A -> map step -> entity-A-exists (with state store) > (entity-B-exists, entity-A-exists) -> outer join with state store. > The topology building code looks like this (some data type, serde, > valuemapper, and joiner code omitted): > {code} > def buildTable[V](builder: StreamsBuilder, > sourceTopic: String, > existsTopic: String, > valueSerde: Serde[V], > valueMapper: ValueMapper[String, V]): > KTable[String, V] = { > val stream: KStream[String, String] = builder.stream[String, > String](sourceTopic) > val transformed: KStream[String, V] = stream.mapValues(valueMapper) > transformed.to(existsTopic, Produced.`with`(Serdes.String(), valueSerde)) > val inMemoryStoreName = s"$existsTopic-persisted" > val materialized = > Materialized.as(Stores.inMemoryKeyValueStore(inMemoryStoreName)) > .withKeySerde(Serdes.String()) > .withValueSerde(valueSerde) > .withLoggingDisabled() > builder.table(existsTopic, materialized) > } > val builder = new StreamsBuilder > val mapToEmptyString: ValueMapper[String, String] = (value: String) => if > (value != null) "" else null > val entitiesB: KTable[String, EntityBInfo] = > buildTable(builder, > "entity-B", > "entity-B-exists", > EntityBInfoSerde, > ListingImagesToEntityBInfo) > val entitiesA: KTable[String, String] = > buildTable(builder, "entity-A", "entity-A-exists", Serdes.String(), > mapToEmptyString) > val joiner: ValueJoiner[String, EntityBInfo, 
EntityDiff] = (a, b) =>
> EntityDiff.fromJoin(a, b)
> val materialized = Materialized.as(Stores.inMemoryKeyValueStore("entity-A-joined-with-entity-B"))
> .withKeySerde(Serdes.String())
> .withValueSerde(EntityDiffSerde)
> .withLoggingEnabled(new java.util.HashMap[String, String]())
> val joined: KTable[String, EntityDiff] = entitiesA.outerJoin(entitiesB, joiner, materialized)
> {code}
> We run 4 processor machines with 30 stream threads each; each topic has 30 partitions so that there is a total of 4 x 30 = 120 partitions to consume.
> The initial launch of the processor works fine, but after killing one processor and letting it re-join, the stream threads show some faulty behaviour.
> First, the total number of assigned partitions over all processor machines is larger than 120 (sometimes 157, sometimes just 132), so the partition / task assignment seems to assign the same job to different stream threads.
> The processor machines trying to re-join the consumer group fail constantly with the error message 'Detected a task that got migrated to another thread.' We gave the processor half an hour to recover; usually, rebuilding the KTable states takes around 20 seconds (with Kafka 0.11.0.1).
> Here are the details of the errors we see:
> stream-thread [kafka-processor-6-StreamThread-9] Detected a task that got migrated to another thread. This implies that this thread missed a rebalance and dropped out of the consumer group. Trying to rejoin the consumer group now.
> {code} > org.apache.kafka.streams.errors.TaskMigratedException: Log end offset of > entity-B-exists-0 should not change while restoring: old end offset 4750539, > current offset 4751388 > > StreamsTask taskId: 1_0 > > > ProcessorTopology: > > KSTREAM-SOURCE-08: > > topics: [entity-A-exists] > > children: [KTABLE-SOURCE-09] > > KTABLE-SOURCE-09: > > states: [entity-A-exists-persisted] > > children: [KTABLE-JOINTHIS-11] > > KTABLE-JOINTHIS-11: > > states: [entity-B-exists-persisted] > > children: [KTABLE-MERGE-10] > > KTABLE-MERGE-10: > > states: [entity-A-joined-with-entity-B] > > KSTREAM-SOURCE-03: > > topics: [entity-B-exists] > > children: [KTABLE-SOURCE-04] > >
[jira] [Updated] (KAFKA-6269) KTable state restore fails after rebalance
[ https://issues.apache.org/jira/browse/KAFKA-6269?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-6269: --- Fix Version/s: 1.0.1 1.1.0 > KTable state restore fails after rebalance > -- > > Key: KAFKA-6269 > URL: https://issues.apache.org/jira/browse/KAFKA-6269 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.0.0 >Reporter: Andreas Schroeder >Priority: Blocker > Fix For: 1.1.0, 1.0.1 > > > I have the following kafka streams topology: > entity-B -> map step -> entity-B-exists (with state store) > entity-A -> map step -> entity-A-exists (with state store) > (entity-B-exists, entity-A-exists) -> outer join with state store. > The topology building code looks like this (some data type, serde, > valuemapper, and joiner code omitted): > {code} > def buildTable[V](builder: StreamsBuilder, > sourceTopic: String, > existsTopic: String, > valueSerde: Serde[V], > valueMapper: ValueMapper[String, V]): > KTable[String, V] = { > val stream: KStream[String, String] = builder.stream[String, > String](sourceTopic) > val transformed: KStream[String, V] = stream.mapValues(valueMapper) > transformed.to(existsTopic, Produced.`with`(Serdes.String(), valueSerde)) > val inMemoryStoreName = s"$existsTopic-persisted" > val materialized = > Materialized.as(Stores.inMemoryKeyValueStore(inMemoryStoreName)) > .withKeySerde(Serdes.String()) > .withValueSerde(valueSerde) > .withLoggingDisabled() > builder.table(existsTopic, materialized) > } > val builder = new StreamsBuilder > val mapToEmptyString: ValueMapper[String, String] = (value: String) => if > (value != null) "" else null > val entitiesB: KTable[String, EntityBInfo] = > buildTable(builder, > "entity-B", > "entity-B-exists", > EntityBInfoSerde, > ListingImagesToEntityBInfo) > val entitiesA: KTable[String, String] = > buildTable(builder, "entity-A", "entity-A-exists", Serdes.String(), > mapToEmptyString) > val joiner: ValueJoiner[String, EntityBInfo, EntityDiff] 
= (a, b) =>
> EntityDiff.fromJoin(a, b)
> val materialized = Materialized.as(Stores.inMemoryKeyValueStore("entity-A-joined-with-entity-B"))
> .withKeySerde(Serdes.String())
> .withValueSerde(EntityDiffSerde)
> .withLoggingEnabled(new java.util.HashMap[String, String]())
> val joined: KTable[String, EntityDiff] = entitiesA.outerJoin(entitiesB, joiner, materialized)
> {code}
> We run 4 processor machines with 30 stream threads each; each topic has 30 partitions so that there is a total of 4 x 30 = 120 partitions to consume.
> The initial launch of the processor works fine, but after killing one processor and letting it re-join, the stream threads show some faulty behaviour.
> First, the total number of assigned partitions over all processor machines is larger than 120 (sometimes 157, sometimes just 132), so the partition / task assignment seems to assign the same job to different stream threads.
> The processor machines trying to re-join the consumer group fail constantly with the error message 'Detected a task that got migrated to another thread.' We gave the processor half an hour to recover; usually, rebuilding the KTable states takes around 20 seconds (with Kafka 0.11.0.1).
> Here are the details of the errors we see:
> stream-thread [kafka-processor-6-StreamThread-9] Detected a task that got migrated to another thread. This implies that this thread missed a rebalance and dropped out of the consumer group. Trying to rejoin the consumer group now.
> {code} > org.apache.kafka.streams.errors.TaskMigratedException: Log end offset of > entity-B-exists-0 should not change while restoring: old end offset 4750539, > current offset 4751388 > > StreamsTask taskId: 1_0 > > > ProcessorTopology: > > KSTREAM-SOURCE-08: > > topics: [entity-A-exists] > > children: [KTABLE-SOURCE-09] > > KTABLE-SOURCE-09: > > states: [entity-A-exists-persisted] > > children: [KTABLE-JOINTHIS-11] > > KTABLE-JOINTHIS-11: > > states: [entity-B-exists-persisted] > > children: [KTABLE-MERGE-10] > > KTABLE-MERGE-10: > > states: [entity-A-joined-with-entity-B] > > KSTREAM-SOURCE-03: > > topics: [entity-B-exists] > > children: [KTABLE-SOURCE-04] > >
[jira] [Commented] (KAFKA-6269) KTable state restore fails after rebalance
[ https://issues.apache.org/jira/browse/KAFKA-6269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264921#comment-16264921 ]

Matthias J. Sax commented on KAFKA-6269:

Thanks for reporting this. If you get the exception {{Log end offset of entity-B-exists-0 should not change while restoring: old end offset 4750539, current offset 4751388}}, this means that a thread started to recover a state from a changelog topic and the endOffset moved during the process -- this should never happen (only if the task was migrated to another thread and this other thread writes into the changelog topic). If the state is not migrated, the thread that restores would be the only one that is "allowed" to write into the changelog; but as long as it restores, it does not write.

What I could think of is a bug that relates to an optimization we do: as you read KTables from source topics, there is no additional changelog topic, because the source topic can be used to recreate the state (the error message shows that the source topic is used for store recovery). Thus, an upstream processor that writes to the source topic can of course append data to this topic -- we actually have a guard for this case, but we changed some code here, maybe introducing a bug such that this guard does not work properly anymore: thus, the restoring thread "thinks" it reads a changelog topic (while it actually reads a source topic) for recovery and fails even if it shouldn't.

I'll try to reproduce this locally and cycle back to you.
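The intended behaviour of the guard described in this comment can be written down as a small decision function. This is a sketch of the reasoning only, under the assumption that the guard distinguishes a dedicated changelog topic from a source topic reused for recovery; the class, method, and parameter names are hypothetical and do not come from the Kafka Streams code base.

```java
// Hypothetical model of the restore guard described in the comment; not Kafka Streams code.
final class RestoreEndOffsetGuard {

    // Returns true when a moved end offset should be treated as a sign that the task
    // was migrated to another thread. A source topic reused for recovery may legitimately
    // grow while restoring, because an upstream processor keeps appending to it.
    static boolean indicatesMigration(boolean restoresFromSourceTopic,
                                      long endOffsetAtStart,
                                      long currentEndOffset) {
        if (currentEndOffset == endOffsetAtStart) {
            return false; // nothing was appended during the restore
        }
        return !restoresFromSourceTopic;
    }

    public static void main(String[] args) {
        long oldEnd = 4750539L;     // "old end offset" from the exception message
        long currentEnd = 4751388L; // "current offset" from the exception message

        // Dedicated changelog topic: a moved end offset points at a migrated task.
        System.out.println(indicatesMigration(false, oldEnd, currentEnd)); // true
        // Source topic used for recovery: growth is expected and should be tolerated.
        System.out.println(indicatesMigration(true, oldEnd, currentEnd));  // false
    }
}
```

Read this way, the reported failure corresponds to the second case being handled as if it were the first.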
> KTable state restore fails after rebalance > -- > > Key: KAFKA-6269 > URL: https://issues.apache.org/jira/browse/KAFKA-6269 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.0.0 >Reporter: Andreas Schroeder > Fix For: 1.1.0, 1.0.1 > > > I have the following kafka streams topology: > entity-B -> map step -> entity-B-exists (with state store) > entity-A -> map step -> entity-A-exists (with state store) > (entity-B-exists, entity-A-exists) -> outer join with state store. > The topology building code looks like this (some data type, serde, > valuemapper, and joiner code omitted): > {code} > def buildTable[V](builder: StreamsBuilder, > sourceTopic: String, > existsTopic: String, > valueSerde: Serde[V], > valueMapper: ValueMapper[String, V]): > KTable[String, V] = { > val stream: KStream[String, String] = builder.stream[String, > String](sourceTopic) > val transformed: KStream[String, V] = stream.mapValues(valueMapper) > transformed.to(existsTopic, Produced.`with`(Serdes.String(), valueSerde)) > val inMemoryStoreName = s"$existsTopic-persisted" > val materialized = > Materialized.as(Stores.inMemoryKeyValueStore(inMemoryStoreName)) > .withKeySerde(Serdes.String()) > .withValueSerde(valueSerde) > .withLoggingDisabled() > builder.table(existsTopic, materialized) > } > val builder = new StreamsBuilder > val mapToEmptyString: ValueMapper[String, String] = (value: String) => if > (value != null) "" else null > val entitiesB: KTable[String, EntityBInfo] = > buildTable(builder, > "entity-B", > "entity-B-exists", > EntityBInfoSerde, > ListingImagesToEntityBInfo) > val entitiesA: KTable[String, String] = > buildTable(builder, "entity-A", "entity-A-exists", Serdes.String(), > mapToEmptyString) > val joiner: ValueJoiner[String, EntityBInfo, EntityDiff] = (a, b) => > EntityDiff.fromJoin(a, b) > val materialized = > Materialized.as(Stores.inMemoryKeyValueStore("entity-A-joined-with-entity-B")) > .withKeySerde(Serdes.String()) > 
.withValueSerde(EntityDiffSerde)
> .withLoggingEnabled(new java.util.HashMap[String, String]())
> val joined: KTable[String, EntityDiff] = entitiesA.outerJoin(entitiesB, joiner, materialized)
> {code}
> We run 4 processor machines with 30 stream threads each; each topic has 30 partitions so that there is a total of 4 x 30 = 120 partitions to consume.
> The initial launch of the processor works fine, but after killing one processor and letting it re-join, the stream threads show some faulty behaviour.
> First, the total number of assigned partitions over all processor machines is larger than 120 (sometimes 157, sometimes just 132), so the partition / task assignment seems to assign the same job to different stream threads.
> The processor machines trying to re-join the consumer group fail constantly with the error message 'Detected a task that got migrated to another thread.' We gave the processor half an hour to recover; usually, rebuilding the KTable states takes around 20 seconds (with Kafka 0.11.0.1).
> Here are the details of the errors we see:
> stream-thread
[jira] [Commented] (KAFKA-6248) Enable configuration of internal topics of Kafka Streams applications
[ https://issues.apache.org/jira/browse/KAFKA-6248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264806#comment-16264806 ]

Matthias J. Sax commented on KAFKA-6248:

It is compatible: https://docs.confluent.io/current/streams/upgrade-guide.html#compatibility

If this does what you need, would you mind closing this as "Not a Problem"?

> Enable configuration of internal topics of Kafka Streams applications
>
> Key: KAFKA-6248
> URL: https://issues.apache.org/jira/browse/KAFKA-6248
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Tim Van Laer
> Priority: Minor
>
> In the current implementation of Kafka Streams, it is not possible to set custom configuration for internal topics (e.g. max.message.bytes, retention.ms...). It would be nice if a developer could set some specific configuration.
> E.g. if you want to store messages bigger than 1MiB in a state store, you have to alter the corresponding changelog topic with a max.message.bytes setting.
> The workaround is to create the 'internal' topics upfront using the correct naming convention so Kafka Streams will use the explicitly defined topics as if they were internal.
> An alternative is to alter the internal topics after the Kafka Streams application is started and has created its internal topics.
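For the workaround of creating the internal topics upfront, the topic names have to follow the naming convention Kafka Streams uses, which is `<application.id>-<store name>-changelog` for changelog topics and `<application.id>-<name>-repartition` for repartition topics. The helper below is a minimal sketch that only builds such names; the class and the example application id and store name are placeholders, not values from this issue.

```java
// Hypothetical helper that builds internal topic names following the
// <application.id>-<name>-changelog / -repartition convention.
final class InternalTopicNames {

    // Name of the changelog topic backing a state store.
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    // Name of a repartition topic.
    static String repartitionTopic(String applicationId, String name) {
        return applicationId + "-" + name + "-repartition";
    }

    public static void main(String[] args) {
        // "orders-app" and "orders-store" are placeholder values.
        System.out.println(changelogTopic("orders-app", "orders-store"));   // orders-app-orders-store-changelog
        System.out.println(repartitionTopic("orders-app", "orders-store")); // orders-app-orders-store-repartition
    }
}
```

A topic created under such a name before the application starts could then carry the desired settings, for example a larger max.message.bytes, as described in the issue.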
[jira] [Commented] (KAFKA-6266) Kafka 1.0.0 : Repeated occurrence of WARN Resetting first dirty offset of __consumer_offsets-xx to log start offset 203569 since the checkpointed offset 120955 is inval
[ https://issues.apache.org/jira/browse/KAFKA-6266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264728#comment-16264728 ]

Michal Klempa commented on KAFKA-6266:

I am experiencing this with Kafka 0.10.2.1 (Confluent Platform) on CentOS 6.

> Kafka 1.0.0 : Repeated occurrence of WARN Resetting first dirty offset of __consumer_offsets-xx to log start offset 203569 since the checkpointed offset 120955 is invalid. (kafka.log.LogCleanerManager$)
>
> Key: KAFKA-6266
> URL: https://issues.apache.org/jira/browse/KAFKA-6266
> Project: Kafka
> Issue Type: Bug
> Components: offset manager
> Affects Versions: 1.0.0
> Environment: CentOS 7, Apache kafka_2.12-1.0.0
> Reporter: VinayKumar
>
> I upgraded Kafka from 0.10.2.1 to 1.0.0. Since then, I see the warnings below in the log.
> I'm seeing these continuously in the log and want them to be fixed so that they won't repeat. Can someone please help me fix the warnings below?
> WARN Resetting first dirty offset of __consumer_offsets-17 to log start offset 3346 since the checkpointed offset 3332 is invalid. (kafka.log.LogCleanerManager$)
> WARN Resetting first dirty offset of __consumer_offsets-23 to log start offset 4 since the checkpointed offset 1 is invalid. (kafka.log.LogCleanerManager$)
> WARN Resetting first dirty offset of __consumer_offsets-19 to log start offset 203569 since the checkpointed offset 120955 is invalid. (kafka.log.LogCleanerManager$)
> WARN Resetting first dirty offset of __consumer_offsets-35 to log start offset 16957 since the checkpointed offset 7 is invalid. (kafka.log.LogCleanerManager$)
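All four warnings share one shape: the checkpointed offset is lower than the log start offset, and the cleaner falls back to the log start offset. The sketch below models just that rule as inferred from the warning text; it is an assumption about the behaviour, the class and method are hypothetical, and it does not reproduce kafka.log.LogCleanerManager.

```java
// Hypothetical model of the fallback implied by the warning text; not Kafka's code.
final class FirstDirtyOffset {

    // Returns the offset the cleaner would start from: the checkpointed offset if it is
    // still at or after the log start offset, otherwise the log start offset.
    static long resolve(long checkpointedOffset, long logStartOffset) {
        return checkpointedOffset < logStartOffset ? logStartOffset : checkpointedOffset;
    }

    public static void main(String[] args) {
        // Values taken from the warnings quoted above.
        System.out.println(resolve(3332L, 3346L));     // 3346
        System.out.println(resolve(120955L, 203569L)); // 203569
        // A checkpoint that is still inside the log is kept unchanged.
        System.out.println(resolve(5000L, 3346L));     // 5000
    }
}
```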
[jira] [Commented] (KAFKA-6266) Kafka 1.0.0 : Repeated occurrence of WARN Resetting first dirty offset of __consumer_offsets-xx to log start offset 203569 since the checkpointed offset 120955 is inval
[ https://issues.apache.org/jira/browse/KAFKA-6266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264653#comment-16264653 ]

VinayKumar commented on KAFKA-6266:

Thank you for the reply.
The warnings don't go away; they keep repeating in the log.

> Kafka 1.0.0 : Repeated occurrence of WARN Resetting first dirty offset of __consumer_offsets-xx to log start offset 203569 since the checkpointed offset 120955 is invalid. (kafka.log.LogCleanerManager$)
>
> Key: KAFKA-6266
> URL: https://issues.apache.org/jira/browse/KAFKA-6266
> Project: Kafka
> Issue Type: Bug
> Components: offset manager
> Affects Versions: 1.0.0
> Environment: CentOS 7, Apache kafka_2.12-1.0.0
> Reporter: VinayKumar
>
> I upgraded Kafka from 0.10.2.1 to 1.0.0. Since then, I see the warnings below in the log.
> I'm seeing these continuously in the log and want them to be fixed so that they won't repeat. Can someone please help me fix the warnings below?
> WARN Resetting first dirty offset of __consumer_offsets-17 to log start offset 3346 since the checkpointed offset 3332 is invalid. (kafka.log.LogCleanerManager$)
> WARN Resetting first dirty offset of __consumer_offsets-23 to log start offset 4 since the checkpointed offset 1 is invalid. (kafka.log.LogCleanerManager$)
> WARN Resetting first dirty offset of __consumer_offsets-19 to log start offset 203569 since the checkpointed offset 120955 is invalid. (kafka.log.LogCleanerManager$)
> WARN Resetting first dirty offset of __consumer_offsets-35 to log start offset 16957 since the checkpointed offset 7 is invalid. (kafka.log.LogCleanerManager$)
[jira] [Commented] (KAFKA-5647) Use async ZookeeperClient for Admin operations
[ https://issues.apache.org/jira/browse/KAFKA-5647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264621#comment-16264621 ]

ASF GitHub Bot commented on KAFKA-5647:
---

GitHub user omkreddy opened a pull request:

    https://github.com/apache/kafka/pull/4260

    KAFKA-5647: Use KafkaZkClient in ReassignPartitionsCommand and PreferredReplicaLeaderElectionCommand

    * Use KafkaZkClient in ReassignPartitionsCommand
    * Use KafkaZkClient in PreferredReplicaLeaderElectionCommand
    * Updated test classes to use new methods
    * All existing tests should pass

    ### Committer Checklist (excluded from commit message)
    - [ ] Verify design and implementation
    - [ ] Verify test coverage and CI build status
    - [ ] Verify documentation (including upgrade notes)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/omkreddy/kafka KAFKA-5647-ADMINCOMMANDS

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/4260.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4260

commit b0dcf9fde0754f17e576186c47889b621b3b9769
Author: Manikumar Reddy
Date: 2017-11-17T15:38:52Z

    KAFKA-5647: Use KafkaZkClient in ReassignPartitionsCommand and PreferredReplicaLeaderElectionCommand

> Use async ZookeeperClient for Admin operations
> --
>
> Key: KAFKA-5647
> URL: https://issues.apache.org/jira/browse/KAFKA-5647
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Ismael Juma
> Assignee: Manikumar
> Fix For: 1.1.0
>
> Since we will be removing the ZK dependency in most of the admin clients, we
> only need to change the admin operations used on the server side. This
> includes converting AdminManager and the remaining usage of zkUtils in
> KafkaApis to use ZookeeperClient/KafkaZkClient.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Comment Edited] (KAFKA-6267) Kafka Producer - initTransaction forever waiting
[ https://issues.apache.org/jira/browse/KAFKA-6267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264561#comment-16264561 ]

Mauro Giacometti edited comment on KAFKA-6267 at 11/23/17 4:46 PM:
---

Hi Ismael,

I'm facing the same issue on Linux. Let me describe the whole scenario. Silvio is my colleague and he's working with me on my project.

I'm trying to chain a JPA transaction with a Kafka transaction by using the ChainedTransactionManager provided by Spring. This choice comes from the need to implement a one-phase commit. We need to open a "unique transaction" (managed by the ChainedTransactionManager) which first opens a transaction to a Kafka broker and then a transaction to an Oracle DB, on which a write operation is performed. A final commit on the DB is performed, immediately followed by a commit of the Kafka transaction. Every rollback in the JPA transaction will lead to a Kafka transaction rollback. Moreover, we're using spring-integration-kafka and Apache Avro for manipulating/serializing/deserializing the messages we publish to the Kafka broker.

What we have now is:
0. Client REST service invocation for the creation of a product.
1. Kafka transaction is opened -> Message is delivered.
2. Data are persisted on the database.
3. Data are correctly committed on the database.
4. Kafka transaction is not committed but is in an IN_TRANSITION state, and I don't know why.
5. A second call to the REST service immediately fails, as a new Kafka transaction is required but the old one is still open. So the abortTransaction method is invoked.

What we need to have is:
0. Client REST service invocation for the creation of a product.
1. Kafka transaction is opened -> New Product request Message is delivered.
2. Data are persisted on the database.
3. Data are correctly committed on the database.
4. Kafka transaction is committed and available for the consumers.

Here are my configurations for the factory, template, etc.:

    public PlatformTransactionManager transactionManager(
            EntityManagerFactory emf,
            DefaultKafkaProducerFactory producerFactory) {
        final JpaTransactionManager jpaTransactionManager = new JpaTransactionManager();
        jpaTransactionManager.setEntityManagerFactory(emf);

        final KafkaTransactionManager kafkaTransactionManager =
                new KafkaTransactionManager<>(producerFactory);
        kafkaTransactionManager.setFailEarlyOnGlobalRollbackOnly(true);
        kafkaTransactionManager.setNestedTransactionAllowed(true);
        kafkaTransactionManager.setValidateExistingTransaction(true);
        kafkaTransactionManager.setRollbackOnCommitFailure(true);
        kafkaTransactionManager.setTransactionSynchronization(
                AbstractPlatformTransactionManager.SYNCHRONIZATION_ALWAYS);

        return new ChainedTransactionManager(new PlatformTransactionManager[] {
                jpaTransactionManager, kafkaTransactionManager });
    }

    @Bean
    public DefaultKafkaProducerFactory producerFactory() {
        // The type arguments were lost in the original post; Map<String, Object>
        // is the type the factory constructor accepts.
        final Map<String, Object> configMap = new HashMap<>();
        configMap.put("bootstrap.servers", bootstrapServers);
        configMap.put("key.serializer", keySerializer);
        configMap.put("value.serializer", valueSerializer);
        configMap.put("schema.registry.url", schemaRegistryEndpoint);
        configMap.put("enable.idempotence", enableIdempotenceConfig);
        configMap.put("transactional.id", transactionalIdConfig);
        configMap.put("transaction.timeout.ms", transactionTimeoutMs);

        final DefaultKafkaProducerFactory producerFactory =
                new DefaultKafkaProducerFactory(configMap);
        producerFactory.setTransactionIdPrefix(transactionalIdConfig);
        return producerFactory;
    }

    public KafkaTemplate templateCreationCustomer(
            DefaultKafkaProducerFactory producerFactory,
            RecordMessageConverter messageConverter) {
        KafkaTemplate template = new KafkaTemplate<>(producerFactory);
        template.setMessageConverter(messageConverter);
        return template;
    }

Versions used:
spring-integration-kafka is 2.3.0
kafka-client is 0.11.0.0
kafka-avro-serializer is 3.3.0
EDIT// spring-kafka is 1.3.0
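The commit order described in this comment (Kafka transaction opened first, database committed first, Kafka committed last) depends on the order in which the managers are handed to the chain. To my knowledge, Spring Data's ChainedTransactionManager starts transactions in the order given and commits or rolls back in reverse order. The sketch below models only that ordering; the `Resource` class and `runChained` method are hypothetical stand-ins and use neither Spring nor Kafka.

```java
import java.util.ArrayList;
import java.util.List;

/** Models "begin in list order, commit in reverse order" for chained resources. */
final class ChainedOrderSketch {

    /** Hypothetical stand-in for a transactional resource; not a Spring type. */
    static final class Resource {
        private final String name;
        private final List<String> journal;

        Resource(String name, List<String> journal) {
            this.name = name;
            this.journal = journal;
        }

        void begin() { journal.add("begin " + name); }

        void commit() { journal.add("commit " + name); }
    }

    /** Begins the named resources in order, runs the work, then commits in reverse. */
    static List<String> runChained(String... names) {
        List<String> journal = new ArrayList<>();
        List<Resource> resources = new ArrayList<>();
        for (String name : names) {
            Resource resource = new Resource(name, journal);
            resource.begin();
            resources.add(resource);
        }
        journal.add("business logic");
        for (int i = resources.size() - 1; i >= 0; i--) {
            resources.get(i).commit();
        }
        return journal;
    }

    public static void main(String[] args) {
        // Kafka listed first: Kafka begins first and commits last.
        System.out.println(runChained("kafka", "jpa"));
        // JPA listed first: the database commits last instead.
        System.out.println(runChained("jpa", "kafka"));
    }
}
```

Under that assumption, committing the database before Kafka would correspond to listing the Kafka manager first, whereas the posted configuration lists jpaTransactionManager first. This may be worth double-checking, although it is separate from the broker version problem discussed in this issue.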
[jira] [Updated] (KAFKA-6267) Kafka Producer - initTransaction forever waiting
[ https://issues.apache.org/jira/browse/KAFKA-6267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-6267: --- Priority: Major (was: Blocker) > Kafka Producer - initTransaction forever waiting > > > Key: KAFKA-6267 > URL: https://issues.apache.org/jira/browse/KAFKA-6267 > Project: Kafka > Issue Type: Bug > Components: clients, producer >Affects Versions: 0.11.0.1, 0.11.0.2 >Reporter: Silvio Papa > Attachments: controller.log, producer.JPG, server.log > > > In code of attached image, the producer remains forever awaiting in > initTransaction with default configuration of broker -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-6267) Kafka Producer - initTransaction forever waiting
[ https://issues.apache.org/jira/browse/KAFKA-6267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264580#comment-16264580 ]

Ismael Juma commented on KAFKA-6267:

The broker version is older than 0.11.0.0, so transactions won't work. I can see from the stack traces that classes that no longer exist in 0.11.0.0 are being used.
[jira] [Comment Edited] (KAFKA-6267) Kafka Producer - initTransaction forever waiting
[ https://issues.apache.org/jira/browse/KAFKA-6267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264580#comment-16264580 ]

Ismael Juma edited comment on KAFKA-6267 at 11/23/17 4:43 PM:
--

The broker version is older than 0.11.0.0, so transactions won't work. I can see from the stack traces that classes that no longer exist in 0.11.0.0 are being used.
[jira] [Updated] (KAFKA-6267) Kafka Producer - initTransaction forever waiting
[ https://issues.apache.org/jira/browse/KAFKA-6267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Silvio Papa updated KAFKA-6267:
---
Component/s: clients
[jira] [Commented] (KAFKA-6267) Kafka Producer - initTransaction forever waiting
[ https://issues.apache.org/jira/browse/KAFKA-6267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264578#comment-16264578 ]

Silvio Papa commented on KAFKA-6267:

Done, I added server.log and controller.log.
[jira] [Updated] (KAFKA-6267) Kafka Producer - initTransaction forever waiting
[ https://issues.apache.org/jira/browse/KAFKA-6267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Silvio Papa updated KAFKA-6267:
---
Attachment: server.log
[jira] [Updated] (KAFKA-6267) Kafka Producer - initTransaction forever waiting
[ https://issues.apache.org/jira/browse/KAFKA-6267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Silvio Papa updated KAFKA-6267:
---
Attachment: controller.log
[jira] [Comment Edited] (KAFKA-6267) Kafka Producer - initTransaction forever waiting
[ https://issues.apache.org/jira/browse/KAFKA-6267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264561#comment-16264561 ]

Mauro Giacometti edited comment on KAFKA-6267 at 11/23/17 4:31 PM:
---

Hi Ismael,

I'm facing the same issue on Linux. Let me describe the whole scenario. Silvio is my colleague and he's working with me on my project.

I'm trying to chain a JPA transaction with a Kafka transaction by using the ChainedTransactionManager provided by Spring. This choice comes from the need to implement a one-phase commit. We need to open a "unique transaction" (managed by the ChainedTransactionManager) which first opens a transaction to a Kafka broker and then a transaction to an Oracle DB, on which a write operation is performed. A final commit on the DB is performed, immediately followed by a commit of the Kafka transaction. Every rollback in the JPA transaction will lead to a Kafka transaction rollback. Moreover, we're using spring-integration-kafka and Apache Avro for manipulating/serializing/deserializing the messages we publish to the Kafka broker.

What we have now is:
0. Client REST service invocation for the creation of a product.
1. Kafka transaction is opened -> Message is delivered.
2. Data are persisted on the database.
3. Data are correctly committed on the database.
4. Kafka transaction is not committed but is in an IN_TRANSITION state, and I don't know why.
5. A second call to the REST service immediately fails, as a new Kafka transaction is required but the old one is still open. So the abortTransaction method is invoked.

What we need to have is:
0. Client REST service invocation for the creation of a product.
1. Kafka transaction is opened -> New Product request Message is delivered.
2. Data are persisted on the database.
3. Data are correctly committed on the database.
4. Kafka transaction is committed and available for the consumers.

Here are my configurations for the factory, template, etc.:

    public PlatformTransactionManager transactionManager(
            EntityManagerFactory emf,
            DefaultKafkaProducerFactory producerFactory) {
        final JpaTransactionManager jpaTransactionManager = new JpaTransactionManager();
        jpaTransactionManager.setEntityManagerFactory(emf);

        final KafkaTransactionManager kafkaTransactionManager =
                new KafkaTransactionManager<>(producerFactory);
        kafkaTransactionManager.setFailEarlyOnGlobalRollbackOnly(true);
        kafkaTransactionManager.setNestedTransactionAllowed(true);
        kafkaTransactionManager.setValidateExistingTransaction(true);
        kafkaTransactionManager.setRollbackOnCommitFailure(true);
        kafkaTransactionManager.setTransactionSynchronization(
                AbstractPlatformTransactionManager.SYNCHRONIZATION_ALWAYS);

        return new ChainedTransactionManager(new PlatformTransactionManager[] {
                jpaTransactionManager, kafkaTransactionManager });
    }

    @Bean
    public DefaultKafkaProducerFactory producerFactory() {
        // The type arguments were lost in the original post; Map<String, Object>
        // is the type the factory constructor accepts.
        final Map<String, Object> configMap = new HashMap<>();
        configMap.put("bootstrap.servers", bootstrapServers);
        configMap.put("key.serializer", keySerializer);
        configMap.put("value.serializer", valueSerializer);
        configMap.put("schema.registry.url", schemaRegistryEndpoint);
        configMap.put("enable.idempotence", enableIdempotenceConfig);
        configMap.put("transactional.id", transactionalIdConfig);
        configMap.put("transaction.timeout.ms", transactionTimeoutMs);

        final DefaultKafkaProducerFactory producerFactory =
                new DefaultKafkaProducerFactory(configMap);
        producerFactory.setTransactionIdPrefix(transactionalIdConfig);
        return producerFactory;
    }

    public KafkaTemplate templateCreationCustomer(
            DefaultKafkaProducerFactory producerFactory,
            RecordMessageConverter messageConverter) {
        KafkaTemplate template = new KafkaTemplate<>(producerFactory);
        template.setMessageConverter(messageConverter);
        return template;
    }

Versions used:
spring-integration-kafka is 2.3.0
kafka-client is 0.11.0.1
kafka-avro-serializer is 3.3.0
EDIT// spring-kafka is 1.3.0
[jira] [Comment Edited] (KAFKA-6267) Kafka Producer - initTransaction forever waiting
[ https://issues.apache.org/jira/browse/KAFKA-6267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264561#comment-16264561 ]

Mauro Giacometti edited comment on KAFKA-6267 at 11/23/17 4:28 PM:
---

Hi Ismael,

I'm facing the same issue on Linux. Let me describe the whole scenario. Silvio is my colleague and he's working with me on my project.

I'm trying to chain a JPA transaction with a Kafka transaction by using the ChainedTransactionManager provided by Spring. This choice comes from the need to implement a one-phase commit. We need to open a "unique transaction" (managed by the ChainedTransactionManager) which first opens a transaction to a Kafka broker and then a transaction to an Oracle DB, on which a write operation is performed. A final commit on the DB is performed, immediately followed by a commit of the Kafka transaction. Every rollback in the JPA transaction will lead to a Kafka transaction rollback. Moreover, we're using spring-integration-kafka and Apache Avro for manipulating/serializing/deserializing the messages we publish to the Kafka broker.

What we have now is:
0. Client REST service invocation for the creation of a product.
1. Kafka transaction is opened -> Message is delivered.
2. Data are persisted on the database.
3. Data are correctly committed on the database.
4. Kafka transaction is not committed but is in an IN_TRANSITION state, and I don't know why.
5. A second call to the REST service immediately fails, as a new Kafka transaction is required but the old one is still open. So the abortTransaction method is invoked.

What we need to have is:
0. Client REST service invocation for the creation of a product.
1. Kafka transaction is opened -> New Product request Message is delivered.
2. Data are persisted on the database.
3. Data are correctly committed on the database.
4. Kafka transaction is committed and available for the consumers.

Here are my configurations for the factory, template, etc.:

    public PlatformTransactionManager transactionManager(
            EntityManagerFactory emf,
            DefaultKafkaProducerFactory producerFactory) {
        final JpaTransactionManager jpaTransactionManager = new JpaTransactionManager();
        jpaTransactionManager.setEntityManagerFactory(emf);

        final KafkaTransactionManager kafkaTransactionManager =
                new KafkaTransactionManager<>(producerFactory);
        kafkaTransactionManager.setFailEarlyOnGlobalRollbackOnly(true);
        kafkaTransactionManager.setNestedTransactionAllowed(true);
        kafkaTransactionManager.setValidateExistingTransaction(true);
        kafkaTransactionManager.setRollbackOnCommitFailure(true);
        kafkaTransactionManager.setTransactionSynchronization(
                AbstractPlatformTransactionManager.SYNCHRONIZATION_ALWAYS);

        return new ChainedTransactionManager(new PlatformTransactionManager[] {
                jpaTransactionManager, kafkaTransactionManager });
    }

    @Bean
    public DefaultKafkaProducerFactory producerFactory() {
        // The type arguments were lost in the original post; Map<String, Object>
        // is the type the factory constructor accepts.
        final Map<String, Object> configMap = new HashMap<>();
        configMap.put("bootstrap.servers", bootstrapServers);
        configMap.put("key.serializer", keySerializer);
        configMap.put("value.serializer", valueSerializer);
        configMap.put("schema.registry.url", schemaRegistryEndpoint);
        configMap.put("enable.idempotence", enableIdempotenceConfig);
        configMap.put("transactional.id", transactionalIdConfig);
        configMap.put("transaction.timeout.ms", transactionTimeoutMs);

        final DefaultKafkaProducerFactory producerFactory =
                new DefaultKafkaProducerFactory(configMap);
        producerFactory.setTransactionIdPrefix(transactionalIdConfig);
        return producerFactory;
    }

    public KafkaTemplate templateCreationCustomer(
            DefaultKafkaProducerFactory producerFactory,
            RecordMessageConverter messageConverter) {
        KafkaTemplate template = new KafkaTemplate<>(producerFactory);
        template.setMessageConverter(messageConverter);
        return template;
    }

Versions used:
spring-integration-kafka is 2.3.0
kafka-client is 0.11.0.1
kafka-avro-serializer is 3.3.0
[jira] [Commented] (KAFKA-6267) Kafka Producer - initTransaction forever waiting
[ https://issues.apache.org/jira/browse/KAFKA-6267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264568#comment-16264568 ]

Ismael Juma commented on KAFKA-6267:

Please include full broker and client logs. The error you pasted seems to indicate a versioning issue.
[jira] [Commented] (KAFKA-6267) Kafka Producer - initTransaction forever waiting
[ https://issues.apache.org/jira/browse/KAFKA-6267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264561#comment-16264561 ]

Mauro Giacometti commented on KAFKA-6267:
-

Hi Ismael,

I'm facing the same issue on Linux. Let me describe the whole scenario. Silvio is my colleague and he's working with me on my project.

I'm trying to chain a JPA transaction with a Kafka transaction by using the ChainedTransactionManager provided by Spring. This choice comes from the need to implement a one-phase commit. We need to open a "unique transaction" (managed by the ChainedTransactionManager) which first opens a transaction to a Kafka broker and then a transaction to an Oracle DB, on which a write operation is performed. A final commit on the DB is performed, immediately followed by a commit of the Kafka transaction. Every rollback in the JPA transaction will lead to a Kafka transaction rollback. Moreover, we're using spring-integration-kafka and Apache Avro for manipulating/serializing/deserializing the messages we publish to the Kafka broker.

What we have now is:
0. Client REST service invocation for the creation of a product.
1. Kafka transaction is opened -> Message is delivered.
2. Data are persisted on the database.
3. Data are correctly committed on the database.
4. Kafka transaction is not committed but is in an IN_TRANSITION state, and I don't know why.
5. A second call to the REST service immediately fails, as a new Kafka transaction is required but the old one is still open. So the abortTransaction method is invoked.

What we need to have is:
0. Client REST service invocation for the creation of a product.
1. Kafka transaction is opened -> New Product request Message is delivered.
2. Data are persisted on the database.
3. Data are correctly committed on the database.
4. Kafka transaction is committed and available for the consumers.

Here are my configurations for the factory, template, etc.:

    public PlatformTransactionManager transactionManager(
            EntityManagerFactory emf,
            DefaultKafkaProducerFactory producerFactory) {
        final JpaTransactionManager jpaTransactionManager = new JpaTransactionManager();
        jpaTransactionManager.setEntityManagerFactory(emf);

        final KafkaTransactionManager kafkaTransactionManager =
                new KafkaTransactionManager<>(producerFactory);
        kafkaTransactionManager.setFailEarlyOnGlobalRollbackOnly(true);
        kafkaTransactionManager.setNestedTransactionAllowed(true);
        kafkaTransactionManager.setValidateExistingTransaction(true);
        kafkaTransactionManager.setRollbackOnCommitFailure(true);
        kafkaTransactionManager.setTransactionSynchronization(
                AbstractPlatformTransactionManager.SYNCHRONIZATION_ALWAYS);

        return new ChainedTransactionManager(new PlatformTransactionManager[] {
                jpaTransactionManager, kafkaTransactionManager });
    }

    @Bean
    public DefaultKafkaProducerFactory producerFactory() {
        // The type arguments were lost in the original post; Map<String, Object>
        // is the type the factory constructor accepts.
        final Map<String, Object> configMap = new HashMap<>();
        configMap.put("bootstrap.servers", bootstrapServers);
        configMap.put("key.serializer", keySerializer);
        configMap.put("value.serializer", valueSerializer);
        configMap.put("schema.registry.url", schemaRegistryEndpoint);
        configMap.put("enable.idempotence", enableIdempotenceConfig);
        configMap.put("transactional.id", transactionalIdConfig);
        configMap.put("transaction.timeout.ms", transactionTimeoutMs);

        final DefaultKafkaProducerFactory producerFactory =
                new DefaultKafkaProducerFactory(configMap);
        producerFactory.setTransactionIdPrefix(transactionalIdConfig);
        return producerFactory;
    }

    public KafkaTemplate templateCreationCustomer(
            DefaultKafkaProducerFactory producerFactory,
            RecordMessageConverter messageConverter) {
        KafkaTemplate template = new KafkaTemplate<>(producerFactory);
        template.setMessageConverter(messageConverter);
        return template;
    }

Versions used:
spring-integration-kafka is 2.3.0
kafka-client is 0.11.0.1
kafka-avro-serializer is 3.3.0
[jira] [Comment Edited] (KAFKA-6267) Kafka Producer - initTransaction forever waiting
[ https://issues.apache.org/jira/browse/KAFKA-6267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264550#comment-16264550 ]

Silvio Papa edited comment on KAFKA-6267 at 11/23/17 4:18 PM:
--

On Lubuntu I get this log:

ERROR Closing socket for /127.0.0.1 because of error (kafka.network.Processor)
kafka.common.KafkaException: Wrong request type 18

and the topic obviously isn't created.
[jira] [Commented] (KAFKA-6267) Kafka Producer - initTransaction forever waiting
[ https://issues.apache.org/jira/browse/KAFKA-6267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264550#comment-16264550 ]

Silvio Papa commented on KAFKA-6267:

On Lubuntu I get this log:

ERROR Closing socket for /127.0.0.1 because of error (kafka.network.Processor)
kafka.common.KafkaException: Wrong request type 18

and the topic isn't created.
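The number in "Wrong request type 18" is the API key of the request the broker could not handle. In the Kafka protocol, key 18 is ApiVersions, which newer clients send when they connect and which brokers understand from 0.10.0.0 onwards; key 22 is InitProducerId, the request behind initTransactions(), added in 0.11.0.0. A broker that rejects key 18 is therefore consistent with a client/broker version mismatch. The lookup below is a hypothetical helper written for illustration, not part of the Kafka client API, and it lists only these two keys.

```java
import java.util.HashMap;
import java.util.Map;

/** Tiny lookup of two Kafka protocol API keys relevant to this report. */
final class ApiKeySketch {

    private static final Map<Integer, String> KNOWN = new HashMap<>();

    static {
        KNOWN.put(18, "ApiVersions (brokers 0.10.0.0 and later)");
        KNOWN.put(22, "InitProducerId (brokers 0.11.0.0 and later)");
    }

    /** Returns a short description of the API key, or "unknown" if it is not listed here. */
    static String describe(int apiKey) {
        return KNOWN.getOrDefault(apiKey, "unknown");
    }

    public static void main(String[] args) {
        System.out.println("Wrong request type 18 -> " + describe(18));
        System.out.println("initTransactions()    -> " + describe(22));
    }
}
```

A broker too old to know key 18 cannot know key 22 either, so the transactional producer has no way to complete initialization against it.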
[jira] [Updated] (KAFKA-6269) KTable state restore fails after rebalance
[ https://issues.apache.org/jira/browse/KAFKA-6269?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Andreas Schroeder updated KAFKA-6269:
-
Description:
I have the following kafka streams topology:

entity-B -> map step -> entity-B-exists (with state store)
entity-A -> map step -> entity-A-exists (with state store)
(entity-B-exists, entity-A-exists) -> outer join with state store.

The topology building code looks like this (some data type, serde, valuemapper, and joiner code omitted):

{code}
// Maps a source topic into an "exists" topic and reads that topic back as a KTable.
def buildTable[V](builder: StreamsBuilder,
                  sourceTopic: String,
                  existsTopic: String,
                  valueSerde: Serde[V],
                  valueMapper: ValueMapper[String, V]): KTable[String, V] = {

  val stream: KStream[String, String] = builder.stream[String, String](sourceTopic)
  val transformed: KStream[String, V] = stream.mapValues(valueMapper)
  transformed.to(existsTopic, Produced.`with`(Serdes.String(), valueSerde))

  val inMemoryStoreName = s"$existsTopic-persisted"

  // In-memory store without a changelog; the "exists" topic is the source of truth.
  val materialized = Materialized.as(Stores.inMemoryKeyValueStore(inMemoryStoreName))
    .withKeySerde(Serdes.String())
    .withValueSerde(valueSerde)
    .withLoggingDisabled()

  builder.table(existsTopic, materialized)
}

val builder = new StreamsBuilder

val mapToEmptyString: ValueMapper[String, String] = (value: String) => if (value != null) "" else null

val entitiesB: KTable[String, EntityBInfo] =
  buildTable(builder, "entity-B", "entity-B-exists", EntityBInfoSerde, ListingImagesToEntityBInfo)

val entitiesA: KTable[String, String] =
  buildTable(builder, "entity-A", "entity-A-exists", Serdes.String(), mapToEmptyString)

val joiner: ValueJoiner[String, EntityBInfo, EntityDiff] = (a, b) => EntityDiff.fromJoin(a, b)

// The join result is kept in an in-memory store backed by a changelog topic.
val materialized = Materialized.as(Stores.inMemoryKeyValueStore("entity-A-joined-with-entity-B"))
  .withKeySerde(Serdes.String())
  .withValueSerde(EntityDiffSerde)
  .withLoggingEnabled(new java.util.HashMap[String, String]())

val joined: KTable[String, EntityDiff] = entitiesA.outerJoin(entitiesB, joiner, materialized)
{code}

We run 4 processor machines with 30 stream threads each; each of the 4 topics has 30 partitions, so there is a total of 4 x 30 = 120 partitions to consume. The initial launch of the processors works fine, but killing one processor and letting it rejoin leads to faulty behaviour in the stream threads.

First, the total number of assigned partitions over all processor machines is larger than 120 (sometimes 157, sometimes just 132), so the partition / task assignment seems to assign the same job to different stream threads.

The processor machines trying to rejoin the consumer group fail constantly with the error message 'Detected a task that got migrated to another thread.' We gave the processor half an hour to recover; usually, rebuilding the KTable states takes around 20 seconds (with Kafka 0.11.0.1).

Here are the details of the errors we see:

stream-thread [kafka-processor-6-StreamThread-9] Detected a task that got migrated to another thread. This implies that this thread missed a rebalance and dropped out of the consumer group. Trying to rejoin the consumer group now.
{code} org.apache.kafka.streams.errors.TaskMigratedException: Log end offset of entity-B-exists-0 should not change while restoring: old end offset 4750539, current offset 4751388 > StreamsTask taskId: 1_0 > > ProcessorTopology: > KSTREAM-SOURCE-08: > topics: [entity-A-exists] > children: [KTABLE-SOURCE-09] > KTABLE-SOURCE-09: > states: [entity-A-exists-persisted] > children: [KTABLE-JOINTHIS-11] > KTABLE-JOINTHIS-11: > states: [entity-B-exists-persisted] > children: [KTABLE-MERGE-10] > KTABLE-MERGE-10: > states: [entity-A-joined-with-entity-B] > KSTREAM-SOURCE-03: > topics: [entity-B-exists] > children: [KTABLE-SOURCE-04] > KTABLE-SOURCE-04: > states: [entity-B-exists-persisted] > children: [KTABLE-JOINOTHER-12] > KTABLE-JOINOTHER-12: > states: [entity-A-exists-persisted] > children: [KTABLE-MERGE-10] > KTABLE-MERGE-10: > states: [entity-A-joined-with-entity-B] > Partitions [entity-A-exists-0, entity-B-exists-0] at
[jira] [Commented] (KAFKA-6267) Kafka Producer - initTransaction forever waiting
[ https://issues.apache.org/jira/browse/KAFKA-6267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264500#comment-16264500 ] Ismael Juma commented on KAFKA-6267: There are known issues affecting Windows. Are you saying that you've seen the same issue when running on Linux? > Kafka Producer - initTransaction forever waiting > > > Key: KAFKA-6267 > URL: https://issues.apache.org/jira/browse/KAFKA-6267 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 0.11.0.1, 0.11.0.2 >Reporter: Silvio Papa >Priority: Blocker > Attachments: producer.JPG > > > In code of attached image, the producer remains forever awaiting in > initTransaction with default configuration of broker -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-6269) KTable state restore fails after rebalance
Andreas Schroeder created KAFKA-6269: Summary: KTable state restore fails after rebalance Key: KAFKA-6269 URL: https://issues.apache.org/jira/browse/KAFKA-6269 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 1.0.0 Reporter: Andreas Schroeder I have the following kafka streams topology: entity-B -> map step -> entity-B-exists (with state store) entity-A -> map step -> entity-A-exists (with state store) (entity-B-exists, entity-A-exists) -> outer join with state store. The topology building code looks like this (some data type, serde, valuemapper, and joiner code omitted): def buildTable[V](builder: StreamsBuilder, sourceTopic: String, existsTopic: String, valueSerde: Serde[V], valueMapper: ValueMapper[String, V]): KTable[String, V] = { val stream: KStream[String, String] = builder.stream[String, String](sourceTopic) val transformed: KStream[String, V] = stream.mapValues(valueMapper) transformed.to(existsTopic, Produced.`with`(Serdes.String(), valueSerde)) val inMemoryStoreName = s"$existsTopic-persisted" val materialized = Materialized.as(Stores.inMemoryKeyValueStore(inMemoryStoreName)) .withKeySerde(Serdes.String()) .withValueSerde(valueSerde) .withLoggingDisabled() builder.table(existsTopic, materialized) } val builder = new StreamsBuilder val mapToEmptyString: ValueMapper[String, String] = (value: String) => if (value != null) "" else null val entitiesB: KTable[String, EntityBInfo] = buildTable(builder, "entity-B", "entity-B-exists", EntityBInfoSerde, ListingImagesToEntityBInfo) val entitiesA: KTable[String, String] = buildTable(builder, "entity-A", "entity-A-exists", Serdes.String(), mapToEmptyString) val joiner: ValueJoiner[String, EntityBInfo, EntityDiff] = (a, b) => EntityDiff.fromJoin(a, b) val materialized = Materialized.as(Stores.inMemoryKeyValueStore("entity-A-joined-with-entity-B")) .withKeySerde(Serdes.String()) .withValueSerde(EntityDiffSerde) .withLoggingEnabled(new java.util.HashMap[String, String]()) val joined: KTable[String, 
EntityDiff] = entitiesA.outerJoin(entitiesB, joiner, materialized) We run 4 processor machines with 30 stream threads each; each topic has 30 partitions, so there is a total of 4 x 30 = 120 partitions to consume. The initial launch of the processors works fine, but killing one processor and letting it re-join leads to some faulty behaviour in the stream threads. First, the total number of assigned partitions over all processor machines is larger than 120 (sometimes 157, sometimes just 132), so the partition / task assignment seems to assign the same job to different stream threads. The processor machines trying to re-join the consumer group fail constantly with the error message 'Detected a task that got migrated to another thread.' We gave the processor half an hour to recover; usually, rebuilding the KTable states takes around 20 seconds (with Kafka 0.11.0.1). Here are the details of the errors we see: stream-thread [kafka-processor-6-StreamThread-9] Detected a task that got migrated to another thread. This implies that this thread missed a rebalance and dropped out of the consumer group. Trying to rejoin the consumer group now. org.apache.kafka.streams.errors.TaskMigratedException: Log end offset of entity-B-exists-0 should not change while restoring: old end offset 4750539, current offset 4751388 > StreamsTask taskId: 1_0 > > ProcessorTopology: > KSTREAM-SOURCE-08: > topics: [entity-A-exists] > children: [KTABLE-SOURCE-09] > KTABLE-SOURCE-09: > states: [entity-A-exists-persisted] > children: [KTABLE-JOINTHIS-11] > KTABLE-JOINTHIS-11: > states: [entity-B-exists-persisted] > children: [KTABLE-MERGE-10] > KTABLE-MERGE-10: > states: [entity-A-joined-with-entity-B] > KSTREAM-SOURCE-03: > topics: [entity-B-exists] > children: [KTABLE-SOURCE-04] > KTABLE-SOURCE-04: > states: [entity-B-exists-persisted] > children: [KTABLE-JOINOTHER-12] > KTABLE-JOINOTHER-12: > states: [entity-A-exists-persisted] > children: [KTABLE-MERGE-10] > KTABLE-MERGE-10: > states:
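An assigned-partition total above the expected 120 suggests that some partitions are owned by more than one stream thread at the same time. A minimal Java sketch of how such overlap could be detected from a per-thread assignment listing follows; the class name, the method name, and the shape of the input map are illustrative assumptions, not part of Kafka Streams.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not a Kafka API: finds partitions owned by more than one thread.
final class AssignmentCheck {

    // Input: thread name -> partitions it reports as assigned (e.g. "entity-B-exists-0").
    // Output: partition -> owning threads, restricted to partitions with two or more owners.
    static Map<String, List<String>> findDuplicates(Map<String, List<String>> assignmentByThread) {
        Map<String, List<String>> owners = new TreeMap<>();
        for (Map.Entry<String, List<String>> entry : assignmentByThread.entrySet()) {
            for (String partition : entry.getValue()) {
                owners.computeIfAbsent(partition, p -> new ArrayList<>()).add(entry.getKey());
            }
        }
        // Drop every partition that has exactly one owner; what remains is the overlap.
        owners.values().removeIf(threads -> threads.size() < 2);
        return owners;
    }

    public static void main(String[] args) {
        // Made-up sample data; real input would come from logs or metrics.
        Map<String, List<String>> assignment = new TreeMap<>();
        assignment.put("StreamThread-1", List.of("entity-A-exists-0", "entity-B-exists-0"));
        assignment.put("StreamThread-9", List.of("entity-B-exists-0"));
        System.out.println(findDuplicates(assignment));
    }
}
```

An empty result would mean the over-count has another cause, for example stale assignments still being reported by a thread that already left the group.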
[jira] [Commented] (KAFKA-6267) Kafka Producer - initTransaction forever waiting
[ https://issues.apache.org/jira/browse/KAFKA-6267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264495#comment-16264495 ] Silvio Papa commented on KAFKA-6267: - Windows 10 Pro 64bit - Lubuntu 32bit > Kafka Producer - initTransaction forever waiting > > > Key: KAFKA-6267 > URL: https://issues.apache.org/jira/browse/KAFKA-6267 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 0.11.0.1, 0.11.0.2 >Reporter: Silvio Papa >Priority: Blocker > Attachments: producer.JPG > > > In code of attached image, the producer remains forever awaiting in > initTransaction with default configuration of broker -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-6267) Kafka Producer - initTransaction forever waiting
[ https://issues.apache.org/jira/browse/KAFKA-6267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264493#comment-16264493 ] Ismael Juma commented on KAFKA-6267: What is the operating system that the broker is running on? > Kafka Producer - initTransaction forever waiting > > > Key: KAFKA-6267 > URL: https://issues.apache.org/jira/browse/KAFKA-6267 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 0.11.0.1, 0.11.0.2 >Reporter: Silvio Papa >Priority: Blocker > Attachments: producer.JPG > > > In code of attached image, the producer remains forever awaiting in > initTransaction with default configuration of broker -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-6267) Kafka Producer - initTransaction forever waiting
Silvio Papa created KAFKA-6267:
Summary: Kafka Producer - initTransaction forever waiting
Key: KAFKA-6267
URL: https://issues.apache.org/jira/browse/KAFKA-6267
Project: Kafka
Issue Type: Bug
Components: producer
Affects Versions: 0.11.0.2, 0.11.0.1
Reporter: Silvio Papa
Priority: Blocker
Attachments: producer.JPG

In the code shown in the attached image, the producer waits forever in initTransaction when the broker runs with its default configuration.
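When a call blocks indefinitely as described here, one way to surface the hang instead of waiting forever is to run the call on a separate thread with a deadline. The sketch below uses only the JDK; `blockingInit` stands in for the producer call, and the class and method names are assumptions made for illustration. It does not address the cause of the hang.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: bounds the time spent waiting on a call that may never return.
final class BoundedCall {

    // Returns true if the task finished in time, false if it was still blocked at the deadline.
    static boolean runWithTimeout(Runnable blockingInit, long timeoutMs) {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "bounded-call");
            t.setDaemon(true); // do not keep the JVM alive if the call never returns
            return t;
        });
        Future<?> future = executor.submit(blockingInit);
        try {
            future.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            future.cancel(true); // interrupts the worker; the call may or may not react to it
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("call failed", e.getCause());
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // Stand-in for a call that hangs; replace with the real blocking call.
        Runnable hangs = () -> {
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        System.out.println("completed in time: " + runWithTimeout(hangs, 200));
    }
}
```

A `false` result only shows that the call did not return within the deadline; broker logs are still needed to find out why.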
[jira] [Commented] (KAFKA-6238) Issues with protocol version when applying a rolling upgrade to 1.0.0
[ https://issues.apache.org/jira/browse/KAFKA-6238?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264421#comment-16264421 ] ASF GitHub Bot commented on KAFKA-6238: --- Github user asfgit closed the pull request at: https://github.com/apache/kafka/pull/4256 > Issues with protocol version when applying a rolling upgrade to 1.0.0 > - > > Key: KAFKA-6238 > URL: https://issues.apache.org/jira/browse/KAFKA-6238 > Project: Kafka > Issue Type: Bug > Components: documentation >Affects Versions: 1.0.0 >Reporter: Diego Louzán >Assignee: Jason Gustafson > > Hello, > I am trying to perform a rolling upgrade from 0.10.0.1 to 1.0.0, and > according to the instructions in the documentation, I should only have to > upgrade the "inter.broker.protocol.version" parameter in the first step. But > after setting the value to "0.10.0" or "0.10.0.1" (tried both), the broker > refuses to start with the following error: > {code} > [2017-11-20 08:28:46,620] FATAL (kafka.Kafka$) > java.lang.IllegalArgumentException: requirement failed: > log.message.format.version 1.0-IV0 cannot be used when > inter.broker.protocol.version is set to 0.10.0.1 > at scala.Predef$.require(Predef.scala:224) > at kafka.server.KafkaConfig.validateValues(KafkaConfig.scala:1205) > at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1170) > at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:881) > at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:878) > at > kafka.server.KafkaServerStartable$.fromProps(KafkaServerStartable.scala:28) > at kafka.Kafka$.main(Kafka.scala:82) > at kafka.Kafka.main(Kafka.scala) > {code} > I checked the instructions for rolling upgrades to previous versions (namely > 0.11.0.0), and there it is stated that it is also necessary to upgrade the > "log.message.format.version" parameter in two stages. I have tried that and > the upgrade worked. 
It seems it still applies to version 1.0.0, so I'm not > sure if this is wrong documentation, or an actual issue with kafka since it > should work as stated in the docs. > Regards, > Diego Louzán -- This message was sent by Atlassian JIRA (v6.4.14#64029)
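The two-stage approach that the reporter says worked can be sketched as a `server.properties` fragment. The values assume brokers currently running 0.10.0.1, as in the report; this is an illustration of the idea under that assumption, not a copy of the official upgrade notes.

```properties
# Stage 1: set both values before swapping in the 1.0.0 binaries,
# then restart the brokers one at a time.
# Keeping the message format at the old version avoids the
# "log.message.format.version ... cannot be used" check from the stack trace.
inter.broker.protocol.version=0.10.0
log.message.format.version=0.10.0

# Stage 2: once every broker runs 1.0.0, raise the protocol version
# and perform a second rolling restart.
# inter.broker.protocol.version=1.0

# The message format version would be raised in a later, separate step
# (assumption: after the clients have been upgraded).
# log.message.format.version=1.0
```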
[jira] [Commented] (KAFKA-6266) Kafka 1.0.0 : Repeated occurrence of WARN Resetting first dirty offset of __consumer_offsets-xx to log start offset 203569 since the checkpointed offset 120955 is inval
[ https://issues.apache.org/jira/browse/KAFKA-6266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264420#comment-16264420 ] Ismael Juma commented on KAFKA-6266: The errors don't go away after a while? > Kafka 1.0.0 : Repeated occurrence of WARN Resetting first dirty offset of > __consumer_offsets-xx to log start offset 203569 since the checkpointed > offset 120955 is invalid. (kafka.log.LogCleanerManager$) > -- > > Key: KAFKA-6266 > URL: https://issues.apache.org/jira/browse/KAFKA-6266 > Project: Kafka > Issue Type: Bug > Components: offset manager >Affects Versions: 1.0.0 > Environment: CentOS 7, Apache kafka_2.12-1.0.0 >Reporter: VinayKumar > > I upgraded Kafka from 0.10.2.1 to 1.0.0 version. From then, I see the below > warnings in the log. > I'm seeing these continuously in the log, and want these to be fixed- so that > they wont repeat. Can someone please help me in fixing the below warnings. > WARN Resetting first dirty offset of __consumer_offsets-17 to log start > offset 3346 since the checkpointed offset 3332 is invalid. > (kafka.log.LogCleanerManager$) > WARN Resetting first dirty offset of __consumer_offsets-23 to log start > offset 4 since the checkpointed offset 1 is invalid. > (kafka.log.LogCleanerManager$) > WARN Resetting first dirty offset of __consumer_offsets-19 to log start > offset 203569 since the checkpointed offset 120955 is invalid. > (kafka.log.LogCleanerManager$) > WARN Resetting first dirty offset of __consumer_offsets-35 to log start > offset 16957 since the checkpointed offset 7 is invalid. > (kafka.log.LogCleanerManager$) -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-6266) Kafka 1.0.0 : Repeated occurrence of WARN Resetting first dirty offset of __consumer_offsets-xx to log start offset 203569 since the checkpointed offset 120955 is invalid
VinayKumar created KAFKA-6266:
Summary: Kafka 1.0.0 : Repeated occurrence of WARN Resetting first dirty offset of __consumer_offsets-xx to log start offset 203569 since the checkpointed offset 120955 is invalid. (kafka.log.LogCleanerManager$)
Key: KAFKA-6266
URL: https://issues.apache.org/jira/browse/KAFKA-6266
Project: Kafka
Issue Type: Bug
Components: offset manager
Affects Versions: 1.0.0
Environment: CentOS 7, Apache kafka_2.12-1.0.0
Reporter: VinayKumar

I upgraded Kafka from 0.10.2.1 to 1.0.0. Since then, I see the warnings below continuously in the log, and I would like them fixed so that they no longer repeat. Can someone please help me fix them?

WARN Resetting first dirty offset of __consumer_offsets-17 to log start offset 3346 since the checkpointed offset 3332 is invalid. (kafka.log.LogCleanerManager$)
WARN Resetting first dirty offset of __consumer_offsets-23 to log start offset 4 since the checkpointed offset 1 is invalid. (kafka.log.LogCleanerManager$)
WARN Resetting first dirty offset of __consumer_offsets-19 to log start offset 203569 since the checkpointed offset 120955 is invalid. (kafka.log.LogCleanerManager$)
WARN Resetting first dirty offset of __consumer_offsets-35 to log start offset 16957 since the checkpointed offset 7 is invalid. (kafka.log.LogCleanerManager$)
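Read literally, the warning text implies a simple rule: a checkpointed cleaner offset that lies below the log start offset is treated as invalid and replaced by the log start offset. The sketch below encodes only that reading of the message; the names are illustrative and this is not the broker's actual code.

```java
// Hypothetical illustration of the rule suggested by the warning text.
final class DirtyOffset {

    // Returns the offset the cleaner would start from under the rule described above.
    static long firstDirtyOffset(long checkpointedOffset, long logStartOffset) {
        if (checkpointedOffset < logStartOffset) {
            // The checkpoint points before the start of the log, so it cannot be used.
            return logStartOffset;
        }
        return checkpointedOffset;
    }

    public static void main(String[] args) {
        // Values taken from the __consumer_offsets-19 warning in the report.
        System.out.println(firstDirtyOffset(120955L, 203569L));
    }
}
```

Under this reading, the warning keeps repeating for as long as the stale value remains in the checkpoint, which matches the symptom reported above.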
[jira] [Commented] (KAFKA-2647) Migrate System Tools to work with SSL
[ https://issues.apache.org/jira/browse/KAFKA-2647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264300#comment-16264300 ] Pablo Panero commented on KAFKA-2647: - [~benstopford] Hello, sorry for the noise in an old ticket. What are the alternatives at the moment? I am in high need of using GetOffsetShell or similar, but nothing is being implemented and no one accepts PRs. See: https://issues.apache.org/jira/browse/KAFKA-3355 > Migrate System Tools to work with SSL > - > > Key: KAFKA-2647 > URL: https://issues.apache.org/jira/browse/KAFKA-2647 > Project: Kafka > Issue Type: Improvement > Components: security >Reporter: Ben Stopford >Assignee: Ben Stopford > > The following system tools won't work with SSL enabled brokers. They should > either be directly ported over or we should provide some equivalent > functionality. > - ReplicaVerificationTool (SimpleConsumer + Producer (via ClientUtils)) > - ReplayLogProducer (HL Consumer / New Producer) > - ConsumerOffsetChecker (Broker Channel) > - GetOffsetShell (SimpleConsumer, Old Producer used for TopicMetadata fetch) > - SimpleConsumerShell (SimpleConsumer) -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-6248) Enable configuration of internal topics of Kafka Streams applications
[ https://issues.apache.org/jira/browse/KAFKA-6248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264106#comment-16264106 ] Tim Van Laer commented on KAFKA-6248: - Hi Bill, Thank you for the information, I must have overlooked it while browsing the docs. The solution you propose does suit my needs for now. Thanks! Now I just need to find out if my application running Kafka Streams 1.0.0 will be compatible with our 0.10.2 cluster. Regards, Tim > Enable configuration of internal topics of Kafka Streams applications > - > > Key: KAFKA-6248 > URL: https://issues.apache.org/jira/browse/KAFKA-6248 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Tim Van Laer >Priority: Minor > > In the current implementation of Kafka Streams, it is not possible to set > custom configuration to internal topics (e.g. max.message.bytes, > retention.ms...). It would be nice if a developer can set some specific > configuration. > E.g. if you want to store messages bigger than 1MiB in a state store, you > have to alter the corresponding changelog topic with a max.message.bytes > setting. > The workaround is to create the 'internal' topics upfront using the correct > naming convention so Kafka Streams will use the explicitly defined topics as > if they are internal. > An alternative is to alter the internal topics after the Kafka Streams > application is started and has created its internal topics. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
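The workaround of creating topics upfront depends on getting the internal topic name right. A small sketch, assuming the usual Kafka Streams convention that a state store's changelog topic is named `<application.id>-<store name>-changelog`; the class, the method, and the application id `my-app` are hypothetical.

```java
// Hypothetical helper: builds the changelog topic name a pre-created topic would need.
final class InternalTopicNames {

    // Assumed convention: <application.id>-<store name>-changelog
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    public static void main(String[] args) {
        // "my-app" is a made-up application.id used only for illustration.
        System.out.println(changelogTopic("my-app", "my-store"));
    }
}
```

A topic created under that name with a larger `max.message.bytes` would then be picked up as the store's changelog, provided its partition count matches what the application expects.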