[jira] [Created] (KAFKA-8238) Log how many bytes and messages were read from __consumer_offsets

2019-04-15 Thread Colin P. McCabe (JIRA)
Colin P. McCabe created KAFKA-8238:
--

 Summary: Log how many bytes and messages were read from 
__consumer_offsets
 Key: KAFKA-8238
 URL: https://issues.apache.org/jira/browse/KAFKA-8238
 Project: Kafka
  Issue Type: Improvement
Reporter: Colin P. McCabe


We should log how many bytes and messages were read from __consumer_offsets.  
Currently we only log how long it took.  Example: 
{code}
[GroupMetadataManager brokerId=2] Finished loading offsets and group metadata 
from __consumer_offsets-22 in 23131 milliseconds.
{code}
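
As a rough illustration of the request, the sketch below tallies bytes and messages while a partition is being loaded and then includes both counts in the summary line. The class name `OffsetsLoadStats`, its methods, and the exact wording of the extended message are all assumptions made for this example; none of them come from the broker code.

```java
/**
 * Hypothetical helper (not part of Kafka): tallies what a partition load
 * read so that the summary log line can report it alongside the duration.
 */
final class OffsetsLoadStats {
    private long bytesRead;
    private long messagesRead;

    /** Record one batch that was read from the log. */
    void record(long batchBytes, long batchMessages) {
        bytesRead += batchBytes;
        messagesRead += batchMessages;
    }

    long bytesRead() {
        return bytesRead;
    }

    long messagesRead() {
        return messagesRead;
    }

    /** Builds the summary line; the added wording is an illustration only. */
    String summary(String topicPartition, long elapsedMs) {
        return String.format(
            "Finished loading offsets and group metadata from %s in %d milliseconds (%d bytes, %d messages read).",
            topicPartition, elapsedMs, bytesRead, messagesRead);
    }

    public static void main(String[] args) {
        OffsetsLoadStats stats = new OffsetsLoadStats();
        stats.record(1024, 10);
        stats.record(2048, 25);
        System.out.println(stats.summary("__consumer_offsets-22", 23131));
    }
}
```

Keeping the counters next to the existing timing means a slow load can be judged against the amount of data that was actually read.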



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8238) Log how many bytes and messages were read from __consumer_offsets

2019-04-15 Thread Colin P. McCabe (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8238?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin P. McCabe updated KAFKA-8238:
---
Labels: newbie  (was: )

> Log how many bytes and messages were read from __consumer_offsets
> -
>
> Key: KAFKA-8238
> URL: https://issues.apache.org/jira/browse/KAFKA-8238
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin P. McCabe
>Priority: Minor
>  Labels: newbie
>
> We should log how many bytes and messages were read from __consumer_offsets.  
> Currently we only log how long it took.  Example: 
> {code}
> [GroupMetadataManager brokerId=2] Finished loading offsets and group metadata 
> from __consumer_offsets-22 in 23131 milliseconds.
> {code}





[jira] [Resolved] (KAFKA-8228) Exactly once semantics break during server restart for kafka-streams application

2019-04-15 Thread Boquan Tang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boquan Tang resolved KAFKA-8228.

Resolution: Duplicate

This might be a duplicate of KAFKA-7866; closing for now and watching that ticket.

> Exactly once semantics break during server restart for kafka-streams 
> application
> 
>
> Key: KAFKA-8228
> URL: https://issues.apache.org/jira/browse/KAFKA-8228
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.2.0
>Reporter: Boquan Tang
>Priority: Major
>
> We are using 2.2.0 for kafka-streams client and 2.0.1 for server.
> We have a simple kafka-streams application that has the following topology:
> {code:java}
> Source: KSTREAM-SOURCE-04 (topics: [deduped-adclick])
>   --> KSTREAM-TRANSFORM-05
> Processor: KSTREAM-TRANSFORM-05 (stores: [uid-offset-store])
>   --> KSTREAM-TRANSFORM-06
>   <-- KSTREAM-SOURCE-04
> Source: KSTREAM-SOURCE-00 (topics: [advertiser-budget])
>   --> KTABLE-SOURCE-01
> Source: KSTREAM-SOURCE-02 (topics: [advertisement-budget])
>   --> KTABLE-SOURCE-03
> Processor: KSTREAM-TRANSFORM-06 (stores: [advertiser-budget-store, advertisement-budget-store])
>   --> KSTREAM-SINK-07
>   <-- KSTREAM-TRANSFORM-05
> Sink: KSTREAM-SINK-07 (topic: budget-adclick)
>   <-- KSTREAM-TRANSFORM-06
> Processor: KTABLE-SOURCE-01 (stores: [advertiser-budget-store])
>   --> none
>   <-- KSTREAM-SOURCE-00
> Processor: KTABLE-SOURCE-03 (stores: [advertisement-budget-store])
>   --> none
>   <-- KSTREAM-SOURCE-02
> {code}
> The *Processor: KSTREAM-TRANSFORM-05 (stores: [uid-offset-store])* was 
> added specifically to investigate this broken-EOS issue, and its transform() 
> looks like this (the specific K and V class names are removed):
> {code:java}
> public void init(final ProcessorContext context) {
>     uidStore = (WindowStore) context.getStateStore(uidStoreName);
>     this.context = context;
> }
>
> public KeyValue transform(final K key, final V value) {
>     final long offset = context.offset();
>     final String uid = value.getUid();
>     final long beginningOfHour =
>         Instant.ofEpochMilli(clickTimestamp).atZone(ZoneId.systemDefault()).withMinute(0).withSecond(0).toEpochSecond() * 1000;
>     final Long maybeLastOffset = uidStore.fetch(uid, beginningOfHour);
>     final boolean dupe = null != maybeLastOffset && offset == maybeLastOffset;
>     uidStore.put(uid, offset, beginningOfHour);
>     if (dupe) {
>         LOGGER.warn("Find duplication in partition {}, uid is {}, current offset is {}, last offset is {}",
>             context.partition(),
>             uid,
>             offset,
>             maybeLastOffset);
>         statsEmitter.count("duplication");
>     }
>     return dupe ? null : new KeyValue<>(key, value);
> }
> {code}
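
The duplicate check in the transformer above can be tried in isolation with the standard library only. The sketch below is an assumption-laden stand-in: a plain `HashMap` replaces the windowed state store, the class name `OffsetDedupSketch` is made up, and `truncatedTo(ChronoUnit.HOURS)` is swapped in for the `withMinute(0).withSecond(0)` chain. It only shows the idea that a record counts as a duplicate when the same offset was already stored for the same uid in the same hour.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
import java.util.Map;

/** Stand-in for the windowed store: last offset seen per (uid, hour window). */
final class OffsetDedupSketch {
    private final Map<String, Long> lastOffsetByUidAndHour = new HashMap<>();
    private final ZoneId zone;

    OffsetDedupSketch(ZoneId zone) {
        this.zone = zone;
    }

    /** Start of the hour containing the timestamp, in epoch milliseconds. */
    long beginningOfHour(long timestampMs) {
        return Instant.ofEpochMilli(timestampMs)
                .atZone(zone)
                .truncatedTo(ChronoUnit.HOURS)
                .toInstant()
                .toEpochMilli();
    }

    /** True when this exact offset was already recorded for the uid in the same hour. */
    boolean isDuplicate(String uid, long offset, long timestampMs) {
        String key = uid + "@" + beginningOfHour(timestampMs);
        // put() returns the previously stored offset, or null if there was none.
        Long previous = lastOffsetByUidAndHour.put(key, offset);
        return previous != null && previous == offset;
    }

    public static void main(String[] args) {
        OffsetDedupSketch dedup = new OffsetDedupSketch(ZoneId.of("UTC"));
        long clickTimestamp = 3_600_000L + 123_456L;
        System.out.println(dedup.isDuplicate("uid-1", 42L, clickTimestamp)); // first sighting
        System.out.println(dedup.isDuplicate("uid-1", 42L, clickTimestamp)); // same offset again
    }
}
```

Because the check compares offsets rather than payloads, it flags only a re-processed input record, which is the symptom being investigated here.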
> Although it is not 100% reproducible, we found that after restarting one or 
> more servers on the cluster side, duplicates are detected:
> {code:java}
> 2019-04-12T07:12:58Z WARN [org.apache.kafka.clients.NetworkClient] 
> [kafka-producer-network-thread | 
> adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1-0_9-producer]
>  [Producer 
> clientId=adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1-0_9-producer,
>  transactionalId=adclick-budget-decorator-streams-0_9] Connection to node 2 
> (*:9092) could not be established. Broker may not be available.
> 2019-04-12T07:12:58Z WARN [org.apache.kafka.clients.NetworkClient] 
> [kafka-producer-network-thread | 
> adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1-0_9-producer]
>  [Producer 
> clientId=adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1-0_9-producer,
>  transactionalId=adclick-budget-decorator-streams-0_9] Connection to node 2 
> (*:9092) could not be established. Broker may not be available.
> 2019-04-12T07:14:02Z WARN [org.apache.kafka.clients.NetworkClient] 
> [kafka-producer-network-thread | 
> adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1-0_12-producer]
>  [Producer 
> clientId=adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1-0_12-producer,
>  transactionalId=adclick-budget-decorator-streams-0_12] Connection to node 2 
> (*:9092) could not be established. Broker may not be available.
> 2019-04-12T07:27:39Z WARN 
> [org.apache.kafka.streams.processor.internals.StreamThread] 
> [adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1]
>  stream-thread 
> 

[jira] [Commented] (KAFKA-8228) Exactly once semantics break during server restart for kafka-streams application

2019-04-15 Thread Boquan Tang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16818492#comment-16818492
 ] 

Boquan Tang commented on KAFKA-8228:


[~mjsax] Gotcha. I'll watch that issue; if I find new evidence that points this 
issue to a different root cause, I'll reopen it and add more details.


[jira] [Commented] (KAFKA-8236) Incorporate version control for Kafka Streams Application Reset

2019-04-15 Thread Boyang Chen (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8236?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16818487#comment-16818487
 ] 

Boyang Chen commented on KAFKA-8236:


[~mjsax] Right, this is just a preliminary thought. One way we could do this is 
by exposing a configuration like StreamsConfig.CODE_VERSION, so that users 
can inject it. However, we need to handle the A -> B -> A problem, where a user 
chooses to roll back and roll forward multiple times. Adding a timestamp 
suffix might be a good way to distinguish two different builds.
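
To make the A -> B -> A case concrete, here is a minimal sketch of a version tag with a timestamp suffix. Everything in it is hypothetical: the class `DeploymentTag`, the `-` separator, and the use of epoch milliseconds are assumptions for illustration, and no such configuration exists in StreamsConfig today.

```java
import java.time.Instant;

/** Hypothetical deployment tag: a user-supplied code version plus the deploy time. */
final class DeploymentTag {
    final String codeVersion;
    final Instant deployedAt;

    DeploymentTag(String codeVersion, Instant deployedAt) {
        this.codeVersion = codeVersion;
        this.deployedAt = deployedAt;
    }

    /** Renders "version-epochMillis"; the separator and format are assumptions. */
    String asString() {
        return codeVersion + "-" + deployedAt.toEpochMilli();
    }

    /** Parses a tag produced by asString(); splits on the last '-' so versions may contain dashes. */
    static DeploymentTag parse(String tag) {
        int split = tag.lastIndexOf('-');
        String version = tag.substring(0, split);
        long millis = Long.parseLong(tag.substring(split + 1));
        return new DeploymentTag(version, Instant.ofEpochMilli(millis));
    }

    public static void main(String[] args) {
        // Deploy A, roll forward to B, then roll back to A.
        DeploymentTag first = new DeploymentTag("A", Instant.ofEpochMilli(1000L));
        DeploymentTag second = new DeploymentTag("B", Instant.ofEpochMilli(2000L));
        DeploymentTag third = new DeploymentTag("A", Instant.ofEpochMilli(3000L));
        System.out.println(first.asString() + " " + second.asString() + " " + third.asString());
    }
}
```

With the suffix, the two deployments of A carry different tags even though they run the same code version, so stored positions can be attributed to the right deployment.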

> Incorporate version control for Kafka Streams Application Reset
> ---
>
> Key: KAFKA-8236
> URL: https://issues.apache.org/jira/browse/KAFKA-8236
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, tools
>Reporter: Boyang Chen
>Priority: Minor
>  Labels: needs-kip
>
> Inspired by Spark mlflow, which supports a versioning log, we should consider 
> exposing a special versioning tag for KStream applications to make it easy to 
> roll back a bad code deploy. The naive approach is to store the versioning info 
> in the consumer offsets topic so that when we perform a rollback, we know where 
> to read from in the input and where to clean up the changelog topic. Essentially, 
> this is an extension to our current application reset tool.





[jira] [Commented] (KAFKA-7471) Multiple Consumer Group Management (Describe, Reset, Delete)

2019-04-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16818473#comment-16818473
 ] 

ASF GitHub Bot commented on KAFKA-7471:
---

vahidhashemian commented on pull request #5726: KAFKA-7471: Multiple Consumer 
Group Management Feature
URL: https://github.com/apache/kafka/pull/5726
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Multiple Consumer Group Management (Describe, Reset, Delete)
> 
>
> Key: KAFKA-7471
> URL: https://issues.apache.org/jira/browse/KAFKA-7471
> Project: Kafka
>  Issue Type: New Feature
>  Components: tools
>Affects Versions: 1.0.0, 2.0.0
>Reporter: Alex Dunayevsky
>Assignee: Alex Dunayevsky
>Priority: Major
>
> Functionality needed:
>  * Describe/Delete/Reset offsets on multiple consumer groups at a time 
> (including each group by repeating `--group` parameter)
>  * Describe/Delete/Reset offsets on ALL consumer groups at a time (add new 
> --groups-all option similar to --topics-all)
>  * Generate CSV for multiple consumer groups
> What are the benefits? 
>  * No need to start a new JVM to perform each query on every single consumer 
> group
>  * Ability to query groups by their status (for instance, using `grep -v` on 
> `Stable` to spot problematic/dead/empty groups)
>  * Ability to export offsets to reset for multiple consumer groups to a CSV 
> file (needs CSV generation export/import format rework)
>  





[jira] [Commented] (KAFKA-8228) Exactly once semantics break during server restart for kafka-streams application

2019-04-15 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16818474#comment-16818474
 ] 

Matthias J. Sax commented on KAFKA-8228:


[~boquan] Can you follow up on KAFKA-7866? Maybe we can close this ticket as 
"contained in".


[jira] [Commented] (KAFKA-8207) StickyPartitionAssignor for KStream

2019-04-15 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16818471#comment-16818471
 ] 

Matthias J. Sax commented on KAFKA-8207:


I understand. However, this must be fixed differently, not by allowing users to 
specify a custom partition assignor. Hence, I think we should close this ticket 
as "not a problem".

> StickyPartitionAssignor for KStream
> ---
>
> Key: KAFKA-8207
> URL: https://issues.apache.org/jira/browse/KAFKA-8207
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: neeraj
>Priority: Major
>
> In Kafka Streams I am not able to configure a sticky partition assignor or my 
> own custom partition assignor.
> Overriding the property while building the streams application does not work:
> props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 
> CustomAssignor.class.getName());





[jira] [Commented] (KAFKA-7778) Add KTable.suppress to Scala API

2019-04-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7778?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16818464#comment-16818464
 ] 

ASF GitHub Bot commented on KAFKA-7778:
---

guozhangwang commented on pull request #6314: KAFKA-7778: Add KTable.suppress 
to Scala API
URL: https://github.com/apache/kafka/pull/6314
 
 
   
 



> Add KTable.suppress to Scala API
> 
>
> Key: KAFKA-7778
> URL: https://issues.apache.org/jira/browse/KAFKA-7778
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Affects Versions: 2.1.0
>Reporter: Jacek Laskowski
>Assignee: John Roesler
>Priority: Major
>  Labels: newbie
>
> {{KTable.suppress}} is not available in Scala API.
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables]
>  





[jira] [Assigned] (KAFKA-8198) KStreams testing docs use non-existent method "pipe"

2019-04-15 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8198?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-8198:
--

Assignee: Victoria Bialas

> KStreams testing docs use non-existent method "pipe"
> 
>
> Key: KAFKA-8198
> URL: https://issues.apache.org/jira/browse/KAFKA-8198
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation, streams
>Affects Versions: 1.1.1, 2.0.1, 2.2.0, 2.1.1
>Reporter: Michael Drogalis
>Assignee: Victoria Bialas
>Priority: Minor
>  Labels: documentation, newbie
>
> In [the testing docs for 
> KStreams|https://kafka.apache.org/20/documentation/streams/developer-guide/testing.html],
>  we use the following code snippet:
> {code:java}
> ConsumerRecordFactory factory = new 
> ConsumerRecordFactory<>("input-topic", new StringSerializer(), new 
> IntegerSerializer());
> testDriver.pipe(factory.create("key", 42L));
> {code}
> We should correct the docs to use the pipeInput method.





[jira] [Updated] (KAFKA-8198) KStreams testing docs use non-existent method "pipe"

2019-04-15 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8198?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-8198:
---
Description: 
In [the testing docs for 
KStreams|https://kafka.apache.org/20/documentation/streams/developer-guide/testing.html],
 we use the following code snippet:
{code:java}
ConsumerRecordFactory factory = new 
ConsumerRecordFactory<>("input-topic", new StringSerializer(), new 
IntegerSerializer());
testDriver.pipe(factory.create("key", 42L));
{code}
We should correct the docs to use the pipeInput method.

  was:
In [the testing docs for 
KStreams|https://kafka.apache.org/20/documentation/streams/developer-guide/testing.html],
 we use the following code snippet:

{code:java}
ConsumerRecordFactory factory = new 
ConsumerRecordFactory<>("input-topic", new StringSerializer(), new 
IntegerSerializer());
testDriver.pipe(factory.create("key", 42L));
{code}

As of Apache Kafka 2.2.0, this method no longer exists. We should correct the 
docs to use the pipeInput method.








[jira] [Updated] (KAFKA-6635) Producer close does not await pending transaction

2019-04-15 Thread Jason Gustafson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-6635:
---
Fix Version/s: 2.3.0

> Producer close does not await pending transaction
> -
>
> Key: KAFKA-6635
> URL: https://issues.apache.org/jira/browse/KAFKA-6635
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Reporter: Jason Gustafson
>Assignee: Viktor Somogyi-Vass
>Priority: Major
> Fix For: 2.3.0
>
>
> Currently close() only awaits completion of pending produce requests. If 
> there is a transaction ongoing, it may be dropped. For example, if one thread 
> is calling {{commitTransaction()}} and another calls {{close()}}, then the 
> commit may never happen even if the caller is willing to wait for it (by 
> using a long timeout). What's more, the thread blocking in 
> {{commitTransaction()}} will be stuck since the result will not be completed 
> once the producer has shut down.
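
The behaviour asked for here can be illustrated with a toy model built on `CompletableFuture`. This is not KafkaProducer and does not reflect how the actual fix is implemented; the class `ToyTransactionalProducer` and its methods are invented for the example. It shows two ideas from the description: close() waits for an in-flight commit up to its timeout, and if the commit cannot finish, the pending result is failed so that a thread blocked on it is released instead of hanging.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Toy model (not KafkaProducer): close() awaits an in-flight commit. */
final class ToyTransactionalProducer {
    private volatile CompletableFuture<Void> pendingCommit =
            CompletableFuture.completedFuture(null);

    /** Starts a commit; callers may block on the returned future. */
    CompletableFuture<Void> beginCommit() {
        CompletableFuture<Void> result = new CompletableFuture<>();
        pendingCommit = result;
        return result;
    }

    /** Stands in for the background sender finishing the commit. */
    void completeCommit() {
        pendingCommit.complete(null);
    }

    /**
     * Waits up to the timeout for the pending commit. Returns true if it
     * completed. Otherwise the pending result is failed so that any thread
     * blocked on it wakes up with an error.
     */
    boolean close(long timeout, TimeUnit unit) {
        CompletableFuture<Void> result = pendingCommit;
        try {
            result.get(timeout, unit);
            return true;
        } catch (TimeoutException e) {
            result.completeExceptionally(
                    new IllegalStateException("producer closed before the commit completed"));
            return false;
        } catch (ExecutionException e) {
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        ToyTransactionalProducer producer = new ToyTransactionalProducer();
        producer.beginCommit();
        producer.completeCommit();
        System.out.println("commit finished before close: " + producer.close(1, TimeUnit.SECONDS));
    }
}
```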





[jira] [Commented] (KAFKA-6635) Producer close does not await pending transaction

2019-04-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16818449#comment-16818449
 ] 

ASF GitHub Bot commented on KAFKA-6635:
---

hachikuji commented on pull request #5971: KAFKA-6635: Producer close awaits 
for pending transaction
URL: https://github.com/apache/kafka/pull/5971
 
 
   
 








[jira] [Commented] (KAFKA-8236) Incorporate version control for Kafka Streams Application Reset

2019-04-15 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8236?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16818443#comment-16818443
 ] 

Matthias J. Sax commented on KAFKA-8236:


Not 100% sure if this needs a KIP.






[jira] [Updated] (KAFKA-8236) Incorporate version control for Kafka Streams Application Reset

2019-04-15 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8236?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-8236:
---
Component/s: tools
 streams

> Incorporate version control for Kafka Streams Application Reset
> ---
>
> Key: KAFKA-8236
> URL: https://issues.apache.org/jira/browse/KAFKA-8236
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, tools
>Reporter: Boyang Chen
>Priority: Minor
>
> Inspired by Spark mlflow, which supports a versioning log, we should consider 
> exposing a special versioning tag for KStream applications to make it easy to 
> roll back a bad code deploy. The naive approach is to store the versioning info 
> in the consumer offsets topic so that when we perform a rollback, we know where 
> to read from in the input and where to clean up the changelog topic. Essentially, 
> this is an extension to our current application reset tool.





[jira] [Updated] (KAFKA-8236) Incorporate version control for Kafka Streams Application Reset

2019-04-15 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8236?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-8236:
---
Labels: needs-kip  (was: )






[jira] [Comment Edited] (KAFKA-8235) NoSuchElementException when restoring state after a clean shutdown of a Kafka Streams application

2019-04-15 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8235?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16818442#comment-16818442
 ] 

Matthias J. Sax edited comment on KAFKA-8235 at 4/15/19 10:30 PM:
--

Thanks for reporting this. I am having trouble mapping the stack trace to the code.
 at 
org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.restoreBatch(InMemoryTimeOrderedKeyValueBuffer.java:288)
 The method 

private void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> batch)

seems to be on a different line... and I also cannot find where
 TreeMap.firstKey
 is called.

Kafka 2.3 is not released yet, so the "affected version" field does not seem to be 
correctly specified. What version are you using?
{quote}The issue doesn't seem to occur for small amounts of data, but it 
doesn't take a particularly large amount of data to trigger the problem either.
{quote}
Can you specify some numbers?
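
For reference, the exception at the top of the reported stack trace is what `java.util.TreeMap.firstKey()` throws when the map is empty. The sketch below shows that behaviour and one way to guard against it. It makes no claim about where or why the buffer's map is empty during restore; the class and method names are made up for the example.

```java
import java.util.NoSuchElementException;
import java.util.TreeMap;

/** Illustrates TreeMap.firstKey() on an empty map and a simple guard. */
final class EmptyTreeMapSketch {

    /** Returns the smallest key, or null when the map is empty. */
    static Long minKeyOrNull(TreeMap<Long, String> map) {
        return map.isEmpty() ? null : map.firstKey();
    }

    /** Reproduces the failure mode: firstKey() on an empty TreeMap throws. */
    static boolean firstKeyThrowsOnEmpty() {
        try {
            new TreeMap<Long, String>().firstKey();
            return false;
        } catch (NoSuchElementException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        TreeMap<Long, String> buffer = new TreeMap<>();
        System.out.println("throws on empty: " + firstKeyThrowsOnEmpty());
        System.out.println("guarded lookup on empty: " + minKeyOrNull(buffer));
        buffer.put(20L, "b");
        buffer.put(10L, "a");
        System.out.println("guarded lookup with entries: " + minKeyOrNull(buffer));
    }
}
```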


> NoSuchElementException when restoring state after a clean shutdown of a Kafka 
> Streams application
> -
>
> Key: KAFKA-8235
> URL: https://issues.apache.org/jira/browse/KAFKA-8235
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.3.0
> Environment: Linux, CentOS 7, Java 1.8.0_191, 50 partitions per 
> topic, replication factor 3
>Reporter: Andrew Klopper
>Priority: Major
>
> While performing a larger scale test of a new Kafka Streams application that 
> performs aggregation and suppression, we have discovered that we are unable 
> to restart the application after a clean shutdown. The error that is logged 
> is:
> {code:java}
> [rollupd-5ee1997b-8302-4e88-b3db-a74c8b805a6c-StreamThread-1] ERROR 
> org.apache.kafka.streams.processor.internals.StreamThread - stream-thread 
> [rollupd-5ee1997b-8302-4e88-b3db-a74c8b805a6c-StreamThread-1] Encountered the 
> following error during processing:
> java.util.NoSuchElementException
>     at java.util.TreeMap.key(TreeMap.java:1327)
>     at java.util.TreeMap.firstKey(TreeMap.java:290)
>     at org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.restoreBatch(InMemoryTimeOrderedKeyValueBuffer.java:288)
>     at org.apache.kafka.streams.processor.internals.CompositeRestoreListener.restoreBatch(CompositeRestoreListener.java:89)
>     at org.apache.kafka.streams.processor.internals.StateRestorer.restore(StateRestorer.java:92)
>     at org.apache.kafka.streams.processor.internals.StoreChangelogReader.processNext(StoreChangelogReader.java:317)
>     at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:92)
>     at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:328)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:866)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:804)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:773)
> {code}
> The issue doesn't seem to occur for small amounts of data, but it doesn't 
> take a particularly large amount of data to trigger the problem either.
> Any assistance would be greatly appreciated.
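
The top two frames of the trace, java.util.TreeMap.key and java.util.TreeMap.firstKey, match what the JDK does when firstKey() is called on an empty TreeMap. The following is a minimal JDK-only sketch of that behavior and of a guarded lookup. FirstKeyDemo and its methods are hypothetical names, not Kafka code, and whether the buffer's map is really empty during restore is an assumption the report does not confirm.

```java
import java.util.NoSuchElementException;
import java.util.TreeMap;

// Hypothetical demo class; not part of Kafka.
final class FirstKeyDemo {
    // Returns the smallest key, or the fallback when the map is empty,
    // so firstKey() is never called on an empty map.
    static long minKeyOrDefault(TreeMap<Long, String> index, long fallback) {
        return index.isEmpty() ? fallback : index.firstKey();
    }

    // Returns true if calling firstKey() on the given map throws
    // NoSuchElementException.
    static boolean firstKeyThrows(TreeMap<Long, String> index) {
        try {
            index.firstKey();
            return false;
        } catch (NoSuchElementException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        TreeMap<Long, String> index = new TreeMap<>();
        System.out.println(firstKeyThrows(index));
        System.out.println(minKeyOrDefault(index, Long.MAX_VALUE));
        index.put(42L, "record");
        System.out.println(minKeyOrDefault(index, Long.MAX_VALUE));
    }
}
```

A guard of this kind only hides the symptom; if the map is expected to be non-empty at that point, the interesting question is why it is not.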





[jira] [Commented] (KAFKA-8235) NoSuchElementException when restoring state after a clean shutdown of a Kafka Streams application

2019-04-15 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8235?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16818442#comment-16818442
 ] 

Matthias J. Sax commented on KAFKA-8235:


Thanks for reporting this. I am having trouble mapping the stack trace to the code.
at org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.restoreBatch(InMemoryTimeOrderedKeyValueBuffer.java:288)
The method 

private void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> batch)

seems to be on a different line, and I also cannot find where TreeMap.firstKey 
is called.

Kafka 2.3 is not released yet, so the "affected version" field does not seem to 
be correctly specified. What version are you using?

> NoSuchElementException when restoring state after a clean shutdown of a Kafka 
> Streams application
> -
>
> Key: KAFKA-8235
> URL: https://issues.apache.org/jira/browse/KAFKA-8235
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.3.0
> Environment: Linux, CentOS 7, Java 1.8.0_191, 50 partitions per 
> topic, replication factor 3
>Reporter: Andrew Klopper
>Priority: Major
>
> While performing a larger scale test of a new Kafka Streams application that 
> performs aggregation and suppression, we have discovered that we are unable 
> to restart the application after a clean shutdown. The error that is logged 
> is:
> {code:java}
> [rollupd-5ee1997b-8302-4e88-b3db-a74c8b805a6c-StreamThread-1] ERROR 
> org.apache.kafka.streams.processor.internals.StreamThread - stream-thread 
> [rollupd-5ee1997b-8302-4e88-b3db-a74c8b805a6c-StreamThread-1] Encountered the 
> following error during processing:
> java.util.NoSuchElementException
>     at java.util.TreeMap.key(TreeMap.java:1327)
>     at java.util.TreeMap.firstKey(TreeMap.java:290)
>     at org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.restoreBatch(InMemoryTimeOrderedKeyValueBuffer.java:288)
>     at org.apache.kafka.streams.processor.internals.CompositeRestoreListener.restoreBatch(CompositeRestoreListener.java:89)
>     at org.apache.kafka.streams.processor.internals.StateRestorer.restore(StateRestorer.java:92)
>     at org.apache.kafka.streams.processor.internals.StoreChangelogReader.processNext(StoreChangelogReader.java:317)
>     at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:92)
>     at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:328)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:866)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:804)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:773)
> {code}
> The issue doesn't seem to occur for small amounts of data, but it doesn't 
> take a particularly large amount of data to trigger the problem either.
> Any assistance would be greatly appreciated.





[jira] [Updated] (KAFKA-8233) Helper class to make it simpler to write test logic with TopologyTestDriver

2019-04-15 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-8233:
---
Labels: needs-kip  (was: )

> Helper class to make it simpler to write test logic with TopologyTestDriver
> ---
>
> Key: KAFKA-8233
> URL: https://issues.apache.org/jira/browse/KAFKA-8233
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Jukka Karvanen
>Priority: Minor
>  Labels: needs-kip
>
> When using TopologyTestDriver, you need to call ConsumerRecordFactory to 
> create the ConsumerRecord passed into the pipeInput method to write to a 
> topic. Also, when calling readOutput to consume from a topic, you need to 
> provide the correct Deserializers each time.
> You easily end up writing helper methods in your test classes, but this can 
> be avoided by adding generic input and output topic classes.
> This improvement adds a TestInputTopic class, which wraps the 
> TopologyTestDriver and ConsumerRecordFactory methods in one class used to 
> write to input topics, and a TestOutputTopic class, which collects the 
> TopologyTestDriver reading methods and provides type-safe read methods.
>  
> More info and an example of how a Streams test looks when using these classes:
> [https://github.com/jukkakarvanen/kafka-streams-test-topics]





[jira] [Commented] (KAFKA-8233) Helper class to make it simpler to write test logic with TopologyTestDriver

2019-04-15 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16818397#comment-16818397
 ] 

Matthias J. Sax commented on KAFKA-8233:


Thanks for creating the ticket. If you want to contribute something like this, 
you would need to write a KIP: 
[https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals]

> Helper class to make it simpler to write test logic with TopologyTestDriver
> ---
>
> Key: KAFKA-8233
> URL: https://issues.apache.org/jira/browse/KAFKA-8233
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Jukka Karvanen
>Priority: Minor
>
> When using TopologyTestDriver, you need to call ConsumerRecordFactory to 
> create the ConsumerRecord passed into the pipeInput method to write to a 
> topic. Also, when calling readOutput to consume from a topic, you need to 
> provide the correct Deserializers each time.
> You easily end up writing helper methods in your test classes, but this can 
> be avoided by adding generic input and output topic classes.
> This improvement adds a TestInputTopic class, which wraps the 
> TopologyTestDriver and ConsumerRecordFactory methods in one class used to 
> write to input topics, and a TestOutputTopic class, which collects the 
> TopologyTestDriver reading methods and provides type-safe read methods.
>  
> More info and an example of how a Streams test looks when using these classes:
> [https://github.com/jukkakarvanen/kafka-streams-test-topics]





[jira] [Commented] (KAFKA-7965) Flaky Test ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup

2019-04-15 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16818388#comment-16818388
 ] 

Matthias J. Sax commented on KAFKA-7965:


one more: 
[https://builds.apache.org/blue/organizations/jenkins/kafka-trunk-jdk11/detail/kafka-trunk-jdk11/435/tests]

> Flaky Test 
> ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
> 
>
> Key: KAFKA-7965
> URL: https://issues.apache.org/jira/browse/KAFKA-7965
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, unit tests
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Matthias J. Sax
>Assignee: huxihx
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.3.0, 2.2.1
>
>
> To get stable nightly builds for the `2.2` release, I am creating tickets for 
> all observed test failures.
> [https://jenkins.confluent.io/job/apache-kafka-test/job/2.2/21/]
> {quote}java.lang.AssertionError: Received 0, expected at least 68
> at org.junit.Assert.fail(Assert.java:88)
> at org.junit.Assert.assertTrue(Assert.java:41)
> at kafka.api.ConsumerBounceTest.receiveAndCommit(ConsumerBounceTest.scala:557)
> at kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1(ConsumerBounceTest.scala:320)
> at kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1$adapted(ConsumerBounceTest.scala:319)
> at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
> at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
> at kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(ConsumerBounceTest.scala:319){quote}





[jira] [Created] (KAFKA-8237) Untangle TopicDeletionManager and add test cases

2019-04-15 Thread Jason Gustafson (JIRA)
Jason Gustafson created KAFKA-8237:
--

 Summary: Untangle TopicDeletionManager and add test cases
 Key: KAFKA-8237
 URL: https://issues.apache.org/jira/browse/KAFKA-8237
 Project: Kafka
  Issue Type: Improvement
Reporter: Jason Gustafson
Assignee: Jason Gustafson


There are a few circular dependencies involving `TopicDeletionManager`. For 
example, both `PartitionStateMachine` and `ReplicaStateMachine` depend on 
`TopicDeletionManager` while it also depends on them. This makes testing 
difficult and so there are no unit tests. We should fix this.
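
One common way to break a cycle like this is to have the deletion logic depend on a narrow interface rather than on the concrete state machines, so a test can pass in a fake. The sketch below is in Java for illustration only and uses hypothetical names (ReplicaStateChanger, DeletionManagerSketch, RecordingStateChanger); it is not the actual controller code, and it is not necessarily the approach this ticket will take.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical narrow interface: the only operation the deletion logic
// needs from the state machines in this sketch.
interface ReplicaStateChanger {
    void markOffline(String topic);
}

// Hypothetical stand-in for the deletion manager. It depends only on the
// interface, so nothing here refers back to a concrete state machine.
final class DeletionManagerSketch {
    private final ReplicaStateChanger stateChanger;
    private final List<String> queued = new ArrayList<>();

    DeletionManagerSketch(ReplicaStateChanger stateChanger) {
        this.stateChanger = stateChanger;
    }

    void enqueue(String topic) {
        queued.add(topic);
    }

    // Processes all queued topics and returns how many were handled.
    int resume() {
        int handled = 0;
        for (String topic : queued) {
            stateChanger.markOffline(topic);
            handled++;
        }
        queued.clear();
        return handled;
    }
}

// Test double that records calls instead of touching real state.
final class RecordingStateChanger implements ReplicaStateChanger {
    final List<String> offline = new ArrayList<>();

    @Override
    public void markOffline(String topic) {
        offline.add(topic);
    }
}
```

With the dependency pointing one way, a unit test can construct DeletionManagerSketch with a RecordingStateChanger and assert on the recorded calls without starting a controller.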





[jira] [Commented] (KAFKA-7965) Flaky Test ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup

2019-04-15 Thread Bruno Cadonna (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16818336#comment-16818336
 ] 

Bruno Cadonna commented on KAFKA-7965:
--

Two more failures, with Java 8 and Java 11, showing the same error message as 
posted above.

> Flaky Test 
> ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
> 
>
> Key: KAFKA-7965
> URL: https://issues.apache.org/jira/browse/KAFKA-7965
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, unit tests
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Matthias J. Sax
>Assignee: huxihx
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.3.0, 2.2.1
>
>
> To get stable nightly builds for the `2.2` release, I am creating tickets for 
> all observed test failures.
> [https://jenkins.confluent.io/job/apache-kafka-test/job/2.2/21/]
> {quote}java.lang.AssertionError: Received 0, expected at least 68
> at org.junit.Assert.fail(Assert.java:88)
> at org.junit.Assert.assertTrue(Assert.java:41)
> at kafka.api.ConsumerBounceTest.receiveAndCommit(ConsumerBounceTest.scala:557)
> at kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1(ConsumerBounceTest.scala:320)
> at kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1$adapted(ConsumerBounceTest.scala:319)
> at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
> at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
> at kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(ConsumerBounceTest.scala:319){quote}





[jira] [Issue Comment Deleted] (KAFKA-6248) Enable configuration of internal topics of Kafka Streams applications

2019-04-15 Thread Edmondo Porcu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6248?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Edmondo Porcu updated KAFKA-6248:
-
Comment: was deleted

(was: Does it work for max.request.size?

 

 -Dtopic.max.request.size=300 -Dproducer.max.request.size=300

 

Exception in thread 
"analysis-input-enricher-e1c4502d-62e9-43a6-bf59-b37dcfc77ce2-StreamThread-1" 
org.apache.kafka.streams.errors.StreamsException: task [1_0] Abort sending 
since an error caught with a previous record (key NON PREVISTO value 
[B@62851189 timestamp 1555104485966) to topic 
analysis-input-enricher-cr-STATE-STORE-01-changelog due to 
org.apache.kafka.common.errors.RecordTooLargeException: The message is 1403267 
bytes when serialized which is larger than the maximum request size you have 
configured with the max.request.size configuration.)

> Enable configuration of internal topics of Kafka Streams applications
> -
>
> Key: KAFKA-6248
> URL: https://issues.apache.org/jira/browse/KAFKA-6248
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Tim Van Laer
>Priority: Minor
>
> In the current implementation of Kafka Streams, it is not possible to set 
> custom configuration for internal topics (e.g. max.message.bytes, 
> retention.ms...). It would be nice if a developer could set some specific 
> configuration. 
> E.g. if you want to store messages bigger than 1MiB in a state store, you 
> have to alter the corresponding changelog topic with a max.message.bytes 
> setting. 
> The workaround is to create the 'internal' topics up front using the correct 
> naming convention so Kafka Streams will use the explicitly defined topics as 
> if they were internal. 
> An alternative is to alter the internal topics after the Kafka Streams 
> application has started and created its internal topics. 





[jira] [Commented] (KAFKA-6248) Enable configuration of internal topics of Kafka Streams applications

2019-04-15 Thread Edmondo Porcu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16818262#comment-16818262
 ] 

Edmondo Porcu commented on KAFKA-6248:
--

Does it work for max.request.size?

 

 -Dtopic.max.request.size=300 -Dproducer.max.request.size=300

 

Exception in thread 
"analysis-input-enricher-e1c4502d-62e9-43a6-bf59-b37dcfc77ce2-StreamThread-1" 
org.apache.kafka.streams.errors.StreamsException: task [1_0] Abort sending 
since an error caught with a previous record (key NON PREVISTO value 
[B@62851189 timestamp 1555104485966) to topic 
analysis-input-enricher-cr-STATE-STORE-01-changelog due to 
org.apache.kafka.common.errors.RecordTooLargeException: The message is 1403267 
bytes when serialized which is larger than the maximum request size you have 
configured with the max.request.size configuration.

> Enable configuration of internal topics of Kafka Streams applications
> -
>
> Key: KAFKA-6248
> URL: https://issues.apache.org/jira/browse/KAFKA-6248
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Tim Van Laer
>Priority: Minor
>
> In the current implementation of Kafka Streams, it is not possible to set 
> custom configuration for internal topics (e.g. max.message.bytes, 
> retention.ms...). It would be nice if a developer could set some specific 
> configuration. 
> E.g. if you want to store messages bigger than 1MiB in a state store, you 
> have to alter the corresponding changelog topic with a max.message.bytes 
> setting. 
> The workaround is to create the 'internal' topics up front using the correct 
> naming convention so Kafka Streams will use the explicitly defined topics as 
> if they were internal. 
> An alternative is to alter the internal topics after the Kafka Streams 
> application has started and created its internal topics. 
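
For the RecordTooLargeException quoted in the comment above, note that max.request.size is a producer setting, while the topic-level limit mentioned in the description is max.message.bytes. As far as I know, Kafka Streams forwards keys with a "producer." prefix to its embedded producer, and releases that support a "topic." prefix apply those keys to internal topics at creation time. A minimal sketch of building such properties with plain java.util.Properties follows. InternalTopicConfigSketch and withLargeRecords are hypothetical names, the 2 MiB value is an assumption chosen only to exceed the 1403267-byte record in the exception, and the sketch assumes the application passes these properties to Streams itself rather than relying on -D system properties.

```java
import java.util.Properties;

// Hypothetical helper; not part of Kafka.
final class InternalTopicConfigSketch {
    // Builds properties that set both the producer request limit and the
    // internal-topic message limit to maxBytes.
    static Properties withLargeRecords(int maxBytes) {
        Properties props = new Properties();
        // "producer." prefix: intended for the producer that Streams embeds.
        props.put("producer.max.request.size", Integer.toString(maxBytes));
        // "topic." prefix: intended for internal topics created by Streams
        // (assumes a Streams version that supports this prefix).
        props.put("topic.max.message.bytes", Integer.toString(maxBytes));
        return props;
    }

    public static void main(String[] args) {
        // 2 MiB, above the 1403267-byte record reported in the exception.
        Properties props = withLargeRecords(2 * 1024 * 1024);
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

A topic-level setting applied at creation would not change a changelog topic that already exists, so the workarounds in the description (creating or altering the topic explicitly) may still be needed, and the broker-side message size limit has to allow the record as well.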





[jira] [Commented] (KAFKA-8231) Expansion of ConnectClusterState interface

2019-04-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16818254#comment-16818254
 ] 

ASF GitHub Bot commented on KAFKA-8231:
---

C0urante commented on pull request #6584: KAFKA-8231: Expansion of 
ConnectClusterState interface
URL: https://github.com/apache/kafka/pull/6584
 
 
   [Jira](https://issues.apache.org/jira/browse/KAFKA-8231) and 
[KIP](https://cwiki.apache.org/confluence/display/KAFKA/KIP-454%3A+Expansion+of+the+ConnectClusterState+interface)
   
   The changes here add new methods to the `ConnectClusterState` interface so 
that Connect REST extensions can be more aware of the current state of the 
Connect cluster they are added to. The new methods allow extensions to query 
for connector and task configurations, as well as the ID of the Kafka cluster 
targeted by the Connect cluster.
   
   All new methods have new unit tests added for their implementations in the 
`ConnectClusterStateImpl` class.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Expansion of ConnectClusterState interface
> --
>
> Key: KAFKA-8231
> URL: https://issues.apache.org/jira/browse/KAFKA-8231
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 2.3.0
>
>
> This covers [KIP-454: Expansion of the ConnectClusterState 
> interface|https://cwiki.apache.org/confluence/display/KAFKA/KIP-454%3A+Expansion+of+the+ConnectClusterState+interface]





[jira] [Created] (KAFKA-8236) Incorporate version control for Kafka Streams Application Reset

2019-04-15 Thread Boyang Chen (JIRA)
Boyang Chen created KAFKA-8236:
--

 Summary: Incorporate version control for Kafka Streams Application 
Reset
 Key: KAFKA-8236
 URL: https://issues.apache.org/jira/browse/KAFKA-8236
 Project: Kafka
  Issue Type: Improvement
Reporter: Boyang Chen


Inspired by Spark mlflow, which supports a versioning log, we should consider 
exposing a special versioning tag for KStream applications to ease rollback of 
a bad code deploy. The naive approach is to store the versioning info in the 
consumer offset topic so that when we perform a rollback, we know where to read 
from in the input and where to clean up the changelog topic. Essentially, this 
is an extension to our current application reset tool.





[jira] [Commented] (KAFKA-2729) Cached zkVersion not equal to that in zookeeper, broker not recovering.

2019-04-15 Thread evildracula (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-2729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16818083#comment-16818083
 ] 

evildracula commented on KAFKA-2729:


Hello [~junrao], I'm now using 0.11.0.3, which is in the affected versions. I 
would like to reproduce this issue in my DEV environment. Could you please 
provide steps to reproduce it? Many thanks.

I have tried to reproduce it with **systemctl start/stop iptables**, but without 
success.

> Cached zkVersion not equal to that in zookeeper, broker not recovering.
> ---
>
> Key: KAFKA-2729
> URL: https://issues.apache.org/jira/browse/KAFKA-2729
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2.1, 0.9.0.0, 0.10.0.0, 0.10.1.0, 0.11.0.0
>Reporter: Danil Serdyuchenko
>Assignee: Onur Karaman
>Priority: Major
> Fix For: 1.1.0
>
>
> After a small network wobble where zookeeper nodes couldn't reach each other, 
> we started seeing a large number of under-replicated partitions. The zookeeper 
> cluster recovered, however we continued to see a large number of 
> under-replicated partitions. Two brokers in the kafka cluster were showing this 
> in the logs:
> {code}
> [2015-10-27 11:36:00,888] INFO Partition 
> [__samza_checkpoint_event-creation_1,3] on broker 5: Shrinking ISR for 
> partition [__samza_checkpoint_event-creation_1,3] from 6,5 to 5 
> (kafka.cluster.Partition)
> [2015-10-27 11:36:00,891] INFO Partition 
> [__samza_checkpoint_event-creation_1,3] on broker 5: Cached zkVersion [66] 
> not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
> {code}
> This appeared for all of the topics on the affected brokers. Both brokers only 
> recovered after a restart. Our own investigation yielded nothing, so I was 
> hoping you could shed some light on this issue, and possibly on whether it is 
> related to https://issues.apache.org/jira/browse/KAFKA-1382 ; however, we're 
> using 0.8.2.1.
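
The log line "Cached zkVersion [66] not equal to that in zookeeper, skip updating ISR" reads like a failed conditional write: ZooKeeper's setData takes an expected version and rejects the write when the node's version differs. The sketch below models that check with a hypothetical in-memory VersionedNode class; it is not Kafka or ZooKeeper code, and it only illustrates why a writer that keeps using a stale cached version would keep being rejected until it re-reads the current one.

```java
// Hypothetical in-memory model of a versioned node; not ZooKeeper code.
final class VersionedNode {
    private String data;
    private int version;

    VersionedNode(String data) {
        this.data = data;
        this.version = 0;
    }

    // Writes only if expectedVersion matches the current version. Returns the
    // new version on success, or -1 when the caller's cached version is stale.
    synchronized int conditionalSet(String newData, int expectedVersion) {
        if (expectedVersion != version) {
            return -1;
        }
        data = newData;
        version++;
        return version;
    }

    synchronized int version() {
        return version;
    }

    synchronized String data() {
        return data;
    }
}
```

In this model, a caller that gets -1 has to refresh its cached version with version() before retrying; retrying with the same stale value fails every time.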





[jira] [Updated] (KAFKA-8233) Helper class to make it simpler to write test logic with TopologyTestDriver

2019-04-15 Thread Jukka Karvanen (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jukka Karvanen updated KAFKA-8233:
--
Description: 
When using TopologyTestDriver, you need to call ConsumerRecordFactory to create 
the ConsumerRecord passed into the pipeInput method to write to a topic. Also, 
when calling readOutput to consume from a topic, you need to provide the 
correct Deserializers each time.

You easily end up writing helper methods in your test classes, but this can be 
avoided by adding generic input and output topic classes.

This improvement adds a TestInputTopic class, which wraps the TopologyTestDriver 
and ConsumerRecordFactory methods in one class used to write to input topics, 
and a TestOutputTopic class, which collects the TopologyTestDriver reading 
methods and provides type-safe read methods.

 

More info and an example of how a Streams test looks when using these classes:

[https://github.com/jukkakarvanen/kafka-streams-test-topics]

  was:
When using TopologyTestDriver, you need to call ConsumerRecordFactory to create 
the ConsumerRecord passed into the pipeInput method to write to a topic. Also, 
when calling readOutput to consume from a topic, you need to provide the 
correct Deserializers each time.

You easily end up writing helper methods in your test classes, but this can be 
avoided by adding generic input and output topic classes.

This improvement adds a TestInputTopic class, which wraps the TopologyTestDriver 
and ConsumerRecordFactory methods in one class used to write to input topics, 
and a TestOutputTopic class, which collects the TopologyTestDriver reading 
methods and provides type-safe read methods.

 

 Example of how a Streams test looks when using these classes:

[https://github.com/jukkakarvanen/kafka-streams-examples/blob/InputOutputTopic/src/test/java/io/confluent/examples/streams/WordCountLambdaExampleTest.java]


> Helper class to make it simpler to write test logic with TopologyTestDriver
> ---
>
> Key: KAFKA-8233
> URL: https://issues.apache.org/jira/browse/KAFKA-8233
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Jukka Karvanen
>Priority: Minor
>
> When using TopologyTestDriver, you need to call ConsumerRecordFactory to 
> create the ConsumerRecord passed into the pipeInput method to write to a 
> topic. Also, when calling readOutput to consume from a topic, you need to 
> provide the correct Deserializers each time.
> You easily end up writing helper methods in your test classes, but this can 
> be avoided by adding generic input and output topic classes.
> This improvement adds a TestInputTopic class, which wraps the 
> TopologyTestDriver and ConsumerRecordFactory methods in one class used to 
> write to input topics, and a TestOutputTopic class, which collects the 
> TopologyTestDriver reading methods and provides type-safe read methods.
>  
> More info and an example of how a Streams test looks when using these classes:
> [https://github.com/jukkakarvanen/kafka-streams-test-topics]





[jira] [Updated] (KAFKA-8100) kafka consumer not refresh metadata for dynamic topic deletion

2019-04-15 Thread Shengnan YU (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shengnan YU updated KAFKA-8100:
---
Summary: kafka consumer not refresh metadata for dynamic topic deletion  
(was: If delete expired topic, kafka consumer will keep flushing unknown_topic 
warning in log)

> kafka consumer not refresh metadata for dynamic topic deletion
> --
>
> Key: KAFKA-8100
> URL: https://issues.apache.org/jira/browse/KAFKA-8100
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.1.1, 2.1.1
>Reporter: Shengnan YU
>Priority: Major
>
> Recently we used Flink to consume Kafka topics with a regex pattern. We 
> found that after we deleted some unused topics, the logs kept filling with 
> UNKNOWN_TOPIC_EXCEPTION.
> I studied the source code of the Kafka client and found that, for the 
> consumer, topicExpiry is disabled in Metadata. As a result, even after a 
> topic is deleted, the client still has that topic in the metadata's topic 
> list and keeps fetching it from the servers.
> Is there a good way to avoid these annoying warning logs without modifying 
> Kafka's source code? (We still need the 'real' unknown topic exception, 
> meaning one that is not caused by an outdated topic, in the logs.)
> The following code can be used to reproduce this problem (create multiple 
> topics such as "test1", "test2", "test3"..."testn" in the Kafka cluster and 
> then delete any one of them while the code is running).
> {code:java}
> public static void main(String[] args) {
>     Properties props = new Properties();
>     props.put("bootstrap.servers", "localhost:9092\n");
>     props.put("group.id", "test10");
>     props.put("enable.auto.commit", "true");
>     props.put("auto.commit.interval.ms", "1000");
>     props.put("auto.offset.reset", "earliest");
>     props.put("key.deserializer",
>         "org.apache.kafka.common.serialization.StringDeserializer");
>     props.put("value.deserializer",
>         "org.apache.kafka.common.serialization.StringDeserializer");
>     props.put("metadata.max.age.ms", "6");
>     KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
>     class PartitionOffsetAssignerListener implements ConsumerRebalanceListener {
>         private KafkaConsumer<String, String> consumer;
>         public PartitionOffsetAssignerListener(KafkaConsumer<String, String> kafkaConsumer) {
>             this.consumer = kafkaConsumer;
>         }
>         public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
>         }
>         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
>             // reading all partitions from the beginning
>             consumer.seekToBeginning(partitions);
>         }
>     }
>     consumer.subscribe(Pattern.compile("^test.*$"),
>         new PartitionOffsetAssignerListener(consumer));
>     while (true) {
>         ConsumerRecords<String, String> records = consumer.poll(100);
>         for (ConsumerRecord<String, String> record : records) {
>             System.out.printf("offset = %d, key = %s, value = %s%n",
>                 record.offset(), record.key(), record.value());
>         }
>     }
> }
> {code}
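
The behavior the reporter is asking for can be sketched without Kafka: on every metadata refresh, rebuild the tracked topic set from the topics the cluster currently reports, so a deleted topic drops out instead of lingering. PatternTopicCache below is a hypothetical JDK-only class that illustrates the idea; it is not how the consumer's Metadata class is implemented, and the cluster listing is assumed to be supplied by the caller.

```java
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;

// Hypothetical helper; not part of the Kafka client.
final class PatternTopicCache {
    private final Pattern pattern;
    private final Set<String> tracked = new TreeSet<>();

    PatternTopicCache(String regex) {
        this.pattern = Pattern.compile(regex);
    }

    // Replaces the tracked set with the matching topics from the latest
    // cluster listing and returns a copy of the result. Topics that no longer
    // exist in the listing are dropped.
    Set<String> refresh(Set<String> clusterTopics) {
        tracked.clear();
        for (String topic : clusterTopics) {
            if (pattern.matcher(topic).matches()) {
                tracked.add(topic);
            }
        }
        return new TreeSet<>(tracked);
    }
}
```

With the pattern "^test.*$" from the reproduction code, refreshing against a listing that no longer contains "test2" leaves only the remaining matching topics in the tracked set.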





[jira] [Commented] (KAFKA-8100) If delete expired topic, kafka consumer will keep flushing unknown_topic warning in log

2019-04-15 Thread Shengnan YU (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16817951#comment-16817951
 ] 

Shengnan YU commented on KAFKA-8100:


Hi, could you please explain why the topic info is not deleted from the metadata 
first when hitting UNKNOWN_TOPIC_OR_PARTITION? It can easily be recovered if 
the topic actually exists.

> If delete expired topic, kafka consumer will keep flushing unknown_topic 
> warning in log
> ---
>
> Key: KAFKA-8100
> URL: https://issues.apache.org/jira/browse/KAFKA-8100
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.1.1, 2.1.1
>Reporter: Shengnan YU
>Priority: Major
>
> Recently we used Flink to consume Kafka topics with a regex pattern. We 
> found that after we deleted some unused topics, the logs kept filling with 
> UNKNOWN_TOPIC_EXCEPTION.
> I studied the source code of the Kafka client and found that, for the 
> consumer, topicExpiry is disabled in Metadata. As a result, even after a 
> topic is deleted, the client still has that topic in the metadata's topic 
> list and keeps fetching it from the servers.
> Is there a good way to avoid these annoying warning logs without modifying 
> Kafka's source code? (We still need the 'real' unknown topic exception, 
> meaning one that is not caused by an outdated topic, in the logs.)
> The following code can be used to reproduce this problem (create multiple 
> topics such as "test1", "test2", "test3"..."testn" in the Kafka cluster and 
> then delete any one of them while the code is running).
> {code:java}
> public static void main(String[] args) {
>     Properties props = new Properties();
>     props.put("bootstrap.servers", "localhost:9092\n");
>     props.put("group.id", "test10");
>     props.put("enable.auto.commit", "true");
>     props.put("auto.commit.interval.ms", "1000");
>     props.put("auto.offset.reset", "earliest");
>     props.put("key.deserializer",
>         "org.apache.kafka.common.serialization.StringDeserializer");
>     props.put("value.deserializer",
>         "org.apache.kafka.common.serialization.StringDeserializer");
>     props.put("metadata.max.age.ms", "6");
>     KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
>     class PartitionOffsetAssignerListener implements ConsumerRebalanceListener {
>         private KafkaConsumer<String, String> consumer;
>         public PartitionOffsetAssignerListener(KafkaConsumer<String, String> kafkaConsumer) {
>             this.consumer = kafkaConsumer;
>         }
>         public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
>         }
>         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
>             // reading all partitions from the beginning
>             consumer.seekToBeginning(partitions);
>         }
>     }
>     consumer.subscribe(Pattern.compile("^test.*$"),
>         new PartitionOffsetAssignerListener(consumer));
>     while (true) {
>         ConsumerRecords<String, String> records = consumer.poll(100);
>         for (ConsumerRecord<String, String> record : records) {
>             System.out.printf("offset = %d, key = %s, value = %s%n",
>                 record.offset(), record.key(), record.value());
>         }
>     }
> }
> {code}





[jira] [Comment Edited] (KAFKA-8100) If delete expired topic, kafka consumer will keep flushing unknown_topic warning in log

2019-04-15 Thread Shengnan YU (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16817951#comment-16817951
 ] 

Shengnan YU edited comment on KAFKA-8100 at 4/15/19 1:07 PM:
-

Hi, could you please explain why the topic info is not deleted from the metadata 
first when hitting UNKNOWN_TOPIC_OR_PARTITION? It can easily be discovered 
again if the topic actually exists.
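
For illustration only, the behaviour asked about here can be sketched without the Kafka client. Everything in the block below is hypothetical (PatternTopicTracker and its methods are not part of Kafka, and this is not how the consumer's Metadata class is implemented): a topic is dropped from the tracked set when an unknown-topic error is reported for it, and it is picked up again on a later metadata refresh if the cluster still lists it and it matches the subscription pattern.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;

// Hypothetical sketch, not Kafka code: tracks topics matching a subscription pattern.
public class PatternTopicTracker {
    private final Pattern pattern;
    private final Set<String> tracked = new TreeSet<>();

    public PatternTopicTracker(Pattern pattern) {
        this.pattern = pattern;
    }

    /** Called on each metadata refresh with the topics the cluster currently reports. */
    public void onMetadataRefresh(Set<String> clusterTopics) {
        for (String topic : clusterTopics) {
            if (pattern.matcher(topic).matches()) {
                tracked.add(topic);
            }
        }
    }

    /** Called when a request for this topic fails with an unknown-topic error. */
    public void onUnknownTopic(String topic) {
        tracked.remove(topic);
    }

    /** Returns a copy of the currently tracked topics. */
    public Set<String> tracked() {
        return new TreeSet<>(tracked);
    }

    public static void main(String[] args) {
        PatternTopicTracker tracker = new PatternTopicTracker(Pattern.compile("^test.*$"));
        tracker.onMetadataRefresh(new TreeSet<>(Arrays.asList("test1", "test2", "other")));
        tracker.onUnknownTopic("test2"); // test2 was deleted on the broker
        System.out.println(tracker.tracked());
        // If test2 exists after all, the next refresh rediscovers it.
        tracker.onMetadataRefresh(new TreeSet<>(Arrays.asList("test1", "test2")));
        System.out.println(tracker.tracked());
    }
}
```

The point of the sketch is only that removal is cheap to undo: a topic that really exists reappears on the next refresh, while a deleted one stops being requested.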


was (Author: ysn2233):
Hi, could you please explain why the topic info is not deleted from the metadata 
first when hitting UNKNOWN_TOPIC_OR_PARTITION? It can easily be discovered if 
the topic actually exists.

> If delete expired topic, kafka consumer will keep flushing unknown_topic 
> warning in log
> ---
>
> Key: KAFKA-8100
> URL: https://issues.apache.org/jira/browse/KAFKA-8100
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.1.1, 2.1.1
>Reporter: Shengnan YU
>Priority: Major
>
> Recently we used Flink to consume Kafka topics with a regex pattern. We 
> found that after we deleted some unused topics, the logs kept filling with 
> UNKNOWN_TOPIC_EXCEPTION warnings.
> I studied the Kafka client source code and found that, for the consumer, 
> topicExpiry is disabled in Metadata. As a result, even after a topic is 
> deleted, the client still keeps it in the metadata's topic list and keeps 
> fetching it from the servers.
> Is there a good way to avoid these annoying warning logs without modifying 
> Kafka's source code? (We still need the 'real' unknown-topic exceptions, 
> that is, those not caused by outdated topics, in the logs.)
> The following code can be used to reproduce the problem (create multiple 
> topics such as "test1", "test2", "test3", ..., "testn" in the Kafka cluster, 
> then delete any one of them while the consumer is running).
> {code:java}
> public static void main(String[] args) {
>     Properties props = new Properties();
>     props.put("bootstrap.servers", "localhost:9092");
>     props.put("group.id", "test10");
>     props.put("enable.auto.commit", "true");
>     props.put("auto.commit.interval.ms", "1000");
>     props.put("auto.offset.reset", "earliest");
>     props.put("key.deserializer",
>         "org.apache.kafka.common.serialization.StringDeserializer");
>     props.put("value.deserializer",
>         "org.apache.kafka.common.serialization.StringDeserializer");
>     props.put("metadata.max.age.ms", "6");
>     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
>     // Rewinds every newly assigned partition to its beginning.
>     class PartitionOffsetAssignerListener implements ConsumerRebalanceListener {
>         private final KafkaConsumer<String, String> consumer;
>         public PartitionOffsetAssignerListener(KafkaConsumer<String, String> kafkaConsumer) {
>             this.consumer = kafkaConsumer;
>         }
>         @Override
>         public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
>         }
>         @Override
>         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
>             // Read all partitions from the beginning.
>             consumer.seekToBeginning(partitions);
>         }
>     }
>     // Subscribe by regex so that every topic whose name starts with "test" is consumed.
>     consumer.subscribe(Pattern.compile("^test.*$"),
>         new PartitionOffsetAssignerListener(consumer));
>     while (true) {
>         ConsumerRecords<String, String> records = consumer.poll(100);
>         for (ConsumerRecord<String, String> record : records) {
>             System.out.printf("offset = %d, key = %s, value = %s%n",
>                 record.offset(), record.key(), record.value());
>         }
>     }
> }
> {code}





[jira] [Comment Edited] (KAFKA-8100) If delete expired topic, kafka consumer will keep flushing unknown_topic warning in log

2019-04-15 Thread Shengnan YU (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16817951#comment-16817951
 ] 

Shengnan YU edited comment on KAFKA-8100 at 4/15/19 1:07 PM:
-

Hi, could you please explain why the topic info is not deleted from the metadata 
first when hitting UNKNOWN_TOPIC_OR_PARTITION? It can easily be discovered if 
the topic actually exists.


was (Author: ysn2233):
Hi, could you please explain why the topic info is not deleted from the metadata 
first when hitting UNKNOWN_TOPIC_OR_PARTITION? It is easily recoverable if 
the topic actually exists.

> If delete expired topic, kafka consumer will keep flushing unknown_topic 
> warning in log
> ---
>
> Key: KAFKA-8100
> URL: https://issues.apache.org/jira/browse/KAFKA-8100
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.1.1, 2.1.1
>Reporter: Shengnan YU
>Priority: Major
>
> Recently we used Flink to consume Kafka topics with a regex pattern. We 
> found that after we deleted some unused topics, the logs kept filling with 
> UNKNOWN_TOPIC_EXCEPTION warnings.
> I studied the Kafka client source code and found that, for the consumer, 
> topicExpiry is disabled in Metadata. As a result, even after a topic is 
> deleted, the client still keeps it in the metadata's topic list and keeps 
> fetching it from the servers.
> Is there a good way to avoid these annoying warning logs without modifying 
> Kafka's source code? (We still need the 'real' unknown-topic exceptions, 
> that is, those not caused by outdated topics, in the logs.)
> The following code can be used to reproduce the problem (create multiple 
> topics such as "test1", "test2", "test3", ..., "testn" in the Kafka cluster, 
> then delete any one of them while the consumer is running).
> {code:java}
> public static void main(String[] args) {
>     Properties props = new Properties();
>     props.put("bootstrap.servers", "localhost:9092");
>     props.put("group.id", "test10");
>     props.put("enable.auto.commit", "true");
>     props.put("auto.commit.interval.ms", "1000");
>     props.put("auto.offset.reset", "earliest");
>     props.put("key.deserializer",
>         "org.apache.kafka.common.serialization.StringDeserializer");
>     props.put("value.deserializer",
>         "org.apache.kafka.common.serialization.StringDeserializer");
>     props.put("metadata.max.age.ms", "6");
>     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
>     // Rewinds every newly assigned partition to its beginning.
>     class PartitionOffsetAssignerListener implements ConsumerRebalanceListener {
>         private final KafkaConsumer<String, String> consumer;
>         public PartitionOffsetAssignerListener(KafkaConsumer<String, String> kafkaConsumer) {
>             this.consumer = kafkaConsumer;
>         }
>         @Override
>         public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
>         }
>         @Override
>         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
>             // Read all partitions from the beginning.
>             consumer.seekToBeginning(partitions);
>         }
>     }
>     // Subscribe by regex so that every topic whose name starts with "test" is consumed.
>     consumer.subscribe(Pattern.compile("^test.*$"),
>         new PartitionOffsetAssignerListener(consumer));
>     while (true) {
>         ConsumerRecords<String, String> records = consumer.poll(100);
>         for (ConsumerRecord<String, String> record : records) {
>             System.out.printf("offset = %d, key = %s, value = %s%n",
>                 record.offset(), record.key(), record.value());
>         }
>     }
> }
> {code}





[jira] [Commented] (KAFKA-8234) Multi-module support for JAAS config property

2019-04-15 Thread Gabor Somogyi (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16817935#comment-16817935
 ] 

Gabor Somogyi commented on KAFKA-8234:
--

cc [~viktorsomogyi]

> Multi-module support for JAAS config property
> -
>
> Key: KAFKA-8234
> URL: https://issues.apache.org/jira/browse/KAFKA-8234
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Gabor Somogyi
>Priority: Major
>
> I've tried to add multiple login modules to the JAAS config property, but 
> it's not supported at the moment:
> {code:java}
> Exception in thread "main" org.apache.kafka.common.KafkaException: Failed 
> create new KafkaAdminClient
>   at 
> org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:370)
>   at 
> org.apache.kafka.clients.admin.AdminClient.create(AdminClient.java:52)
>   at 
> com.kafka.delegationtoken.consumer.SecureKafkaConsumer$.main(SecureKafkaConsumer.scala:96)
>   at 
> com.kafka.delegationtoken.consumer.SecureKafkaConsumer.main(SecureKafkaConsumer.scala)
> Caused by: java.lang.IllegalArgumentException: JAAS config property contains 
> 2 login modules, should be 1 module
>   at 
> org.apache.kafka.common.security.JaasContext.load(JaasContext.java:95)
>   at 
> org.apache.kafka.common.security.JaasContext.loadClientContext(JaasContext.java:84)
>   at 
> org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:119)
>   at 
> org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:65)
>   at 
> org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:88)
>   at 
> org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:346)
>   ... 3 more
> {code}
> I wanted to implement a fallback scenario with the sufficient LoginModule 
> flag, but the missing multi-module support makes it impossible.





[jira] [Updated] (KAFKA-8234) Multi-module support for JAAS config property

2019-04-15 Thread Gabor Somogyi (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Somogyi updated KAFKA-8234:
-
Affects Version/s: 2.2.0

> Multi-module support for JAAS config property
> -
>
> Key: KAFKA-8234
> URL: https://issues.apache.org/jira/browse/KAFKA-8234
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 2.2.0
>Reporter: Gabor Somogyi
>Priority: Major
>
> I've tried to add multiple login modules to the JAAS config property, but 
> it's not supported at the moment:
> {code:java}
> Exception in thread "main" org.apache.kafka.common.KafkaException: Failed 
> create new KafkaAdminClient
>   at 
> org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:370)
>   at 
> org.apache.kafka.clients.admin.AdminClient.create(AdminClient.java:52)
>   at 
> com.kafka.delegationtoken.consumer.SecureKafkaConsumer$.main(SecureKafkaConsumer.scala:96)
>   at 
> com.kafka.delegationtoken.consumer.SecureKafkaConsumer.main(SecureKafkaConsumer.scala)
> Caused by: java.lang.IllegalArgumentException: JAAS config property contains 
> 2 login modules, should be 1 module
>   at 
> org.apache.kafka.common.security.JaasContext.load(JaasContext.java:95)
>   at 
> org.apache.kafka.common.security.JaasContext.loadClientContext(JaasContext.java:84)
>   at 
> org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:119)
>   at 
> org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:65)
>   at 
> org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:88)
>   at 
> org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:346)
>   ... 3 more
> {code}
> I wanted to implement a fallback scenario with the sufficient LoginModule 
> flag, but the missing multi-module support makes it impossible.





[jira] [Created] (KAFKA-8235) NoSuchElementException when restoring state after a clean shutdown of a Kafka Streams application

2019-04-15 Thread Andrew Klopper (JIRA)
Andrew Klopper created KAFKA-8235:
-

 Summary: NoSuchElementException when restoring state after a clean 
shutdown of a Kafka Streams application
 Key: KAFKA-8235
 URL: https://issues.apache.org/jira/browse/KAFKA-8235
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.3.0
 Environment: Linux, CentOS 7, Java 1.8.0_191, 50 partitions per topic, 
replication factor 3
Reporter: Andrew Klopper


While performing a larger scale test of a new Kafka Streams application that 
performs aggregation and suppression, we have discovered that we are unable to 
restart the application after a clean shutdown. The error that is logged is:
{code:java}
[rollupd-5ee1997b-8302-4e88-b3db-a74c8b805a6c-StreamThread-1] ERROR 
org.apache.kafka.streams.processor.internals.StreamThread - stream-thread 
[rollupd-5ee1997b-8302-4e88-b3db-a74c8b805a6c-StreamThread-1] Encountered the 
following error during processing:
java.util.NoSuchElementException
at java.util.TreeMap.key(TreeMap.java:1327)
at java.util.TreeMap.firstKey(TreeMap.java:290)
at 
org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.restoreBatch(InMemoryTimeOrderedKeyValueBuffer.java:288)
at 
org.apache.kafka.streams.processor.internals.CompositeRestoreListener.restoreBatch(CompositeRestoreListener.java:89)
at 
org.apache.kafka.streams.processor.internals.StateRestorer.restore(StateRestorer.java:92)
at 
org.apache.kafka.streams.processor.internals.StoreChangelogReader.processNext(StoreChangelogReader.java:317)
at 
org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:92)
at 
org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:328)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:866)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:804)
at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:773)
{code}
The issue doesn't seem to occur for small amounts of data, but it doesn't take 
a particularly large amount of data to trigger the problem either.
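
As a side note on the trace above: its top two frames (TreeMap.key, reached from TreeMap.firstKey) are what java.util.TreeMap does when firstKey() is called on an empty map. The sketch below only demonstrates that standard-library behaviour and a guarded lookup. The class and method names are made up for illustration, and it makes no claim about why the buffer is empty during restore or about how InMemoryTimeOrderedKeyValueBuffer should be fixed.

```java
import java.util.NoSuchElementException;
import java.util.TreeMap;

// Illustrative only: shows TreeMap.firstKey() on an empty map, not Kafka Streams code.
public class EmptyTreeMapDemo {
    /** Returns the smallest key, or null when the map is empty (guarded lookup). */
    public static Long firstKeyOrNull(TreeMap<Long, String> buffer) {
        return buffer.isEmpty() ? null : buffer.firstKey();
    }

    /** Returns true if an unguarded firstKey() call throws on this map. */
    public static boolean unguardedThrows(TreeMap<Long, String> buffer) {
        try {
            buffer.firstKey();
            return false;
        } catch (NoSuchElementException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        TreeMap<Long, String> buffer = new TreeMap<>();
        System.out.println(unguardedThrows(buffer)); // empty map: firstKey() throws
        System.out.println(firstKeyOrNull(buffer));
        buffer.put(42L, "record");
        System.out.println(unguardedThrows(buffer)); // non-empty map: no exception
        System.out.println(firstKeyOrNull(buffer));
    }
}
```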

Any assistance would be greatly appreciated.





[jira] [Created] (KAFKA-8234) Multi-module support for JAAS config property

2019-04-15 Thread Gabor Somogyi (JIRA)
Gabor Somogyi created KAFKA-8234:


 Summary: Multi-module support for JAAS config property
 Key: KAFKA-8234
 URL: https://issues.apache.org/jira/browse/KAFKA-8234
 Project: Kafka
  Issue Type: Improvement
Reporter: Gabor Somogyi


I've tried to add multiple login modules to the JAAS config property, but it's 
not supported at the moment:
{code:java}
Exception in thread "main" org.apache.kafka.common.KafkaException: Failed 
create new KafkaAdminClient
at 
org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:370)
at 
org.apache.kafka.clients.admin.AdminClient.create(AdminClient.java:52)
at 
com.kafka.delegationtoken.consumer.SecureKafkaConsumer$.main(SecureKafkaConsumer.scala:96)
at 
com.kafka.delegationtoken.consumer.SecureKafkaConsumer.main(SecureKafkaConsumer.scala)
Caused by: java.lang.IllegalArgumentException: JAAS config property contains 2 
login modules, should be 1 module
at 
org.apache.kafka.common.security.JaasContext.load(JaasContext.java:95)
at 
org.apache.kafka.common.security.JaasContext.loadClientContext(JaasContext.java:84)
at 
org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:119)
at 
org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:65)
at 
org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:88)
at 
org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:346)
... 3 more
{code}
I wanted to implement a fallback scenario with the sufficient LoginModule flag, 
but the missing multi-module support makes it impossible.
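
For readers unfamiliar with the format, here is a rough sketch of what a two-module value for the sasl.jaas.config client property could look like, with each entry written as "<loginModuleClass> <controlFlag> <options>;". The issue does not say which login modules were used, so the two module classes, the user name, and the password below are placeholders chosen for illustration, and MultiModuleJaasConfig is a made-up helper class. A value of this shape is what the "contains 2 login modules, should be 1 module" check in the trace above rejects.

```java
// Illustrative only: builds a two-module JAAS config string; all values are placeholders.
public class MultiModuleJaasConfig {
    /** Formats one JAAS entry: "<loginModuleClass> <controlFlag> <options>;". */
    public static String entry(String moduleClass, String controlFlag, String options) {
        return moduleClass + " " + controlFlag + " " + options + ";";
    }

    /** Two entries with the "sufficient" flag, so the second acts as a fallback. */
    public static String twoModuleConfig() {
        // Assumed modules and credentials; the reporter's actual setup is not known.
        String first = entry("org.apache.kafka.common.security.scram.ScramLoginModule",
                "sufficient", "username=\"alice\" password=\"alice-secret\"");
        String second = entry("org.apache.kafka.common.security.plain.PlainLoginModule",
                "sufficient", "username=\"alice\" password=\"alice-secret\"");
        return first + " " + second;
    }

    public static void main(String[] args) {
        // The kind of value one would try to pass as sasl.jaas.config.
        System.out.println(twoModuleConfig());
    }
}
```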





[jira] [Resolved] (KAFKA-8232) Flaky test kafka.admin.TopicCommandWithAdminClientTest.testTopicDeletion

2019-04-15 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8232?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram resolved KAFKA-8232.
---
   Resolution: Fixed
 Reviewer: Manikumar
Fix Version/s: 2.2.1

> Flaky test kafka.admin.TopicCommandWithAdminClientTest.testTopicDeletion
> 
>
> Key: KAFKA-8232
> URL: https://issues.apache.org/jira/browse/KAFKA-8232
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 2.3.0, 2.2.1
>
>
> {code:java}
> java.lang.AssertionError: Delete path for topic should exist after deletion.
>   at org.junit.Assert.fail(Assert.java:89)
>   at org.junit.Assert.assertTrue(Assert.java:42)
>   at 
> kafka.admin.TopicCommandWithAdminClientTest.testTopicDeletion(TopicCommandWithAdminClientTest.scala:471){code}
> The verification doesn't look safe since the delete path could have been 
> removed before the test verifies it.
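
The race described above can be avoided by asserting on the final state instead of a transient one. The sketch below is a generic, assumed illustration of that idea in plain Java, not the actual change to the Scala test: WaitForCompletion and waitUntil are hypothetical names, and the "deletion" is simulated with a background thread.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

// Illustrative only: poll for the final state rather than checking an intermediate one.
public class WaitForCompletion {
    /** Polls the condition until it holds or the timeout elapses; returns the last result. */
    public static boolean waitUntil(BooleanSupplier condition, long timeoutMs, long pollMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            if (condition.getAsBoolean()) {
                return true;
            }
            try {
                Thread.sleep(pollMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt and stop waiting
                break;
            }
        }
        return condition.getAsBoolean();
    }

    public static void main(String[] args) throws Exception {
        AtomicBoolean topicExists = new AtomicBoolean(true);
        // Simulated asynchronous deletion that completes after about 50 ms.
        Thread deleter = new Thread(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            topicExists.set(false);
        });
        deleter.start();
        // Assert on completion: the topic is eventually gone, however fast the deleter is.
        boolean deleted = waitUntil(() -> !topicExists.get(), 5000, 10);
        System.out.println("topic deleted: " + deleted);
        deleter.join();
    }
}
```

A check like this passes whether the deletion finishes before or after the test looks, which is exactly what a check on the intermediate delete path cannot guarantee.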





[jira] [Updated] (KAFKA-8233) Helper class to make it simpler to write test logic with TopologyTestDriver

2019-04-15 Thread Jukka Karvanen (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jukka Karvanen updated KAFKA-8233:
--
Summary: Helper class to make it simpler to write test logic with 
TopologyTestDriver  (was: Helper class to make it simpler to write test logic 
TopologyTestDriver)

> Helper class to make it simpler to write test logic with TopologyTestDriver
> ---
>
> Key: KAFKA-8233
> URL: https://issues.apache.org/jira/browse/KAFKA-8233
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Jukka Karvanen
>Priority: Minor
>
> When using TopologyTestDriver, you need to call ConsumerRecordFactory to 
> create the ConsumerRecord passed into the pipeInput method to write to a 
> topic. Also, when calling readOutput to consume from a topic, you need to 
> provide the correct Deserializers each time.
> You easily end up writing helper methods in your test classes, but this can 
> be avoided by adding generic input and output topic classes.
> This improvement adds a TestInputTopic class, which wraps the 
> TopologyTestDriver and ConsumerRecordFactory methods in one class used to 
> write to input topics, and a TestOutputTopic class, which collects the 
> TopologyTestDriver reading methods and provides typesafe read methods.
>  
> Example of how a Streams test looks when using these classes:
> [https://github.com/jukkakarvanen/kafka-streams-examples/blob/InputOutputTopic/src/test/java/io/confluent/examples/streams/WordCountLambdaExampleTest.java]





[jira] [Created] (KAFKA-8233) Helper class to make it simpler to write test logic TopologyTestDriver

2019-04-15 Thread Jukka Karvanen (JIRA)
Jukka Karvanen created KAFKA-8233:
-

 Summary: Helper class to make it simpler to write test logic 
TopologyTestDriver
 Key: KAFKA-8233
 URL: https://issues.apache.org/jira/browse/KAFKA-8233
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Jukka Karvanen


When using TopologyTestDriver, you need to call ConsumerRecordFactory to create 
the ConsumerRecord passed into the pipeInput method to write to a topic. Also, 
when calling readOutput to consume from a topic, you need to provide the 
correct Deserializers each time.

You easily end up writing helper methods in your test classes, but this can be 
avoided by adding generic input and output topic classes.

This improvement adds a TestInputTopic class, which wraps the 
TopologyTestDriver and ConsumerRecordFactory methods in one class used to 
write to input topics, and a TestOutputTopic class, which collects the 
TopologyTestDriver reading methods and provides typesafe read methods.

Example of how a Streams test looks when using these classes:

[https://github.com/jukkakarvanen/kafka-streams-examples/blob/InputOutputTopic/src/test/java/io/confluent/examples/streams/WordCountLambdaExampleTest.java]





[jira] [Commented] (KAFKA-8232) Flaky test kafka.admin.TopicCommandWithAdminClientTest.testTopicDeletion

2019-04-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16817885#comment-16817885
 ] 

ASF GitHub Bot commented on KAFKA-8232:
---

rajinisivaram commented on pull request #6581: KAFKA-8232; Test topic delete 
completion rather than intermediate state
URL: https://github.com/apache/kafka/pull/6581
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Flaky test kafka.admin.TopicCommandWithAdminClientTest.testTopicDeletion
> 
>
> Key: KAFKA-8232
> URL: https://issues.apache.org/jira/browse/KAFKA-8232
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 2.3.0
>
>
> {code:java}
> java.lang.AssertionError: Delete path for topic should exist after deletion.
>   at org.junit.Assert.fail(Assert.java:89)
>   at org.junit.Assert.assertTrue(Assert.java:42)
>   at 
> kafka.admin.TopicCommandWithAdminClientTest.testTopicDeletion(TopicCommandWithAdminClientTest.scala:471){code}
> The verification doesn't look safe since the delete path could have been 
> removed before the test verifies it.





[jira] [Comment Edited] (KAFKA-8207) StickyPartitionAssignor for KStream

2019-04-15 Thread neeraj (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16817625#comment-16817625
 ] 

neeraj edited comment on KAFKA-8207 at 4/15/19 9:00 AM:


[~mjsax] Thanks for explaining. Because new partitions are assigned to node 1 
when it comes up, in production with a heavy state store it will take time for 
the state store to be updated for the new partitions.




was (Author: neeraj.bhatt):
[~mjsax] Thanks for explaining. Any idea when 2.3 is expected? Also, kindly 
assign me a bug related to this KIP if it helps with an early release.

> StickyPartitionAssignor for KStream
> ---
>
> Key: KAFKA-8207
> URL: https://issues.apache.org/jira/browse/KAFKA-8207
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: neeraj
>Priority: Major
>
> In KStreams I am not able to configure a sticky partition assignor or my own 
> custom partition assignor.
> Overriding the property while building the stream does not work:
> streams props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 
> CustomAssignor.class.getName());
>  
>  





[jira] [Commented] (KAFKA-8207) StickyPartitionAssignor for KStream

2019-04-15 Thread neeraj (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16817625#comment-16817625
 ] 

neeraj commented on KAFKA-8207:
---

[~mjsax] Thanks for explaining. Any idea when 2.3 is expected? Also, kindly 
assign me a bug related to this KIP if it helps with an early release.

> StickyPartitionAssignor for KStream
> ---
>
> Key: KAFKA-8207
> URL: https://issues.apache.org/jira/browse/KAFKA-8207
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: neeraj
>Priority: Major
>
> In KStreams I am not able to configure a sticky partition assignor or my own 
> custom partition assignor.
> Overriding the property while building the stream does not work:
> streams props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 
> CustomAssignor.class.getName());
>  
>  





[jira] [Comment Edited] (KAFKA-8226) New MirrorMaker option partition.to.partition

2019-04-15 Thread Sönke Liebau (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16817610#comment-16817610
 ] 

Sönke Liebau edited comment on KAFKA-8226 at 4/15/19 7:14 AM:
--

Hi [~ernisv],

The functionality that you are implementing is definitely useful; however, it 
can also easily be achieved by implementing a custom MessageHandler that 
preserves partitioning.

Personally, I think we should try to avoid adding too many features for which 
extension points were already designed in Kafka.

Additionally there is [MirrorMaker 
2.0|https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0#KIP-382:MirrorMaker2.0-RemoteTopics,Partitions]]
 coming up, which I believe has this functionality built in.


was (Author: sliebau):
Hi [~ernisv],

The functionality that you are implementing is definitely useful; however, it 
can also easily be achieved by implementing a custom MessageHandler that 
preserves partitioning.

Personally, I think we should try to avoid adding too many features for which 
extension points were already designed in Kafka.

Additionally there is [MirrorMaker 
2.0|[https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0#KIP-382:MirrorMaker2.0-RemoteTopics,Partitions]]
 coming up, which I believe has this functionality built in.

> New MirrorMaker option partition.to.partition
> -
>
> Key: KAFKA-8226
> URL: https://issues.apache.org/jira/browse/KAFKA-8226
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Ernestas Vaiciukevičius
>Priority: Major
>
> Currently when MirrorMaker moves data between topics with records with null 
> keys - it shuffles records between destination topic's partitions. Sometimes 
> it's desirable to try preserving the original partition.
> Related PR adds new command line option to do that:
> When partition.to.partition=true MirrorMaker retains the partition number 
> when mirroring records even without the keys. 
>  When using this option - source and destination topics are assumed to have 
> the same number of partitions.





[jira] [Comment Edited] (KAFKA-8226) New MirrorMaker option partition.to.partition

2019-04-15 Thread Sönke Liebau (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16817610#comment-16817610
 ] 

Sönke Liebau edited comment on KAFKA-8226 at 4/15/19 7:14 AM:
--

Hi [~ernisv],

The functionality that you are implementing is definitely useful; however, it 
can also easily be achieved by implementing a custom MessageHandler that 
preserves partitioning.

Personally, I think we should try to avoid adding too many features for which 
extension points were already designed in Kafka.

Additionally there is [MirrorMaker 
2.0|https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0#KIP-382:MirrorMaker2.0-RemoteTopics,Partitions]
 coming up, which I believe has this functionality built in.


was (Author: sliebau):
Hi [~ernisv],

The functionality that you are implementing is definitely useful; however, it 
can also easily be achieved by implementing a custom MessageHandler that 
preserves partitioning.

Personally, I think we should try to avoid adding too many features for which 
extension points were already designed in Kafka.

Additionally there is [MirrorMaker 
2.0|https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0#KIP-382:MirrorMaker2.0-RemoteTopics,Partitions]]
 coming up, which I believe has this functionality built in.

> New MirrorMaker option partition.to.partition
> -
>
> Key: KAFKA-8226
> URL: https://issues.apache.org/jira/browse/KAFKA-8226
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Ernestas Vaiciukevičius
>Priority: Major
>
> Currently when MirrorMaker moves data between topics with records with null 
> keys - it shuffles records between destination topic's partitions. Sometimes 
> it's desirable to try preserving the original partition.
> Related PR adds new command line option to do that:
> When partition.to.partition=true MirrorMaker retains the partition number 
> when mirroring records even without the keys. 
>  When using this option - source and destination topics are assumed to have 
> the same number of partitions.





[jira] [Commented] (KAFKA-8226) New MirrorMaker option partition.to.partition

2019-04-15 Thread Sönke Liebau (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16817610#comment-16817610
 ] 

Sönke Liebau commented on KAFKA-8226:
-

Hi [~ernisv],

The functionality that you are implementing is definitely useful; however, it 
can also easily be achieved by implementing a custom MessageHandler that 
preserves partitioning.

Personally, I think we should try to avoid adding too many features for which 
extension points were already designed in Kafka.

Additionally there is [MirrorMaker 
2.0|[https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0#KIP-382:MirrorMaker2.0-RemoteTopics,Partitions]]
 coming up, which I believe has this functionality built in.

> New MirrorMaker option partition.to.partition
> -
>
> Key: KAFKA-8226
> URL: https://issues.apache.org/jira/browse/KAFKA-8226
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Ernestas Vaiciukevičius
>Priority: Major
>
> Currently when MirrorMaker moves data between topics with records with null 
> keys - it shuffles records between destination topic's partitions. Sometimes 
> it's desirable to try preserving the original partition.
> Related PR adds new command line option to do that:
> When partition.to.partition=true MirrorMaker retains the partition number 
> when mirroring records even without the keys. 
>  When using this option - source and destination topics are assumed to have 
> the same number of partitions.


