[jira] [Created] (KAFKA-8238) Log how many bytes and messages were read from __consumer_offsets
Colin P. McCabe created KAFKA-8238:
-----------------------------------

Summary: Log how many bytes and messages were read from __consumer_offsets
Key: KAFKA-8238
URL: https://issues.apache.org/jira/browse/KAFKA-8238
Project: Kafka
Issue Type: Improvement
Reporter: Colin P. McCabe

We should log how many bytes and messages were read from __consumer_offsets. Currently we only log how long it took. Example:

{code}
[GroupMetadataManager brokerId=2] Finished loading offsets and group metadata from __consumer_offsets-22 in 23131 milliseconds.
{code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
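A minimal sketch of what the enriched log line might look like, with bytes and record counts added alongside the elapsed time. The method and parameter names below are illustrative only, not Kafka's actual internals:

```java
// Hypothetical sketch of the enriched log line proposed in KAFKA-8238: report
// bytes and records read while loading the partition, not just elapsed time.
// All names here are illustrative, not taken from GroupMetadataManager.
public class OffsetLoadLogSketch {
    static String loadSummary(int partition, long bytesRead, long recordsRead, long elapsedMs) {
        return String.format(
            "[GroupMetadataManager brokerId=2] Finished loading offsets and group metadata "
                + "from __consumer_offsets-%d in %d milliseconds (%d bytes, %d records read).",
            partition, elapsedMs, bytesRead, recordsRead);
    }

    public static void main(String[] args) {
        System.out.println(loadSummary(22, 1048576L, 4096L, 23131L));
    }
}
```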
[jira] [Updated] (KAFKA-8238) Log how many bytes and messages were read from __consumer_offsets
[ https://issues.apache.org/jira/browse/KAFKA-8238?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Colin P. McCabe updated KAFKA-8238:
-----------------------------------
Labels: newbie (was: )
[jira] [Resolved] (KAFKA-8228) Exactly once semantics break during server restart for kafka-streams application
[ https://issues.apache.org/jira/browse/KAFKA-8228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Boquan Tang resolved KAFKA-8228.
--------------------------------
Resolution: Duplicate

This might duplicate KAFKA-7866; closing for now and watching that ticket.

> Exactly once semantics break during server restart for kafka-streams application
> --------------------------------------------------------------------------------
> Key: KAFKA-8228
> URL: https://issues.apache.org/jira/browse/KAFKA-8228
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.2.0
> Reporter: Boquan Tang
> Priority: Major
>
> We are using 2.2.0 for the kafka-streams client and 2.0.1 for the server.
> We have a simple kafka-streams application with the following topology:
> {code:java}
> Source: KSTREAM-SOURCE-04 (topics: [deduped-adclick])
>   --> KSTREAM-TRANSFORM-05
> Processor: KSTREAM-TRANSFORM-05 (stores: [uid-offset-store])
>   --> KSTREAM-TRANSFORM-06
>   <-- KSTREAM-SOURCE-04
> Source: KSTREAM-SOURCE-00 (topics: [advertiser-budget])
>   --> KTABLE-SOURCE-01
> Source: KSTREAM-SOURCE-02 (topics: [advertisement-budget])
>   --> KTABLE-SOURCE-03
> Processor: KSTREAM-TRANSFORM-06 (stores: [advertiser-budget-store, advertisement-budget-store])
>   --> KSTREAM-SINK-07
>   <-- KSTREAM-TRANSFORM-05
> Sink: KSTREAM-SINK-07 (topic: budget-adclick)
>   <-- KSTREAM-TRANSFORM-06
> Processor: KTABLE-SOURCE-01 (stores: [advertiser-budget-store])
>   --> none
>   <-- KSTREAM-SOURCE-00
> Processor: KTABLE-SOURCE-03 (stores: [advertisement-budget-store])
>   --> none
>   <-- KSTREAM-SOURCE-02
> {code}
> The processor KSTREAM-TRANSFORM-05 (stores: [uid-offset-store]) was added specifically to investigate this broken-EOS issue; its transform() looks like this (specific K/V class names removed):
> {code:java}
> public void init(final ProcessorContext context) {
>     uidStore = (WindowStore) context.getStateStore(uidStoreName);
>     this.context = context;
> }
>
> public KeyValue transform(final K key, final V value) {
>     final long offset = context.offset();
>     final String uid = value.getUid();
>     final long beginningOfHour =
>         Instant.ofEpochMilli(clickTimestamp).atZone(ZoneId.systemDefault()).withMinute(0).withSecond(0).toEpochSecond() * 1000;
>     final Long maybeLastOffset = uidStore.fetch(uid, beginningOfHour);
>     final boolean dupe = null != maybeLastOffset && offset == maybeLastOffset;
>     uidStore.put(uid, offset, beginningOfHour);
>     if (dupe) {
>         LOGGER.warn("Find duplication in partition {}, uid is {}, current offset is {}, last offset is {}",
>             context.partition(), uid, offset, maybeLastOffset);
>         statsEmitter.count("duplication");
>     }
>     return dupe ? null : new KeyValue<>(key, value);
> }
> {code}
> Although not 100% reproducible, we found that after restarting one or more servers on the cluster side, duplicates appear:
> {code:java}
> 2019-04-12T07:12:58Z WARN [org.apache.kafka.clients.NetworkClient] [kafka-producer-network-thread | adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1-0_9-producer] [Producer clientId=adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1-0_9-producer, transactionalId=adclick-budget-decorator-streams-0_9] Connection to node 2 (*:9092) could not be established. Broker may not be available.
> 2019-04-12T07:14:02Z WARN [org.apache.kafka.clients.NetworkClient] [kafka-producer-network-thread | adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1-0_12-producer] [Producer clientId=adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1-0_12-producer, transactionalId=adclick-budget-decorator-streams-0_12] Connection to node 2 (*:9092) could not be established. Broker may not be available.
> 2019-04-12T07:27:39Z WARN [org.apache.kafka.streams.processor.internals.StreamThread] [adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1] stream-thread ...
> {code}
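The duplicate-detection logic in the reporter's transform() can be sketched in a self-contained form; a plain HashMap keyed by (uid, hour-window) stands in for the Kafka Streams WindowStore, so this illustrates only the dedup check, not the Streams API:

```java
import java.util.HashMap;
import java.util.Map;

// Self-contained sketch of the duplicate check from KAFKA-8228's transform():
// a record counts as a duplicate when the same (uid, hour window) already
// holds the same offset. A HashMap stands in for the WindowStore.
public class DedupSketch {
    private final Map<String, Long> lastOffsetByUidAndHour = new HashMap<>();

    /** Returns true if this exact (uid, offset) was already seen in the window. */
    boolean isDuplicate(String uid, long offset, long beginningOfHourMs) {
        String windowKey = uid + "@" + beginningOfHourMs;
        Long maybeLastOffset = lastOffsetByUidAndHour.get(windowKey);
        boolean dupe = maybeLastOffset != null && offset == maybeLastOffset;
        lastOffsetByUidAndHour.put(windowKey, offset);  // mirrors uidStore.put(...)
        return dupe;
    }

    public static void main(String[] args) {
        DedupSketch sketch = new DedupSketch();
        long hour = 1_555_056_000_000L;  // arbitrary window start
        System.out.println(sketch.isDuplicate("uid-1", 42L, hour));  // false: first sighting
        System.out.println(sketch.isDuplicate("uid-1", 42L, hour));  // true: replayed offset
        System.out.println(sketch.isDuplicate("uid-1", 43L, hour));  // false: new offset
    }
}
```

Under EOS, a replayed input offset for the same uid and window indicates a duplicate delivery, which is exactly what the reporter's store was built to catch.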
[jira] [Commented] (KAFKA-8228) Exactly once semantics break during server restart for kafka-streams application
[ https://issues.apache.org/jira/browse/KAFKA-8228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16818492#comment-16818492 ]

Boquan Tang commented on KAFKA-8228:
------------------------------------
[~mjsax] Gotcha. I'll watch that issue; if I find new evidence pointing this issue to another root cause, I'll reopen and add more details.
[jira] [Commented] (KAFKA-8236) Incorporate version control for Kafka Streams Application Reset
[ https://issues.apache.org/jira/browse/KAFKA-8236?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16818487#comment-16818487 ]

Boyang Chen commented on KAFKA-8236:
------------------------------------
[~mjsax] Right, this is just a preliminary thought. One way we could do this is by exposing a configuration such as StreamsConfig.CODE_VERSION that the user could inject. However, we need to handle the A -> B -> A problem, where the user may roll back and roll forward multiple times. Adding a timestamp suffix might be a good way to distinguish two different builds.

> Incorporate version control for Kafka Streams Application Reset
> ---------------------------------------------------------------
> Key: KAFKA-8236
> URL: https://issues.apache.org/jira/browse/KAFKA-8236
> Project: Kafka
> Issue Type: Improvement
> Components: streams, tools
> Reporter: Boyang Chen
> Priority: Minor
> Labels: needs-kip
>
> Inspired by Spark mlflow, which supports versioned logging, we should consider exposing a special versioning tag for KStreams applications to ease rolling back a bad code deploy. The naive approach is to store the versioning info in the consumer offset topic so that when we perform a rollback, we know where to read the input from and where to clean up the changelog topic. Essentially, this is an extension of our current application reset tool.
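The A -> B -> A concern above can be illustrated with a small sketch: a bare version tag cannot distinguish the first deploy of version A from a later rollback to A, while a timestamp suffix makes every deploy unique. StreamsConfig.CODE_VERSION is only a proposed name; everything below is hypothetical:

```java
// Illustrates why a bare code-version tag is ambiguous under A -> B -> A
// deploys, and how a timestamp suffix disambiguates. All names are
// hypothetical; KAFKA-8236 only sketches the idea.
public class VersionTagSketch {
    static String deployTag(String codeVersion, long deployTimeMs) {
        return codeVersion + "-" + deployTimeMs;
    }

    public static void main(String[] args) {
        String firstA  = deployTag("A", 1_000L);  // initial deploy of version A
        String b       = deployTag("B", 2_000L);  // roll forward to B
        String secondA = deployTag("A", 3_000L);  // roll back to A again

        // Bare version strings collide; timestamped tags remain distinct.
        System.out.println(firstA.equals(secondA));  // false: distinguishable deploys
        System.out.println(b);
    }
}
```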
[jira] [Commented] (KAFKA-7471) Multiple Consumer Group Management (Describe, Reset, Delete)
[ https://issues.apache.org/jira/browse/KAFKA-7471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16818473#comment-16818473 ]

ASF GitHub Bot commented on KAFKA-7471:
---------------------------------------
vahidhashemian commented on pull request #5726: KAFKA-7471: Multiple Consumer Group Management Feature
URL: https://github.com/apache/kafka/pull/5726

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Multiple Consumer Group Management (Describe, Reset, Delete)
> ------------------------------------------------------------
> Key: KAFKA-7471
> URL: https://issues.apache.org/jira/browse/KAFKA-7471
> Project: Kafka
> Issue Type: New Feature
> Components: tools
> Affects Versions: 1.0.0, 2.0.0
> Reporter: Alex Dunayevsky
> Assignee: Alex Dunayevsky
> Priority: Major
>
> Functionality needed:
> * Describe/Delete/Reset offsets on multiple consumer groups at a time (including each group by repeating the `--group` parameter)
> * Describe/Delete/Reset offsets on ALL consumer groups at a time (add a new --groups-all option similar to --topics-all)
> * Generate CSV for multiple consumer groups
> What are the benefits?
> * No need to start a new JVM to perform each query on every single consumer group
> * Ability to query groups by their status (for instance, `-v grepping` by `Stable` to spot problematic/dead/empty groups)
> * Ability to export offsets to reset for multiple consumer groups to a CSV file (needs CSV generation export/import format rework)
[jira] [Commented] (KAFKA-8228) Exactly once semantics break during server restart for kafka-streams application
[ https://issues.apache.org/jira/browse/KAFKA-8228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16818474#comment-16818474 ]

Matthias J. Sax commented on KAFKA-8228:
----------------------------------------
[~boquan] Can you maybe follow up on KAFKA-7866? Maybe we close this ticket as "contained in".
[jira] [Commented] (KAFKA-8207) StickyPartitionAssignor for KStream
[ https://issues.apache.org/jira/browse/KAFKA-8207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16818471#comment-16818471 ]

Matthias J. Sax commented on KAFKA-8207:
----------------------------------------
I understand. However, this must be fixed differently, not by allowing users to specify a custom partition assignor. Hence, I think we should close this ticket as "not a problem".

> StickyPartitionAssignor for KStream
> -----------------------------------
> Key: KAFKA-8207
> URL: https://issues.apache.org/jira/browse/KAFKA-8207
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Affects Versions: 2.0.0
> Reporter: neeraj
> Priority: Major
>
> In KStreams I am not able to specify a sticky partition assignor or my own custom partition assignor. Overriding the property while building the stream does not work: props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, CustomAssignor.class.getName());
[jira] [Commented] (KAFKA-7778) Add KTable.suppress to Scala API
[ https://issues.apache.org/jira/browse/KAFKA-7778?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16818464#comment-16818464 ]

ASF GitHub Bot commented on KAFKA-7778:
---------------------------------------
guozhangwang commented on pull request #6314: KAFKA-7778: Add KTable.suppress to Scala API
URL: https://github.com/apache/kafka/pull/6314

> Add KTable.suppress to Scala API
> --------------------------------
> Key: KAFKA-7778
> URL: https://issues.apache.org/jira/browse/KAFKA-7778
> Project: Kafka
> Issue Type: Task
> Components: streams
> Affects Versions: 2.1.0
> Reporter: Jacek Laskowski
> Assignee: John Roesler
> Priority: Major
> Labels: newbie
>
> {{KTable.suppress}} is not available in the Scala API.
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables
[jira] [Assigned] (KAFKA-8198) KStreams testing docs use non-existent method "pipe"
[ https://issues.apache.org/jira/browse/KAFKA-8198?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax reassigned KAFKA-8198:
--------------------------------------
Assignee: Victoria Bialas

> KStreams testing docs use non-existent method "pipe"
> ----------------------------------------------------
> Key: KAFKA-8198
> URL: https://issues.apache.org/jira/browse/KAFKA-8198
> Project: Kafka
> Issue Type: Bug
> Components: documentation, streams
> Affects Versions: 1.1.1, 2.0.1, 2.2.0, 2.1.1
> Reporter: Michael Drogalis
> Assignee: Victoria Bialas
> Priority: Minor
> Labels: documentation, newbie
>
> In [the testing docs for KStreams|https://kafka.apache.org/20/documentation/streams/developer-guide/testing.html], we use the following code snippet:
> {code:java}
> ConsumerRecordFactory factory = new ConsumerRecordFactory<>("input-topic", new StringSerializer(), new IntegerSerializer());
> testDriver.pipe(factory.create("key", 42L));
> {code}
> We should correct the docs to use the pipeInput method.
[jira] [Updated] (KAFKA-8198) KStreams testing docs use non-existent method "pipe"
[ https://issues.apache.org/jira/browse/KAFKA-8198?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax updated KAFKA-8198:
-----------------------------------
Description:
In [the testing docs for KStreams|https://kafka.apache.org/20/documentation/streams/developer-guide/testing.html], we use the following code snippet:
{code:java}
ConsumerRecordFactory factory = new ConsumerRecordFactory<>("input-topic", new StringSerializer(), new IntegerSerializer());
testDriver.pipe(factory.create("key", 42L));
{code}
We should correct the docs to use the pipeInput method.

was:
In [the testing docs for KStreams|https://kafka.apache.org/20/documentation/streams/developer-guide/testing.html], we use the following code snippet:
{code:java}
ConsumerRecordFactory factory = new ConsumerRecordFactory<>("input-topic", new StringSerializer(), new IntegerSerializer());
testDriver.pipe(factory.create("key", 42L));
{code}
As of Apache Kafka 2.2.0, this method no longer exists. We should correct the docs to use the pipeInput method.
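A corrected version of the docs snippet would call pipeInput on the TopologyTestDriver instead of the non-existent pipe. This is a sketch against the kafka-streams-test-utils API of that era and assumes a driver built elsewhere in the test:

```java
// Sketch of the corrected docs snippet (assumes an existing TopologyTestDriver
// named testDriver): feed the record through pipeInput, not the non-existent pipe.
ConsumerRecordFactory<String, Integer> factory =
    new ConsumerRecordFactory<>("input-topic", new StringSerializer(), new IntegerSerializer());
testDriver.pipeInput(factory.create("key", 42));
```

Note the value is written as 42 rather than 42L here, since the factory in the snippet is built with an IntegerSerializer.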
[jira] [Updated] (KAFKA-6635) Producer close does not await pending transaction
[ https://issues.apache.org/jira/browse/KAFKA-6635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jason Gustafson updated KAFKA-6635:
-----------------------------------
Fix Version/s: 2.3.0

> Producer close does not await pending transaction
> -------------------------------------------------
> Key: KAFKA-6635
> URL: https://issues.apache.org/jira/browse/KAFKA-6635
> Project: Kafka
> Issue Type: Bug
> Components: producer
> Reporter: Jason Gustafson
> Assignee: Viktor Somogyi-Vass
> Priority: Major
> Fix For: 2.3.0
>
> Currently close() only awaits completion of pending produce requests. If there is a transaction ongoing, it may be dropped. For example, if one thread is calling {{commitTransaction()}} and another calls {{close()}}, then the commit may never happen, even if the caller is willing to wait for it (by using a long timeout). What's more, the thread blocking in {{commitTransaction()}} will be stuck, since the result will not be completed once the producer has shut down.
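The hazard described above can be modeled in a runnable, self-contained sketch: if close() shuts down without completing the commit result, a concurrent commitTransaction() waits on a result that never arrives. The class and method names below are ours, a mini-model of the behavior, not Kafka's implementation:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Runnable mini-model of the KAFKA-6635 race: close() waits only for pending
// sends, so a commitTransaction() in another thread is left blocking on a
// result that close() never delivers. All names here are illustrative.
public class CloseRaceSketch {
    static class ModelProducer {
        final CountDownLatch commitResult = new CountDownLatch(1);
        volatile boolean closed = false;

        // Models the pre-fix behavior: shut down without completing the commit result.
        void close() { closed = true; }

        // Blocks until the commit result arrives; returns false on timeout.
        boolean commitTransaction(long timeout, TimeUnit unit) throws InterruptedException {
            return commitResult.await(timeout, unit);
        }
    }

    public static void main(String[] args) throws Exception {
        ModelProducer producer = new ModelProducer();
        Thread closer = new Thread(producer::close);
        closer.start();
        closer.join();
        // The committer times out: nothing will ever complete the result after close().
        boolean committed = producer.commitTransaction(100, TimeUnit.MILLISECONDS);
        System.out.println(committed);
    }
}
```

The fix for 2.3.0 makes close() await an ongoing transaction (up to the caller's timeout) instead of abandoning it.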
[jira] [Commented] (KAFKA-6635) Producer close does not await pending transaction
[ https://issues.apache.org/jira/browse/KAFKA-6635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16818449#comment-16818449 ]

ASF GitHub Bot commented on KAFKA-6635:
---------------------------------------
hachikuji commented on pull request #5971: KAFKA-6635: Producer close awaits for pending transaction
URL: https://github.com/apache/kafka/pull/5971
[jira] [Commented] (KAFKA-8236) Incorporate version control for Kafka Streams Application Reset
[ https://issues.apache.org/jira/browse/KAFKA-8236?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16818443#comment-16818443 ]

Matthias J. Sax commented on KAFKA-8236:
----------------------------------------
Not 100% sure if this needs a KIP.
[jira] [Updated] (KAFKA-8236) Incorporate version control for Kafka Streams Application Reset
[ https://issues.apache.org/jira/browse/KAFKA-8236?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax updated KAFKA-8236:
-----------------------------------
Component/s: tools
             streams
[jira] [Updated] (KAFKA-8236) Incorporate version control for Kafka Streams Application Reset
[ https://issues.apache.org/jira/browse/KAFKA-8236?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax updated KAFKA-8236:
-----------------------------------
Labels: needs-kip (was: )
[jira] [Comment Edited] (KAFKA-8235) NoSuchElementException when restoring state after a clean shutdown of a Kafka Streams application
[ https://issues.apache.org/jira/browse/KAFKA-8235?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16818442#comment-16818442 ]

Matthias J. Sax edited comment on KAFKA-8235 at 4/15/19 10:30 PM:
------------------------------------------------------------------
Thanks for reporting this. I have trouble mapping the stack trace to the code:

at org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.restoreBatch(InMemoryTimeOrderedKeyValueBuffer.java:288)

The method private void restoreBatch(final Collection<...> batch) seems to be on a different line, and I also cannot find where TreeMap.firstKey is called.

Kafka 2.3 is not released yet, so the "affected version" field seems not to be specified correctly. What version are you using?

{quote}The issue doesn't seem to occur for small amounts of data, but it doesn't take a particularly large amount of data to trigger the problem either.{quote}

Can you give some numbers?

was (Author: mjsax):
Thanks for reporting this. I have trouble mapping the stack trace to the code:

at org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.restoreBatch(InMemoryTimeOrderedKeyValueBuffer.java:288)

The method private void restoreBatch(final Collection<...> batch) seems to be on a different line, and I also cannot find where TreeMap.firstKey is called.

Kafka 2.3 is not released yet, so the "affected version" field seems not to be specified correctly. What version are you using?

> NoSuchElementException when restoring state after a clean shutdown of a Kafka Streams application
> -------------------------------------------------------------------------------------------------
> Key: KAFKA-8235
> URL: https://issues.apache.org/jira/browse/KAFKA-8235
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 2.3.0
> Environment: Linux, CentOS 7, Java 1.8.0_191, 50 partitions per topic, replication factor 3
> Reporter: Andrew Klopper
> Priority: Major
>
> While performing a larger-scale test of a new Kafka Streams application that performs aggregation and suppression, we have discovered that we are unable to restart the application after a clean shutdown. The error that is logged is:
> {code:java}
> [rollupd-5ee1997b-8302-4e88-b3db-a74c8b805a6c-StreamThread-1] ERROR org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [rollupd-5ee1997b-8302-4e88-b3db-a74c8b805a6c-StreamThread-1] Encountered the following error during processing:
> java.util.NoSuchElementException
>     at java.util.TreeMap.key(TreeMap.java:1327)
>     at java.util.TreeMap.firstKey(TreeMap.java:290)
>     at org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.restoreBatch(InMemoryTimeOrderedKeyValueBuffer.java:288)
>     at org.apache.kafka.streams.processor.internals.CompositeRestoreListener.restoreBatch(CompositeRestoreListener.java:89)
>     at org.apache.kafka.streams.processor.internals.StateRestorer.restore(StateRestorer.java:92)
>     at org.apache.kafka.streams.processor.internals.StoreChangelogReader.processNext(StoreChangelogReader.java:317)
>     at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:92)
>     at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:328)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:866)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:804)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:773)
> {code}
> The issue doesn't seem to occur for small amounts of data, but it doesn't take a particularly large amount of data to trigger the problem either.
> Any assistance would be greatly appreciated.
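The bottom frames of the stack trace above are TreeMap.firstKey and TreeMap.key. As a minimal, self-contained illustration of that failure mode (plain JDK code, not Kafka's): calling firstKey() on an empty java.util.TreeMap throws NoSuchElementException, which is consistent with a restore path reading from an index map that is unexpectedly empty.

```java
import java.util.NoSuchElementException;
import java.util.TreeMap;

public class FirstKeyDemo {
    public static void main(String[] args) {
        TreeMap<Long, String> index = new TreeMap<>();
        // firstKey() on an empty TreeMap throws NoSuchElementException,
        // matching the bottom frames of the stack trace above.
        try {
            index.firstKey();
            System.out.println("no exception");
        } catch (NoSuchElementException e) {
            System.out.println("NoSuchElementException");
        }
    }
}
```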
[jira] [Updated] (KAFKA-8233) Helper class to make it simpler to write test logic with TopologyTestDriver
[ https://issues.apache.org/jira/browse/KAFKA-8233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax updated KAFKA-8233:
---
Labels: needs-kip (was: )

> Helper class to make it simpler to write test logic with TopologyTestDriver
> ---
>
> Key: KAFKA-8233
> URL: https://issues.apache.org/jira/browse/KAFKA-8233
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Jukka Karvanen
> Priority: Minor
> Labels: needs-kip
>
> When using TopologyTestDriver you need to call ConsumerRecordFactory to
> create the ConsumerRecord passed into the pipeInput method to write to a topic.
> Also, when calling readOutput to consume from a topic, you need to provide the
> correct Deserializers each time.
> You easily end up writing helper methods in your test classes, but this can
> be avoided by adding generic input and output topic classes.
> This improvement adds a TestInputTopic class, which wraps the TopologyTestDriver
> and ConsumerRecordFactory methods as one class to be used to write to input
> topics, and a TestOutputTopic class, which collects the TopologyTestDriver
> reading methods and provides type-safe read methods.
>
> More info and an example of how a Streams test looks when using these classes:
> [https://github.com/jukkakarvanen/kafka-streams-test-topics]

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8233) Helper class to make it simpler to write test logic with TopologyTestDriver
[ https://issues.apache.org/jira/browse/KAFKA-8233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16818397#comment-16818397 ]

Matthias J. Sax commented on KAFKA-8233:

Thanks for creating the ticket. If you want to contribute something like this, you will need to write a KIP: [https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals]

> Helper class to make it simpler to write test logic with TopologyTestDriver
> ---
>
> Key: KAFKA-8233
> URL: https://issues.apache.org/jira/browse/KAFKA-8233
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Jukka Karvanen
> Priority: Minor
>
> When using TopologyTestDriver you need to call ConsumerRecordFactory to
> create the ConsumerRecord passed into the pipeInput method to write to a topic.
> Also, when calling readOutput to consume from a topic, you need to provide the
> correct Deserializers each time.
> You easily end up writing helper methods in your test classes, but this can
> be avoided by adding generic input and output topic classes.
> This improvement adds a TestInputTopic class, which wraps the TopologyTestDriver
> and ConsumerRecordFactory methods as one class to be used to write to input
> topics, and a TestOutputTopic class, which collects the TopologyTestDriver
> reading methods and provides type-safe read methods.
>
> More info and an example of how a Streams test looks when using these classes:
> [https://github.com/jukkakarvanen/kafka-streams-test-topics]

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
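The core idea of the proposal can be sketched without any Kafka dependencies. The class below is purely illustrative (TestTopic, pipeInput, and readValue are hypothetical names mirroring the proposal, not the actual API being proposed): bind the (de)serialization logic to the topic object once, so each test call is type-safe and free of repeated serde arguments.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Function;

// Dependency-free sketch of the TestInputTopic/TestOutputTopic idea.
// The key type parameter K is kept only to mirror the proposed shape;
// this sketch ignores keys.
class TestTopic<K, V> {
    private final Queue<String> records = new ArrayDeque<>();
    private final Function<V, String> serializer;
    private final Function<String, V> deserializer;

    TestTopic(Function<V, String> serializer, Function<String, V> deserializer) {
        this.serializer = serializer;
        this.deserializer = deserializer;
    }

    // Serde is fixed at construction, so callers never pass it again.
    void pipeInput(K key, V value) { records.add(serializer.apply(value)); }

    V readValue() { return deserializer.apply(records.remove()); }
}

public class TestTopicDemo {
    public static void main(String[] args) {
        TestTopic<String, Integer> topic =
            new TestTopic<>(String::valueOf, Integer::parseInt);
        topic.pipeInput("k", 42);
        System.out.println(topic.readValue());
    }
}
```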
[jira] [Commented] (KAFKA-7965) Flaky Test ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
[ https://issues.apache.org/jira/browse/KAFKA-7965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16818388#comment-16818388 ] Matthias J. Sax commented on KAFKA-7965: one more: [https://builds.apache.org/blue/organizations/jenkins/kafka-trunk-jdk11/detail/kafka-trunk-jdk11/435/tests] > Flaky Test > ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup > > > Key: KAFKA-7965 > URL: https://issues.apache.org/jira/browse/KAFKA-7965 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, unit tests >Affects Versions: 2.2.0, 2.3.0 >Reporter: Matthias J. Sax >Assignee: huxihx >Priority: Critical > Labels: flaky-test > Fix For: 2.3.0, 2.2.1 > > > To get stable nightly builds for `2.2` release, I create tickets for all > observed test failures. > [https://jenkins.confluent.io/job/apache-kafka-test/job/2.2/21/] > {quote}java.lang.AssertionError: Received 0, expected at least 68 at > org.junit.Assert.fail(Assert.java:88) at > org.junit.Assert.assertTrue(Assert.java:41) at > kafka.api.ConsumerBounceTest.receiveAndCommit(ConsumerBounceTest.scala:557) > at > kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1(ConsumerBounceTest.scala:320) > at > kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1$adapted(ConsumerBounceTest.scala:319) > at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) > at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at > kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(ConsumerBounceTest.scala:319){quote} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-8237) Untangle TopicDeletionManager and add test cases
Jason Gustafson created KAFKA-8237: -- Summary: Untangle TopicDeletionManager and add test cases Key: KAFKA-8237 URL: https://issues.apache.org/jira/browse/KAFKA-8237 Project: Kafka Issue Type: Improvement Reporter: Jason Gustafson Assignee: Jason Gustafson There are a few circular dependencies involving `TopicDeletionManager`. For example, both `PartitionStateMachine` and `ReplicaStateMachine` depend on `TopicDeletionManager` while it also depends on them. This makes testing difficult and so there are no unit tests. We should fix this. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
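One standard way to break such a cycle, sketched here with hypothetical names (not Kafka's actual classes or methods), is to have the deletion manager depend on a narrow interface rather than the concrete state machines; the interface is then trivial to stub in a unit test.

```java
// Illustrative only: names are hypothetical, sketching the
// dependency-inversion pattern for untangling a circular dependency.
interface StateMachine {
    void transition(String topic, String targetState);
}

class DeletionManager {
    private final StateMachine partitions; // depends on the interface, not the peer

    DeletionManager(StateMachine partitions) { this.partitions = partitions; }

    void deleteTopic(String topic) { partitions.transition(topic, "NonExistent"); }
}

public class UntangleDemo {
    public static void main(String[] args) {
        // In a unit test, the collaborator is a one-line stub:
        StringBuilder log = new StringBuilder();
        DeletionManager dm =
            new DeletionManager((t, s) -> log.append(t).append("->").append(s));
        dm.deleteTopic("foo");
        System.out.println(log);
    }
}
```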
[jira] [Commented] (KAFKA-7965) Flaky Test ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
[ https://issues.apache.org/jira/browse/KAFKA-7965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16818336#comment-16818336 ] Bruno Cadonna commented on KAFKA-7965: -- Two more failure with Java 8 and Java 11 with the same error messages posted above. > Flaky Test > ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup > > > Key: KAFKA-7965 > URL: https://issues.apache.org/jira/browse/KAFKA-7965 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, unit tests >Affects Versions: 2.2.0, 2.3.0 >Reporter: Matthias J. Sax >Assignee: huxihx >Priority: Critical > Labels: flaky-test > Fix For: 2.3.0, 2.2.1 > > > To get stable nightly builds for `2.2` release, I create tickets for all > observed test failures. > [https://jenkins.confluent.io/job/apache-kafka-test/job/2.2/21/] > {quote}java.lang.AssertionError: Received 0, expected at least 68 at > org.junit.Assert.fail(Assert.java:88) at > org.junit.Assert.assertTrue(Assert.java:41) at > kafka.api.ConsumerBounceTest.receiveAndCommit(ConsumerBounceTest.scala:557) > at > kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1(ConsumerBounceTest.scala:320) > at > kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1$adapted(ConsumerBounceTest.scala:319) > at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) > at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at > kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(ConsumerBounceTest.scala:319){quote} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Issue Comment Deleted] (KAFKA-6248) Enable configuration of internal topics of Kafka Streams applications
[ https://issues.apache.org/jira/browse/KAFKA-6248?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Edmondo Porcu updated KAFKA-6248: - Comment: was deleted (was: Does it work for max.request.size? -Dtopic.max.request.size=300 -Dproducer.max.request.size=300 Exception in thread "analysis-input-enricher-e1c4502d-62e9-43a6-bf59-b37dcfc77ce2-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: task [1_0] Abort sending since an error caught with a previous record (key NON PREVISTO value [B@62851189 timestamp 1555104485966) to topic analysis-input-enricher-cr-STATE-STORE-01-changelog due to org.apache.kafka.common.errors.RecordTooLargeException: The message is 1403267 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.) > Enable configuration of internal topics of Kafka Streams applications > - > > Key: KAFKA-6248 > URL: https://issues.apache.org/jira/browse/KAFKA-6248 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Tim Van Laer >Priority: Minor > > In the current implementation of Kafka Streams, it is not possible to set > custom configuration to internal topics (e.g. max.message.bytes, > retention.ms...). It would be nice if a developer can set some specific > configuration. > E.g. if you want to store messages bigger than 1MiB in a state store, you > have to alter the corresponding changelog topic with a max.message.bytes > setting. > The workaround is to create the 'internal' topics upfront using the correct > naming convention so Kafka Streams will use the explicitly defined topics as > if they are internal. > An alternative is to alter the internal topics after the Kafka Streams > application is started and has created its internal topics. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6248) Enable configuration of internal topics of Kafka Streams applications
[ https://issues.apache.org/jira/browse/KAFKA-6248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16818262#comment-16818262 ] Edmondo Porcu commented on KAFKA-6248: -- Does it work for max.request.size? -Dtopic.max.request.size=300 -Dproducer.max.request.size=300 Exception in thread "analysis-input-enricher-e1c4502d-62e9-43a6-bf59-b37dcfc77ce2-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: task [1_0] Abort sending since an error caught with a previous record (key NON PREVISTO value [B@62851189 timestamp 1555104485966) to topic analysis-input-enricher-cr-STATE-STORE-01-changelog due to org.apache.kafka.common.errors.RecordTooLargeException: The message is 1403267 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration. > Enable configuration of internal topics of Kafka Streams applications > - > > Key: KAFKA-6248 > URL: https://issues.apache.org/jira/browse/KAFKA-6248 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Tim Van Laer >Priority: Minor > > In the current implementation of Kafka Streams, it is not possible to set > custom configuration to internal topics (e.g. max.message.bytes, > retention.ms...). It would be nice if a developer can set some specific > configuration. > E.g. if you want to store messages bigger than 1MiB in a state store, you > have to alter the corresponding changelog topic with a max.message.bytes > setting. > The workaround is to create the 'internal' topics upfront using the correct > naming convention so Kafka Streams will use the explicitly defined topics as > if they are internal. > An alternative is to alter the internal topics after the Kafka Streams > application is started and has created its internal topics. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
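Regarding the -D question above: Kafka Streams does not pick up topic or producer overrides from JVM system properties; they have to go into the properties passed to the Streams application itself. As a hedged sketch (values are illustrative, not recommendations), Streams forwards keys prefixed with "topic." to the internal topics it creates and keys prefixed with "producer." to its embedded producer, which is the usual way to raise max.message.bytes / max.request.size for changelog topics:

```java
import java.util.Properties;

public class StreamsPrefixDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        // "topic."-prefixed keys are applied to internal (e.g. changelog)
        // topics created by Streams; "producer."-prefixed keys are passed
        // to the embedded producer. In real code these props would be
        // handed to the KafkaStreams/StreamsConfig constructor.
        props.put("topic.max.message.bytes", "2097152");
        props.put("producer.max.request.size", "2097152");
        System.out.println(props.getProperty("topic.max.message.bytes") + ","
                + props.getProperty("producer.max.request.size"));
    }
}
```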
[jira] [Commented] (KAFKA-8231) Expansion of ConnectClusterState interface
[ https://issues.apache.org/jira/browse/KAFKA-8231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16818254#comment-16818254 ] ASF GitHub Bot commented on KAFKA-8231: --- C0urante commented on pull request #6584: KAFKA-8231: Expansion of ConnectClusterState interface URL: https://github.com/apache/kafka/pull/6584 [Jira](https://issues.apache.org/jira/browse/KAFKA-8231) and [KIP](https://cwiki.apache.org/confluence/display/KAFKA/KIP-454%3A+Expansion+of+the+ConnectClusterState+interface) The changes here add new methods to the `ConnectClusterState` interface so that Connect REST extensions can be more aware of the current state of the Connect cluster they are added to. The new methods allow extensions to query for connector and task configurations, as well as the ID of the Kafka cluster targeted by the Connect cluster. All new methods have new unit tests added for their implementations in the `ConnectClusterStateImpl` class. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Expansion of ConnectClusterState interface > -- > > Key: KAFKA-8231 > URL: https://issues.apache.org/jira/browse/KAFKA-8231 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Chris Egerton >Assignee: Chris Egerton >Priority: Major > Fix For: 2.3.0 > > > This covers [KIP-454: Expansion of the ConnectClusterState > interface|https://cwiki.apache.org/confluence/display/KAFKA/KIP-454%3A+Expansion+of+the+ConnectClusterState+interface] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-8236) Incorporate version control for Kafka Streams Application Reset
Boyang Chen created KAFKA-8236:
--
Summary: Incorporate version control for Kafka Streams Application Reset
Key: KAFKA-8236
URL: https://issues.apache.org/jira/browse/KAFKA-8236
Project: Kafka
Issue Type: Improvement
Reporter: Boyang Chen

Inspired by Spark mlflow, which supports version logging, we should consider exposing a special versioning tag for Kafka Streams applications to make it easy to roll back a bad code deploy. The naive approach is to store the versioning info in the consumer offset topic, so that when we perform a rollback we know where to read from in the input and where to clean up the changelog topic. Essentially, this is an extension to our current application reset tool.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-2729) Cached zkVersion not equal to that in zookeeper, broker not recovering.
[ https://issues.apache.org/jira/browse/KAFKA-2729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16818083#comment-16818083 ]

evildracula commented on KAFKA-2729:

Hello [~junrao], I'm currently using 0.11.0.3, which is in the affected versions. I would like to reproduce this issue in my DEV environment. Could you please provide steps to reproduce it? Many thanks. I have tried to reproduce it with **systemctl start/stop iptables**, but without success.

> Cached zkVersion not equal to that in zookeeper, broker not recovering.
> ---
>
> Key: KAFKA-2729
> URL: https://issues.apache.org/jira/browse/KAFKA-2729
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.8.2.1, 0.9.0.0, 0.10.0.0, 0.10.1.0, 0.11.0.0
> Reporter: Danil Serdyuchenko
> Assignee: Onur Karaman
> Priority: Major
> Fix For: 1.1.0
>
> After a small network wobble where zookeeper nodes couldn't reach each other,
> we started seeing a large number of under-replicated partitions. The zookeeper
> cluster recovered, however we continued to see a large number of
> under-replicated partitions. Two brokers in the kafka cluster were showing this
> in the logs:
> {code}
> [2015-10-27 11:36:00,888] INFO Partition [__samza_checkpoint_event-creation_1,3] on broker 5: Shrinking ISR for partition [__samza_checkpoint_event-creation_1,3] from 6,5 to 5 (kafka.cluster.Partition)
> [2015-10-27 11:36:00,891] INFO Partition [__samza_checkpoint_event-creation_1,3] on broker 5: Cached zkVersion [66] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
> {code}
> This happened for all of the topics on the affected brokers. Both brokers only
> recovered after a restart. Our own investigation yielded nothing; I was hoping
> you could shed some light on this issue. Possibly it's related to:
> https://issues.apache.org/jira/browse/KAFKA-1382 , however we're using 0.8.2.1.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-8233) Helper class to make it simpler to write test logic with TopologyTestDriver
[ https://issues.apache.org/jira/browse/KAFKA-8233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jukka Karvanen updated KAFKA-8233:
--
Description:
When using TopologyTestDriver you need to call ConsumerRecordFactory to create the ConsumerRecord passed into the pipeInput method to write to a topic. Also, when calling readOutput to consume from a topic, you need to provide the correct Deserializers each time.

You easily end up writing helper methods in your test classes, but this can be avoided by adding generic input and output topic classes.

This improvement adds a TestInputTopic class, which wraps the TopologyTestDriver and ConsumerRecordFactory methods as one class to be used to write to input topics, and a TestOutputTopic class, which collects the TopologyTestDriver reading methods and provides type-safe read methods.

More info and an example of how a Streams test looks when using these classes: [https://github.com/jukkakarvanen/kafka-streams-test-topics]

was:
When using TopologyTestDriver you need to call ConsumerRecordFactory to create ConsumerRecord passed into pipeInput method to write to topic. Also when calling readOutput to consume from topic, you need to provide correct Deserializers each time.

You easily end up writing helper methods in your test classed, but this can be avoided when adding generic input and output topic classes.

This improvement adds TestInputTopic class which wraps TopologyTestDriver and ConsumerRecordFactory methods as one class to be used to write to Input Topics and TestOutputTopic class which collects TopologyTestDriver reading methods and provide typesafe read methods.

Example of how Stream test looks after using this classes: [https://github.com/jukkakarvanen/kafka-streams-examples/blob/InputOutputTopic/src/test/java/io/confluent/examples/streams/WordCountLambdaExampleTest.java]

> Helper class to make it simpler to write test logic with TopologyTestDriver
> ---
>
> Key: KAFKA-8233
> URL: https://issues.apache.org/jira/browse/KAFKA-8233
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Jukka Karvanen
> Priority: Minor
>
> When using TopologyTestDriver you need to call ConsumerRecordFactory to
> create the ConsumerRecord passed into the pipeInput method to write to a topic.
> Also, when calling readOutput to consume from a topic, you need to provide the
> correct Deserializers each time.
> You easily end up writing helper methods in your test classes, but this can
> be avoided by adding generic input and output topic classes.
> This improvement adds a TestInputTopic class, which wraps the TopologyTestDriver
> and ConsumerRecordFactory methods as one class to be used to write to input
> topics, and a TestOutputTopic class, which collects the TopologyTestDriver
> reading methods and provides type-safe read methods.
>
> More info and an example of how a Streams test looks when using these classes:
> [https://github.com/jukkakarvanen/kafka-streams-test-topics]

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-8100) kafka consumer not refresh metadata for dynamic topic deletion
[ https://issues.apache.org/jira/browse/KAFKA-8100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shengnan YU updated KAFKA-8100:
---
Summary: kafka consumer not refresh metadata for dynamic topic deletion (was: If delete expired topic, kafka consumer will keep flushing unknown_topic warning in log)

> kafka consumer not refresh metadata for dynamic topic deletion
> --
>
> Key: KAFKA-8100
> URL: https://issues.apache.org/jira/browse/KAFKA-8100
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 1.1.1, 2.1.1
> Reporter: Shengnan YU
> Priority: Major
>
> Recently we used Flink to consume Kafka topics with a regex pattern. We found
> that when we deleted some unused topics, the logs kept flushing
> UNKNOWN_TOPIC_EXCEPTION.
> I studied the source code of the Kafka client and found that, for the consumer,
> topicExpiry is disabled in Metadata. This means that even after a topic is
> deleted, the client still has the topic info in the metadata's topic list and
> keeps fetching it from the servers.
> Is there any good way to avoid these annoying warning logs without modifying
> Kafka's source code? (We still need the 'real' unknown-topic exceptions in the
> logs, i.e. for topics that never existed, not for outdated ones.)
> The following code can be used to reproduce the problem (create multiple
> topics such as "test1", "test2", "test3"..."testn" in the Kafka cluster and
> then delete any one of them while the program is running).
> {code:java}
> public static void main(String[] args) {
>     Properties props = new Properties();
>     props.put("bootstrap.servers", "localhost:9092\n");
>     props.put("group.id", "test10");
>     props.put("enable.auto.commit", "true");
>     props.put("auto.commit.interval.ms", "1000");
>     props.put("auto.offset.reset", "earliest");
>     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
>     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
>     props.put("metadata.max.age.ms", "6");
>     KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
>     class PartitionOffsetAssignerListener implements ConsumerRebalanceListener {
>         private KafkaConsumer<String, String> consumer;
>         public PartitionOffsetAssignerListener(KafkaConsumer<String, String> kafkaConsumer) {
>             this.consumer = kafkaConsumer;
>         }
>         public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
>         }
>         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
>             // reading all partitions from the beginning
>             consumer.seekToBeginning(partitions);
>         }
>     }
>     consumer.subscribe(Pattern.compile("^test.*$"), new PartitionOffsetAssignerListener(consumer));
>     while (true) {
>         ConsumerRecords<String, String> records = consumer.poll(100);
>         for (ConsumerRecord<String, String> record : records) {
>             System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
>         }
>     }
> }
> {code}

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8100) If delete expired topic, kafka consumer will keep flushing unknown_topic warning in log
[ https://issues.apache.org/jira/browse/KAFKA-8100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16817951#comment-16817951 ] Shengnan YU commented on KAFKA-8100: Hi could you please explain why not delete this topic info in metadata first when hitting UNKNOWN_TOPIC_OR_PARTITION because it can be easily recoverable if the topic actually exists. > If delete expired topic, kafka consumer will keep flushing unknown_topic > warning in log > --- > > Key: KAFKA-8100 > URL: https://issues.apache.org/jira/browse/KAFKA-8100 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 1.1.1, 2.1.1 >Reporter: Shengnan YU >Priority: Major > > Recently we used flink to consume kafka topics with a regex pattern. It is > found that when we deleted some unused topics, the logs will keep flushing > UNKNOWN_TOPIC_EXCEPTION. > I study the source code of kafka client, it is found that for consumer, > topicExpiry is disable in Metadata, which leads to that the even the topic > deleted, the client still have this topic info in the metadata's topic list > and keep fetching from servers. > Is there any good method to avoid this annoying warning logs without modify > the kafka's source code? (We still need the 'Real' Unknown topic exception, > which means not the outdated topic, in logs) > The following code can be used to reproduce this problem (if you create > multiple topics such as "test1", "test2", "test3"..."testn" in kafka cluster > and then delete any of one while running). 
> {code:java}
> public static void main(String[] args) {
>     Properties props = new Properties();
>     props.put("bootstrap.servers", "localhost:9092\n");
>     props.put("group.id", "test10");
>     props.put("enable.auto.commit", "true");
>     props.put("auto.commit.interval.ms", "1000");
>     props.put("auto.offset.reset", "earliest");
>     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
>     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
>     props.put("metadata.max.age.ms", "6");
>     KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
>     class PartitionOffsetAssignerListener implements ConsumerRebalanceListener {
>         private KafkaConsumer<String, String> consumer;
>         public PartitionOffsetAssignerListener(KafkaConsumer<String, String> kafkaConsumer) {
>             this.consumer = kafkaConsumer;
>         }
>         public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
>         }
>         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
>             // reading all partitions from the beginning
>             consumer.seekToBeginning(partitions);
>         }
>     }
>     consumer.subscribe(Pattern.compile("^test.*$"), new PartitionOffsetAssignerListener(consumer));
>     while (true) {
>         ConsumerRecords<String, String> records = consumer.poll(100);
>         for (ConsumerRecord<String, String> record : records) {
>             System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
>         }
>     }
> }
> {code}

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-8100) If delete expired topic, kafka consumer will keep flushing unknown_topic warning in log
[ https://issues.apache.org/jira/browse/KAFKA-8100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16817951#comment-16817951 ] Shengnan YU edited comment on KAFKA-8100 at 4/15/19 1:07 PM: - Hi could you please explain why not delete this topic info in metadata first when hitting UNKNOWN_TOPIC_OR_PARTITION because it can be easily discovered again if the topic actually exists. was (Author: ysn2233): Hi could you please explain why not delete this topic info in metadata first when hitting UNKNOWN_TOPIC_OR_PARTITION because it can be easily discovered if the topic actually exists. > If delete expired topic, kafka consumer will keep flushing unknown_topic > warning in log > --- > > Key: KAFKA-8100 > URL: https://issues.apache.org/jira/browse/KAFKA-8100 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 1.1.1, 2.1.1 >Reporter: Shengnan YU >Priority: Major > > Recently we used flink to consume kafka topics with a regex pattern. It is > found that when we deleted some unused topics, the logs will keep flushing > UNKNOWN_TOPIC_EXCEPTION. > I study the source code of kafka client, it is found that for consumer, > topicExpiry is disable in Metadata, which leads to that the even the topic > deleted, the client still have this topic info in the metadata's topic list > and keep fetching from servers. > Is there any good method to avoid this annoying warning logs without modify > the kafka's source code? (We still need the 'Real' Unknown topic exception, > which means not the outdated topic, in logs) > The following code can be used to reproduce this problem (if you create > multiple topics such as "test1", "test2", "test3"..."testn" in kafka cluster > and then delete any of one while running). 
> {code:java}
> public static void main(String[] args) {
>     Properties props = new Properties();
>     props.put("bootstrap.servers", "localhost:9092\n");
>     props.put("group.id", "test10");
>     props.put("enable.auto.commit", "true");
>     props.put("auto.commit.interval.ms", "1000");
>     props.put("auto.offset.reset", "earliest");
>     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
>     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
>     props.put("metadata.max.age.ms", "6");
>     KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
>     class PartitionOffsetAssignerListener implements ConsumerRebalanceListener {
>         private KafkaConsumer<String, String> consumer;
>         public PartitionOffsetAssignerListener(KafkaConsumer<String, String> kafkaConsumer) {
>             this.consumer = kafkaConsumer;
>         }
>         public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
>         }
>         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
>             // reading all partitions from the beginning
>             consumer.seekToBeginning(partitions);
>         }
>     }
>     consumer.subscribe(Pattern.compile("^test.*$"), new PartitionOffsetAssignerListener(consumer));
>     while (true) {
>         ConsumerRecords<String, String> records = consumer.poll(100);
>         for (ConsumerRecord<String, String> record : records) {
>             System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
>         }
>     }
> }
> {code}

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-8100) If delete expired topic, kafka consumer will keep flushing unknown_topic warning in log
[ https://issues.apache.org/jira/browse/KAFKA-8100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16817951#comment-16817951 ] Shengnan YU edited comment on KAFKA-8100 at 4/15/19 1:07 PM: - Hi could you please explain why not delete this topic info in metadata first when hitting UNKNOWN_TOPIC_OR_PARTITION because it can be easily discovered if the topic actually exists. was (Author: ysn2233): Hi could you please explain why not delete this topic info in metadata first when hitting UNKNOWN_TOPIC_OR_PARTITION because it can be easily recoverable if the topic actually exists. > If delete expired topic, kafka consumer will keep flushing unknown_topic > warning in log > --- > > Key: KAFKA-8100 > URL: https://issues.apache.org/jira/browse/KAFKA-8100 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 1.1.1, 2.1.1 >Reporter: Shengnan YU >Priority: Major > > Recently we used flink to consume kafka topics with a regex pattern. It is > found that when we deleted some unused topics, the logs will keep flushing > UNKNOWN_TOPIC_EXCEPTION. > I study the source code of kafka client, it is found that for consumer, > topicExpiry is disable in Metadata, which leads to that the even the topic > deleted, the client still have this topic info in the metadata's topic list > and keep fetching from servers. > Is there any good method to avoid this annoying warning logs without modify > the kafka's source code? (We still need the 'Real' Unknown topic exception, > which means not the outdated topic, in logs) > The following code can be used to reproduce this problem (if you create > multiple topics such as "test1", "test2", "test3"..."testn" in kafka cluster > and then delete any of one while running). 
> {code:java}
> public static void main(String[] args) {
>     Properties props = new Properties();
>     props.put("bootstrap.servers", "localhost:9092");
>     props.put("group.id", "test10");
>     props.put("enable.auto.commit", "true");
>     props.put("auto.commit.interval.ms", "1000");
>     props.put("auto.offset.reset", "earliest");
>     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
>     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
>     props.put("metadata.max.age.ms", "6");
>     KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
>     class PartitionOffsetAssignerListener implements ConsumerRebalanceListener {
>         private KafkaConsumer<String, String> consumer;
>         public PartitionOffsetAssignerListener(KafkaConsumer<String, String> kafkaConsumer) {
>             this.consumer = kafkaConsumer;
>         }
>         public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
>         }
>         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
>             // reading all partitions from the beginning
>             consumer.seekToBeginning(partitions);
>         }
>     }
>     consumer.subscribe(Pattern.compile("^test.*$"), new PartitionOffsetAssignerListener(consumer));
>     while (true) {
>         ConsumerRecords<String, String> records = consumer.poll(100);
>         for (ConsumerRecord<String, String> record : records) {
>             System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
>         }
>     }
> }
> {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8234) Multi-module support for JAAS config property
[ https://issues.apache.org/jira/browse/KAFKA-8234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16817935#comment-16817935 ] Gabor Somogyi commented on KAFKA-8234: -- cc [~viktorsomogyi] > Multi-module support for JAAS config property > - > > Key: KAFKA-8234 > URL: https://issues.apache.org/jira/browse/KAFKA-8234 > Project: Kafka > Issue Type: Improvement >Reporter: Gabor Somogyi >Priority: Major > > I've tried to add multi-modules to JAAS config property but its not supported > at the moment: > {code:java} > Exception in thread "main" org.apache.kafka.common.KafkaException: Failed > create new KafkaAdminClient > at > org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:370) > at > org.apache.kafka.clients.admin.AdminClient.create(AdminClient.java:52) > at > com.kafka.delegationtoken.consumer.SecureKafkaConsumer$.main(SecureKafkaConsumer.scala:96) > at > com.kafka.delegationtoken.consumer.SecureKafkaConsumer.main(SecureKafkaConsumer.scala) > Caused by: java.lang.IllegalArgumentException: JAAS config property contains > 2 login modules, should be 1 module > at > org.apache.kafka.common.security.JaasContext.load(JaasContext.java:95) > at > org.apache.kafka.common.security.JaasContext.loadClientContext(JaasContext.java:84) > at > org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:119) > at > org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:65) > at > org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:88) > at > org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:346) > ... 3 more > {code} > I wanted to implement a fallback scenario with sufficient LoginModule flag > but the missing multi-module support makes in impossible. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-8234) Multi-module support for JAAS config property
[ https://issues.apache.org/jira/browse/KAFKA-8234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gabor Somogyi updated KAFKA-8234: - Affects Version/s: 2.2.0 > Multi-module support for JAAS config property > - > > Key: KAFKA-8234 > URL: https://issues.apache.org/jira/browse/KAFKA-8234 > Project: Kafka > Issue Type: Improvement >Affects Versions: 2.2.0 >Reporter: Gabor Somogyi >Priority: Major > > I've tried to add multi-modules to JAAS config property but its not supported > at the moment: > {code:java} > Exception in thread "main" org.apache.kafka.common.KafkaException: Failed > create new KafkaAdminClient > at > org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:370) > at > org.apache.kafka.clients.admin.AdminClient.create(AdminClient.java:52) > at > com.kafka.delegationtoken.consumer.SecureKafkaConsumer$.main(SecureKafkaConsumer.scala:96) > at > com.kafka.delegationtoken.consumer.SecureKafkaConsumer.main(SecureKafkaConsumer.scala) > Caused by: java.lang.IllegalArgumentException: JAAS config property contains > 2 login modules, should be 1 module > at > org.apache.kafka.common.security.JaasContext.load(JaasContext.java:95) > at > org.apache.kafka.common.security.JaasContext.loadClientContext(JaasContext.java:84) > at > org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:119) > at > org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:65) > at > org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:88) > at > org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:346) > ... 3 more > {code} > I wanted to implement a fallback scenario with sufficient LoginModule flag > but the missing multi-module support makes in impossible. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-8235) NoSuchElementException when restoring state after a clean shutdown of a Kafka Streams application
Andrew Klopper created KAFKA-8235: - Summary: NoSuchElementException when restoring state after a clean shutdown of a Kafka Streams application Key: KAFKA-8235 URL: https://issues.apache.org/jira/browse/KAFKA-8235 Project: Kafka Issue Type: Bug Affects Versions: 2.3.0 Environment: Linux, CentOS 7, Java 1.8.0_191, 50 partitions per topic, replication factor 3 Reporter: Andrew Klopper While performing a larger-scale test of a new Kafka Streams application that performs aggregation and suppression, we have discovered that we are unable to restart the application after a clean shutdown. The error that is logged is:
{code:java}
[rollupd-5ee1997b-8302-4e88-b3db-a74c8b805a6c-StreamThread-1] ERROR org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [rollupd-5ee1997b-8302-4e88-b3db-a74c8b805a6c-StreamThread-1] Encountered the following error during processing:
java.util.NoSuchElementException
        at java.util.TreeMap.key(TreeMap.java:1327)
        at java.util.TreeMap.firstKey(TreeMap.java:290)
        at org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.restoreBatch(InMemoryTimeOrderedKeyValueBuffer.java:288)
        at org.apache.kafka.streams.processor.internals.CompositeRestoreListener.restoreBatch(CompositeRestoreListener.java:89)
        at org.apache.kafka.streams.processor.internals.StateRestorer.restore(StateRestorer.java:92)
        at org.apache.kafka.streams.processor.internals.StoreChangelogReader.processNext(StoreChangelogReader.java:317)
        at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:92)
        at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:328)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:866)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:804)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:773)
{code}
The issue doesn't seem to occur for small amounts of data, but it doesn't take a particularly large amount of data to trigger the problem either. Any assistance would be greatly appreciated. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
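The failure originates in java.util.TreeMap.firstKey(), which throws NoSuchElementException when called on an empty map; the restore path in the stack trace reaches that call. A minimal, dependency-free sketch of just that failure mode (the class and method names below are illustrative, not taken from the Kafka source):

```java
import java.util.NoSuchElementException;
import java.util.TreeMap;

public class EmptyBufferRestoreSketch {
    // Mirrors the failing call chain TreeMap.key -> TreeMap.firstKey:
    // firstKey() throws NoSuchElementException on an empty map, so a restore
    // path that calls it unconditionally breaks whenever the buffer's sorted
    // index is empty when the changelog batch is applied.
    public static String restoreFirstKey(TreeMap<Long, byte[]> sortIndex) {
        try {
            return "firstKey=" + sortIndex.firstKey();
        } catch (NoSuchElementException e) {
            return "NoSuchElementException";
        }
    }

    public static void main(String[] args) {
        // An empty index reproduces the exception seen in the stack trace.
        System.out.println(restoreFirstKey(new TreeMap<>()));
    }
}
```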
[jira] [Created] (KAFKA-8234) Multi-module support for JAAS config property
Gabor Somogyi created KAFKA-8234: Summary: Multi-module support for JAAS config property Key: KAFKA-8234 URL: https://issues.apache.org/jira/browse/KAFKA-8234 Project: Kafka Issue Type: Improvement Reporter: Gabor Somogyi I've tried to add multiple modules to the JAAS config property but it's not supported at the moment:
{code:java}
Exception in thread "main" org.apache.kafka.common.KafkaException: Failed create new KafkaAdminClient
        at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:370)
        at org.apache.kafka.clients.admin.AdminClient.create(AdminClient.java:52)
        at com.kafka.delegationtoken.consumer.SecureKafkaConsumer$.main(SecureKafkaConsumer.scala:96)
        at com.kafka.delegationtoken.consumer.SecureKafkaConsumer.main(SecureKafkaConsumer.scala)
Caused by: java.lang.IllegalArgumentException: JAAS config property contains 2 login modules, should be 1 module
        at org.apache.kafka.common.security.JaasContext.load(JaasContext.java:95)
        at org.apache.kafka.common.security.JaasContext.loadClientContext(JaasContext.java:84)
        at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:119)
        at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:65)
        at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:88)
        at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:346)
        ... 3 more
{code}
I wanted to implement a fallback scenario with a sufficient LoginModule flag but the missing multi-module support makes it impossible. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
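For context, the configuration being rejected is a sasl.jaas.config value containing two login module entries. A hypothetical example of such a value (the module names, flags, and options here are illustrative, not taken from the ticket; JaasContext.load counts the modules in the parsed value and throws the IllegalArgumentException shown above when there is more than one):

```properties
# Hypothetical two-module client value for sasl.jaas.config: try Kerberos
# first, fall back to SCRAM via the "sufficient" control flag. Kafka's
# JaasContext currently rejects any value with more than one module.
sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule sufficient \
    useKeyTab=true keyTab="/etc/security/client.keytab" principal="client@EXAMPLE.COM"; \
  org.apache.kafka.common.security.scram.ScramLoginModule sufficient \
    username="client" password="client-secret";
```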
[jira] [Resolved] (KAFKA-8232) Flaky test kafka.admin.TopicCommandWithAdminClientTest.testTopicDeletion
[ https://issues.apache.org/jira/browse/KAFKA-8232?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram resolved KAFKA-8232. --- Resolution: Fixed Reviewer: Manikumar Fix Version/s: 2.2.1 > Flaky test kafka.admin.TopicCommandWithAdminClientTest.testTopicDeletion > > > Key: KAFKA-8232 > URL: https://issues.apache.org/jira/browse/KAFKA-8232 > Project: Kafka > Issue Type: Bug > Components: core >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 2.3.0, 2.2.1 > > > {code:java} > java.lang.AssertionError: Delete path for topic should exist after deletion. > at org.junit.Assert.fail(Assert.java:89) > at org.junit.Assert.assertTrue(Assert.java:42) > at > kafka.admin.TopicCommandWithAdminClientTest.testTopicDeletion(TopicCommandWithAdminClientTest.scala:471){code} > The verification doesn't look safe since the delete path could have been > removed before the test verifies it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-8233) Helper class to make it simpler to write test logic with TopologyTestDriver
[ https://issues.apache.org/jira/browse/KAFKA-8233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jukka Karvanen updated KAFKA-8233: -- Summary: Helper class to make it simpler to write test logic with TopologyTestDriver (was: Helper class to make it simpler to write test logic TopologyTestDriver) > Helper class to make it simpler to write test logic with TopologyTestDriver > --- > > Key: KAFKA-8233 > URL: https://issues.apache.org/jira/browse/KAFKA-8233 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Jukka Karvanen >Priority: Minor > > When using TopologyTestDriver you need to call ConsumerRecordFactory to > create ConsumerRecord passed into pipeInput method to write to topic. Also > when calling readOutput to consume from topic, you need to provide correct > Deserializers each time. > You easily end up writing helper methods in your test classed, but this can > be avoided when adding generic input and output topic classes. > This improvement adds TestInputTopic class which wraps TopologyTestDriver > and ConsumerRecordFactory methods as one class to be used to write to Input > Topics and TestOutputTopic class which collects TopologyTestDriver reading > methods and provide typesafe read methods. > > Example of how Stream test looks after using this classes: > [https://github.com/jukkakarvanen/kafka-streams-examples/blob/InputOutputTopic/src/test/java/io/confluent/examples/streams/WordCountLambdaExampleTest.java] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-8233) Helper class to make it simpler to write test logic TopologyTestDriver
Jukka Karvanen created KAFKA-8233: - Summary: Helper class to make it simpler to write test logic TopologyTestDriver Key: KAFKA-8233 URL: https://issues.apache.org/jira/browse/KAFKA-8233 Project: Kafka Issue Type: Improvement Components: streams Reporter: Jukka Karvanen When using TopologyTestDriver you need to call ConsumerRecordFactory to create ConsumerRecord passed into pipeInput method to write to topic. Also when calling readOutput to consume from topic, you need to provide correct Deserializers each time. You easily end up writing helper methods in your test classed, but this can be avoided when adding generic input and output topic classes. This improvement adds TestInputTopic class which wraps TopologyTestDriver and ConsumerRecordFactory methods as one class to be used to write to Input Topics and TestOutputTopic class which collects TopologyTestDriver reading methods and provide typesafe read methods. Example of how Stream test looks after using this classes: [https://github.com/jukkakarvanen/kafka-streams-examples/blob/InputOutputTopic/src/test/java/io/confluent/examples/streams/WordCountLambdaExampleTest.java] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
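A rough sketch of the kind of wrapper the ticket describes (the class and method names below are illustrative, not the proposed API; it assumes the 2.x kafka-streams-test-utils classes TopologyTestDriver and ConsumerRecordFactory, so it is not runnable without those on the classpath):

```java
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.test.ConsumerRecordFactory;

// Illustrative input-topic wrapper: binds the topic name and serializers
// once, so tests call pipeInput(key, value) instead of building
// ConsumerRecords through ConsumerRecordFactory at every call site.
public class TestInputTopicSketch<K, V> {
    private final TopologyTestDriver driver;
    private final String topic;
    private final ConsumerRecordFactory<K, V> factory;

    public TestInputTopicSketch(TopologyTestDriver driver, String topic,
                                Serializer<K> keySerializer, Serializer<V> valueSerializer) {
        this.driver = driver;
        this.topic = topic;
        this.factory = new ConsumerRecordFactory<>(topic, keySerializer, valueSerializer);
    }

    public void pipeInput(K key, V value) {
        driver.pipeInput(factory.create(topic, key, value));
    }
}

// Illustrative output-topic counterpart: binds the deserializers once and
// offers a typesafe read, instead of passing them to every readOutput call.
class TestOutputTopicSketch<K, V> {
    private final TopologyTestDriver driver;
    private final String topic;
    private final Deserializer<K> keyDeserializer;
    private final Deserializer<V> valueDeserializer;

    TestOutputTopicSketch(TopologyTestDriver driver, String topic,
                          Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
        this.driver = driver;
        this.topic = topic;
        this.keyDeserializer = keyDeserializer;
        this.valueDeserializer = valueDeserializer;
    }

    ProducerRecord<K, V> readRecord() {
        return driver.readOutput(topic, keyDeserializer, valueDeserializer);
    }
}
```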
[jira] [Commented] (KAFKA-8232) Flaky test kafka.admin.TopicCommandWithAdminClientTest.testTopicDeletion
[ https://issues.apache.org/jira/browse/KAFKA-8232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16817885#comment-16817885 ] ASF GitHub Bot commented on KAFKA-8232: --- rajinisivaram commented on pull request #6581: KAFKA-8232; Test topic delete completion rather than intermediate state URL: https://github.com/apache/kafka/pull/6581 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Flaky test kafka.admin.TopicCommandWithAdminClientTest.testTopicDeletion > > > Key: KAFKA-8232 > URL: https://issues.apache.org/jira/browse/KAFKA-8232 > Project: Kafka > Issue Type: Bug > Components: core >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 2.3.0 > > > {code:java} > java.lang.AssertionError: Delete path for topic should exist after deletion. > at org.junit.Assert.fail(Assert.java:89) > at org.junit.Assert.assertTrue(Assert.java:42) > at > kafka.admin.TopicCommandWithAdminClientTest.testTopicDeletion(TopicCommandWithAdminClientTest.scala:471){code} > The verification doesn't look safe since the delete path could have been > removed before the test verifies it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-8207) StickyPartitionAssignor for KStream
[ https://issues.apache.org/jira/browse/KAFKA-8207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16817625#comment-16817625 ] neeraj edited comment on KAFKA-8207 at 4/15/19 9:00 AM: [~mjsax] Thanks for explaining. Because new partitions are assigned to node 1 when it comes up, in production with a heavy state store it will take time for the state store to be updated for the new partitions. was (Author: neeraj.bhatt): [~mjsax] Thanks for explaining. Any idea when 2.3 is expected? also kindly assign me a bug related to this KIP if it helps in early release. > StickyPartitionAssignor for KStream > --- > > Key: KAFKA-8207 > URL: https://issues.apache.org/jira/browse/KAFKA-8207 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.0.0 >Reporter: neeraj >Priority: Major > > In KStreams I am not able to give a sticky partition assignor or my custom > partition assignor. > Overriding the property while building stream does not work > streams props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, > CustomAssignor.class.getName()); > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8207) StickyPartitionAssignor for KStream
[ https://issues.apache.org/jira/browse/KAFKA-8207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16817625#comment-16817625 ] neeraj commented on KAFKA-8207: --- [~mjsax] Thanks for explaining. Any idea when 2.3 is expected? also kindly assign me a bug related to this KIP if it helps in early release. > StickyPartitionAssignor for KStream > --- > > Key: KAFKA-8207 > URL: https://issues.apache.org/jira/browse/KAFKA-8207 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.0.0 >Reporter: neeraj >Priority: Major > > In KStreams I am not able to give a sticky partition assignor or my custom > partition assignor. > Overriding the property while building stream does not work > streams props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, > CustomAssignor.class.getName()); > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-8226) New MirrorMaker option partition.to.partition
[ https://issues.apache.org/jira/browse/KAFKA-8226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16817610#comment-16817610 ] Sönke Liebau edited comment on KAFKA-8226 at 4/15/19 7:14 AM: -- Hi [~ernisv], the functionality that you are implementing is definitely useful, however it can also easily be achieved by implementing a custom MessageHandler that preserves partitioning. Personally I think we should try to avoid adding too many features for which extensions point were already designed in Kafka. Additionally there is [MirrorMaker 2.0|https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0#KIP-382:MirrorMaker2.0-RemoteTopics,Partitions]] coming up, which I believe has this functionality built in. was (Author: sliebau): Hi [~ernisv], the functionality that you are implementing is definitely useful, however it can also easily be achieved by implementing a custom MessageHandler that preserves partitioning. Personally I think we should try to avoid adding too many features for which extensions point were already designed in Kafka. Additionally there is [MirrorMaker 2.0|[https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0#KIP-382:MirrorMaker2.0-RemoteTopics,Partitions]] coming up, which I believe has this functionality built in. > New MirrorMaker option partition.to.partition > - > > Key: KAFKA-8226 > URL: https://issues.apache.org/jira/browse/KAFKA-8226 > Project: Kafka > Issue Type: Improvement > Components: core >Reporter: Ernestas Vaiciukevičius >Priority: Major > > Currently when MirrorMaker moves data between topics with records with null > keys - it shuffles records between destination topic's partitions. Sometimes > it's desirable to try preserving the original partition. > Related PR adds new command line option to do that: > When partition.to.partition=true MirrorMaker retains the partition number > when mirroring records even without the keys. 
> When using this option - source and destination topics are assumed to have > the same number of partitions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-8226) New MirrorMaker option partition.to.partition
[ https://issues.apache.org/jira/browse/KAFKA-8226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16817610#comment-16817610 ] Sönke Liebau edited comment on KAFKA-8226 at 4/15/19 7:14 AM: -- Hi [~ernisv], the functionality that you are implementing is definitely useful; however, it can also easily be achieved by implementing a custom MessageHandler that preserves partitioning. Personally I think we should try to avoid adding too many features for which extension points were already designed in Kafka. Additionally there is [MirrorMaker 2.0|https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0#KIP-382:MirrorMaker2.0-RemoteTopics,Partitions] coming up, which I believe has this functionality built in. was (Author: sliebau): Hi [~ernisv], the functionality that you are implementing is definitely useful, however it can also easily be achieved by implementing a custom MessageHandler that preserves partitioning. Personally I think we should try to avoid adding too many features for which extensions point were already designed in Kafka. Additionally there is [MirrorMaker 2.0|https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0#KIP-382:MirrorMaker2.0-RemoteTopics,Partitions]] coming up, which I believe has this functionality built in. > New MirrorMaker option partition.to.partition > - > > Key: KAFKA-8226 > URL: https://issues.apache.org/jira/browse/KAFKA-8226 > Project: Kafka > Issue Type: Improvement > Components: core >Reporter: Ernestas Vaiciukevičius >Priority: Major > > Currently when MirrorMaker moves data between topics with records with null > keys - it shuffles records between destination topic's partitions. Sometimes > it's desirable to try preserving the original partition. > Related PR adds new command line option to do that: > When partition.to.partition=true MirrorMaker retains the partition number > when mirroring records even without the keys. 
> When using this option - source and destination topics are assumed to have > the same number of partitions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8226) New MirrorMaker option partition.to.partition
[ https://issues.apache.org/jira/browse/KAFKA-8226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16817610#comment-16817610 ] Sönke Liebau commented on KAFKA-8226: - Hi [~ernisv], the functionality that you are implementing is definitely useful, however it can also easily be achieved by implementing a custom MessageHandler that preserves partitioning. Personally I think we should try to avoid adding too many features for which extensions point were already designed in Kafka. Additionally there is [MirrorMaker 2.0|[https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0#KIP-382:MirrorMaker2.0-RemoteTopics,Partitions]] coming up, which I believe has this functionality built in. > New MirrorMaker option partition.to.partition > - > > Key: KAFKA-8226 > URL: https://issues.apache.org/jira/browse/KAFKA-8226 > Project: Kafka > Issue Type: Improvement > Components: core >Reporter: Ernestas Vaiciukevičius >Priority: Major > > Currently when MirrorMaker moves data between topics with records with null > keys - it shuffles records between destination topic's partitions. Sometimes > it's desirable to try preserving the original partition. > Related PR adds new command line option to do that: > When partition.to.partition=true MirrorMaker retains the partition number > when mirroring records even without the keys. > When using this option - source and destination topics are assumed to have > the same number of partitions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
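The suggested alternative can be sketched as follows. This assumes the legacy MirrorMaker's pluggable handler hook (the kafka.tools.MirrorMaker.MirrorMakerMessageHandler interface and kafka.consumer.BaseConsumerRecord, supplied via the --message.handler option); check the exact signatures against your broker version, as this is an illustration rather than tested code:

```java
import java.util.Collections;
import java.util.List;
import kafka.consumer.BaseConsumerRecord;
import kafka.tools.MirrorMaker;
import org.apache.kafka.clients.producer.ProducerRecord;

// Illustrative handler that copies the source partition number onto the
// mirrored record, so null-keyed records keep their original partition
// instead of being spread by the producer's default partitioner. Source
// and destination topics must have the same number of partitions.
public class PartitionPreservingHandler implements MirrorMaker.MirrorMakerMessageHandler {
    @Override
    public List<ProducerRecord<byte[], byte[]>> handle(BaseConsumerRecord record) {
        return Collections.singletonList(new ProducerRecord<>(
                record.topic(), record.partition(), record.key(), record.value()));
    }
}
```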