[jira] [Commented] (KAFKA-16223) Replace EasyMock and PowerMock with Mockito for KafkaConfigBackingStoreTest

2024-02-14 Thread Chaitanya Mukka (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17817597#comment-17817597
 ] 

Chaitanya Mukka commented on KAFKA-16223:
-

Hi [~hgeraldino]! I hope you are doing well. Are you actively working on this 
task? I have a local branch with changes that I was working on the same. I 
wanted to check if you have already closed on the changes for this or if you 
are open to me picking it up. Thanks!

> Replace EasyMock and PowerMock with Mockito for KafkaConfigBackingStoreTest
> ---
>
> Key: KAFKA-16223
> URL: https://issues.apache.org/jira/browse/KAFKA-16223
> Project: Kafka
>  Issue Type: Sub-task
>  Components: connect
>Reporter: Hector Geraldino
>Assignee: Hector Geraldino
>Priority: Minor
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Fix KafkaAdminClientTest.testClientInstanceId [kafka]

2024-02-14 Thread via GitHub


dajac merged PR #15370:
URL: https://github.com/apache/kafka/pull/15370


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-16259) Immutable MetadataCache to improve client performance

2024-02-14 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16259?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana reassigned KAFKA-16259:
--

Assignee: Zhifeng Chen

> Immutable MetadataCache to improve client performance
> -
>
> Key: KAFKA-16259
> URL: https://issues.apache.org/jira/browse/KAFKA-16259
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 2.8.0
>Reporter: Zhifeng Chen
>Assignee: Zhifeng Chen
>Priority: Major
> Attachments: image-2024-02-14-12-11-07-366.png
>
>
> TL;DR, A Kafka client produce latency issue is identified caused by 
> synchronized lock contention of metadata cache read/write in the native kafka 
> producer.
> Trigger Condition: A producer need to produce to large number of topics. such 
> as in kafka rest-proxy
>  
>  
> What is producer metadata cache
> Kafka producer maintains a in-memory copy of cluster metadata, and it avoided 
> fetch metadata every time when produce message to reduce latency
>  
> What’s the synchronized lock contention problem
> Kafka producer metadata cache is a *mutable* object, read/write are isolated 
> by a synchronized lock. Which means when the metadata cache is being updated, 
> all read requests are blocked. 
> Topic metadata expiration frequency increase liner with number of topics. In 
> a kafka cluster with large number of topic partitions, topic metadata 
> expiration and refresh triggers high frequent metadata update. When read 
> operation blocked by update, producer threads are blocked and caused high 
> produce latency issue.
>  
> *Proposed solution*
> TL;DR Optimize performance of metadata cache read operation of native kafka 
> producer with copy-on-write strategy
> What is copy-on-write strategy
> It’s a solution to reduce synchronized lock contention by making the object 
> immutable, and always create a new instance when updating, but since the 
> object is immutable, read operation will be free from waiting, thus produce 
> latency reduced significantly
> Besides performance, it can also make the metadata cache immutable from 
> unexpected modification, reduce occurrence of code bugs due to incorrect 
> synchronization 
>  
> {*}Test result{*}:
> Environment: Kafka-rest-proxy
> Client version: 2.8.0
> Number of topic partitions: 250k
> test result show 90%+ latency reduction on test cluster
> !image-2024-02-14-12-11-07-366.png!
> P99 produce latency on deployed instances reduced from 200ms -> 5ms (upper 
> part show latency after the improvement, lower part show before improvement)
> *Dump show details of the problem*
> Threads acquiring lock
>  Kafka-rest-proxy-jetty-thread-pool-199waiting to acquire [ 
> 0x7f77d70121a0 ]
>  Kafka-rest-proxy-jetty-thread-pool-200waiting to acquire [ 
> 0x7f77d70121a0 ]
>  Kafka-rest-proxy-jetty-thread-pool-202waiting to acquire [ 
> 0x7f77d70121a0 ]
>  Kafka-rest-proxy-jetty-thread-pool-203waiting to acquire [ 
> 0x7f77d70121a0 ]
>  Kafka-rest-proxy-jetty-thread-pool-204waiting to acquire [ 
> 0x7f77d70121a0 ]
>  Kafka-rest-proxy-jetty-thread-pool-205waiting to acquire [ 
> 0x7f77d70121a0 ]
>  Kafka-rest-proxy-jetty-thread-pool-207waiting to acquire [ 
> 0x7f77d70121a0 ]
>  Kafka-rest-proxy-jetty-thread-pool-212waiting to acquire [ 
> 0x7f77d70121a0 ]
>  Kafka-rest-proxy-jetty-thread-pool-214waiting to acquire [ 
> 0x7f77d70121a0 ]
>  Kafka-rest-proxy-jetty-thread-pool-215waiting to acquire [ 
> 0x7f77d70121a0 ]
>  Kafka-rest-proxy-jetty-thread-pool-217waiting to acquire [ 
> 0x7f77d70121a0 ]
>  Kafka-rest-proxy-jetty-thread-pool-218waiting to acquire [ 
> 0x7f77d70121a0 ]
>  Kafka-rest-proxy-jetty-thread-pool-219waiting to acquire [ 
> 0x7f77d70121a0 ]
>  Kafka-rest-proxy-jetty-thread-pool-222waiting to acquire [ 
> 0x7f77d70121a0 ]
>  ...
> at org.apache.kafka.clients.Metadata.fetch(Metadata.java:111)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(KafkaProducer.java:1019)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.partitionsFor(KafkaProducer.java:1144)
> at 
> io.confluent.kafkarest.producer.internal.MetadataImpl.maybeUpdate(MetadataImpl.java:39)
> at 
> io.confluent.kafkarest.producer.ResilientProducer.send(ResilientProducer.java:117)
> Threads hold the lock
>  kafka-producer-network-thread | kafka-rest-proxyrunning , holding [ 
> 0x7f77d70121a0 ]
> at 
> java.util.stream.ReferencePipeline$3$1.accept(java.base@11.0.18/ReferencePipeline.java:195)
> at 
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(java.base@11.0.18/ArrayList.java:1655)
> at 
> java.util.stream.AbstractPipeline.copyInto(java.base@11.0.18/AbstractPipeline.java:484)
> at 
> 

Re: [PR] improve TopicCommandIntegrationTest to be less flaky [kafka]

2024-02-14 Thread via GitHub


showuon commented on PR #14891:
URL: https://github.com/apache/kafka/pull/14891#issuecomment-1945384353

   @jolshan , do you want to have another look? If no, I'm going to merge this 
PR this week. Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] PROPOSAL: support async event execution in group coordinator [kafka]

2024-02-14 Thread via GitHub


github-actions[bot] commented on PR #14705:
URL: https://github.com/apache/kafka/pull/14705#issuecomment-1945310660

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has  merge conflicts, please update it with the latest from 
trunk (or appropriate release branch)  If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14569: Migrate Kafka Streams tests from Zookeeper to KRaft [kafka]

2024-02-14 Thread via GitHub


mdedetrich commented on PR #15341:
URL: https://github.com/apache/kafka/pull/15341#issuecomment-194527

   > Thanks for taking the comments into consideration. One other suggestion is 
that we can have a look into #13375 to see what is the common between both 
`EmbeddedKafkaCluster` in stream and connect and see if we can have one common 
`EmbeddedKafkaCluster` between both of them. And later move connect to use it. 
Which also can be follow up JIRA if we want to keep this one simple. Either way 
I'm happy to lend a hand in the connect part if needed!
   
   Cool I will look into it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15625: Do not flush global state store at each commit [kafka]

2024-02-14 Thread via GitHub


jeffkbkim commented on code in PR #15361:
URL: https://github.com/apache/kafka/pull/15361#discussion_r1490286153


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java:
##
@@ -161,6 +140,10 @@ public void update(final ConsumerRecord 
record) {
 updatedPartitions.put(tp, updatedPartitions.get(tp) + 1);
 }
 
+@Override
+public void maybeCheckpoint() {
+flushState();
+}
 }
 
 }

Review Comment:
   nit: can we add a newline?



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java:
##
@@ -150,5 +150,11 @@ private void initTopology() {
 }
 }
 
+@Override
+public void maybeCheckpoint() {
+if (StateManagerUtil.checkpointNeeded(false, 
stateMgr.changelogOffsets(), offsets)) {

Review Comment:
   
[AbstractTask#maybeCheckpoint()](https://github.com/apache/kafka/blob/a1f3c6d16061566a4f53c72a95e2679b8ee229e0/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java#L97)
 passes in `stateMgr.changelogOffsets` as the new offset snapshot in 
`StateManagerUtil#checkpointNeeded`. Why is the behavior reversed here?
   
   is it because `offsets` is the object that is updated on every `update()`? 
what about `ProcessorStateManager#changelogOffsets`, would that not have the 
latest?



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java:
##
@@ -259,19 +251,14 @@ void initialize() {
 for (final Map.Entry entry : 
partitionOffsets.entrySet()) {
 globalConsumer.seek(entry.getKey(), entry.getValue());
 }
-lastFlush = time.milliseconds();
 }
 
 void pollAndUpdate() {
 final ConsumerRecords received = 
globalConsumer.poll(pollTime);
 for (final ConsumerRecord record : received) {
 stateMaintainer.update(record);
 }
-final long now = time.milliseconds();
-if (now - flushInterval >= lastFlush) {
-stateMaintainer.flushState();
-lastFlush = now;
-}
+stateMaintainer.maybeCheckpoint();

Review Comment:
   should we still keep an interval-based flush or is the delta check 
sufficient?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16260) Remove window.size.ms from StreamsConfig

2024-02-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16260:

Component/s: streams

> Remove window.size.ms from StreamsConfig
> 
>
> Key: KAFKA-16260
> URL: https://issues.apache.org/jira/browse/KAFKA-16260
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Lucia Cerchie
>Priority: Major
>
> {{window.size.ms}}  is not a true KafkaStreams config, and results in an 
> error when set from a KStreams application. It belongs on the client.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16260) Remove window.size.ms from StreamsConfig

2024-02-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16260:

Labels: needs-kip  (was: )

> Remove window.size.ms from StreamsConfig
> 
>
> Key: KAFKA-16260
> URL: https://issues.apache.org/jira/browse/KAFKA-16260
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Lucia Cerchie
>Priority: Major
>  Labels: needs-kip
>
> {{window.size.ms}}  is not a true KafkaStreams config, and results in an 
> error when set from a KStreams application. It belongs on the client.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16260) Remove window.size.ms from StreamsConfig

2024-02-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-16260:
---

Assignee: Lucia Cerchie

> Remove window.size.ms from StreamsConfig
> 
>
> Key: KAFKA-16260
> URL: https://issues.apache.org/jira/browse/KAFKA-16260
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Lucia Cerchie
>Assignee: Lucia Cerchie
>Priority: Major
>  Labels: needs-kip
>
> {{window.size.ms}}  is not a true KafkaStreams config, and results in an 
> error when set from a KStreams application. It belongs on the client.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15665: Enforce partition reassignment should complete when all target replicas are in ISR [kafka]

2024-02-14 Thread via GitHub


jolshan commented on PR #15359:
URL: https://github.com/apache/kafka/pull/15359#issuecomment-1945185518

   I will wait a day or to see if @cmccabe has any comments. If not I will 
merge.  


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]

2024-02-14 Thread via GitHub


ijuma commented on PR #15323:
URL: https://github.com/apache/kafka/pull/15323#issuecomment-1945066461

   @hachikuji @msn-tldr #15376 looks related.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16259 Immutable MetadataCache to improve client performance [kafka]

2024-02-14 Thread via GitHub


ijuma commented on PR #15376:
URL: https://github.com/apache/kafka/pull/15376#issuecomment-1945065055

   Thanks for the PR. Is this similar to what was done in #15323?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15770: IQv2 must return immutable position [kafka]

2024-02-14 Thread via GitHub


mjsax commented on PR #15219:
URL: https://github.com/apache/kafka/pull/15219#issuecomment-1945045218

   > Do we need to lock the implementations of 
AbstractDualSchemaRocksDBSegmentedBytesStore.getWriteBatches ?
   
   Outstanding catch! I believe yes. Updated the PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16259) Immutable MetadataCache to improve client performance

2024-02-14 Thread Zhifeng Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16259?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhifeng Chen updated KAFKA-16259:
-
Description: 
TL;DR, A Kafka client produce latency issue is identified caused by 
synchronized lock contention of metadata cache read/write in the native kafka 
producer.

Trigger Condition: A producer need to produce to large number of topics. such 
as in kafka rest-proxy

 

 

What is producer metadata cache

Kafka producer maintains a in-memory copy of cluster metadata, and it avoided 
fetch metadata every time when produce message to reduce latency

 

What’s the synchronized lock contention problem

Kafka producer metadata cache is a *mutable* object, read/write are isolated by 
a synchronized lock. Which means when the metadata cache is being updated, all 
read requests are blocked. 

Topic metadata expiration frequency increase liner with number of topics. In a 
kafka cluster with large number of topic partitions, topic metadata expiration 
and refresh triggers high frequent metadata update. When read operation blocked 
by update, producer threads are blocked and caused high produce latency issue.

 

*Proposed solution*

TL;DR Optimize performance of metadata cache read operation of native kafka 
producer with copy-on-write strategy

What is copy-on-write strategy

It’s a solution to reduce synchronized lock contention by making the object 
immutable, and always create a new instance when updating, but since the object 
is immutable, read operation will be free from waiting, thus produce latency 
reduced significantly

Besides performance, it can also make the metadata cache immutable from 
unexpected modification, reduce occurrence of code bugs due to incorrect 
synchronization 

 

{*}Test result{*}:

Environment: Kafka-rest-proxy

Client version: 2.8.0

Number of topic partitions: 250k

test result show 90%+ latency reduction on test cluster

!image-2024-02-14-12-11-07-366.png!

P99 produce latency on deployed instances reduced from 200ms -> 5ms (upper part 
show latency after the improvement, lower part show before improvement)

*Dump show details of the problem*

Threads acquiring lock
 Kafka-rest-proxy-jetty-thread-pool-199waiting to acquire [ 0x7f77d70121a0 ]
 Kafka-rest-proxy-jetty-thread-pool-200waiting to acquire [ 0x7f77d70121a0 ]
 Kafka-rest-proxy-jetty-thread-pool-202waiting to acquire [ 0x7f77d70121a0 ]
 Kafka-rest-proxy-jetty-thread-pool-203waiting to acquire [ 0x7f77d70121a0 ]
 Kafka-rest-proxy-jetty-thread-pool-204waiting to acquire [ 0x7f77d70121a0 ]
 Kafka-rest-proxy-jetty-thread-pool-205waiting to acquire [ 0x7f77d70121a0 ]
 Kafka-rest-proxy-jetty-thread-pool-207waiting to acquire [ 0x7f77d70121a0 ]
 Kafka-rest-proxy-jetty-thread-pool-212waiting to acquire [ 0x7f77d70121a0 ]
 Kafka-rest-proxy-jetty-thread-pool-214waiting to acquire [ 0x7f77d70121a0 ]
 Kafka-rest-proxy-jetty-thread-pool-215waiting to acquire [ 0x7f77d70121a0 ]
 Kafka-rest-proxy-jetty-thread-pool-217waiting to acquire [ 0x7f77d70121a0 ]
 Kafka-rest-proxy-jetty-thread-pool-218waiting to acquire [ 0x7f77d70121a0 ]
 Kafka-rest-proxy-jetty-thread-pool-219waiting to acquire [ 0x7f77d70121a0 ]
 Kafka-rest-proxy-jetty-thread-pool-222waiting to acquire [ 0x7f77d70121a0 ]
 ...
at org.apache.kafka.clients.Metadata.fetch(Metadata.java:111)
at 
org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(KafkaProducer.java:1019)
at 
org.apache.kafka.clients.producer.KafkaProducer.partitionsFor(KafkaProducer.java:1144)
at 
io.confluent.kafkarest.producer.internal.MetadataImpl.maybeUpdate(MetadataImpl.java:39)
at 
io.confluent.kafkarest.producer.ResilientProducer.send(ResilientProducer.java:117)

Threads hold the lock
 kafka-producer-network-thread | kafka-rest-proxyrunning , holding [ 
0x7f77d70121a0 ]
at 
java.util.stream.ReferencePipeline$3$1.accept(java.base@11.0.18/ReferencePipeline.java:195)
at 
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(java.base@11.0.18/ArrayList.java:1655)
at 
java.util.stream.AbstractPipeline.copyInto(java.base@11.0.18/AbstractPipeline.java:484)
at 
org.apache.kafka.common.requests.MetadataResponse.convertToNodeArray(MetadataResponse.java:162)
at 
org.apache.kafka.common.requests.MetadataResponse.toPartitionInfo(MetadataResponse.java:152)
at 
org.apache.kafka.clients.MetadataCache.lambda$computeClusterView$1(MetadataCache.java:177)
at 
org.apache.kafka.clients.MetadataCache$$Lambda$695/0x7f75da3ddcb0.apply(Unknown
 Source)
at 
java.util.stream.ReferencePipeline$3$1.accept(java.base@11.0.18/ReferencePipeline.java:195)
at 
org.apache.kafka.clients.MetadataCache.computeClusterView(MetadataCache.java:178)
at java.lang.Thread.run(java.base@11.0.18/Thread.java:829)

  was:
TL;DR, A Kafka client produce latency issue is identified caused by 
synchronized lock contention of metadata cache read/write in the native kafka 

[jira] [Updated] (KAFKA-16259) Immutable MetadataCache to improve client performance

2024-02-14 Thread Zhifeng Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16259?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhifeng Chen updated KAFKA-16259:
-
Description: 
TL;DR, A Kafka client produce latency issue is identified caused by 
synchronized lock contention of metadata cache read/write in the native kafka 
producer.

Trigger Condition: A producer need to produce to large number of topics. such 
as in kafka rest-proxy

 

 

What is producer metadata cache

Kafka producer maintains a in-memory copy of cluster metadata, and it avoided 
fetch metadata every time when produce message to reduce latency

 

What’s the synchronized lock contention problem

Kafka producer metadata cache is a *mutable* object, read/write are isolated by 
a synchronized lock. Which means when the metadata cache is being updated, all 
read requests are blocked. 

Topic metadata expiration frequency increase liner with number of topics. In a 
kafka cluster with large number of topic partitions, topic metadata expiration 
and refresh triggers high frequent metadata update. When read operation blocked 
by update, producer threads are blocked and caused high produce latency issue.

 

*Proposed solution*

TL;DR Optimize performance of metadata cache read operation of native kafka 
producer with copy-on-write strategy

What is copy-on-write strategy

It’s a solution to reduce synchronized lock contention by making the object 
immutable, and always create a new instance when updating, but since the object 
is immutable, read operation will be free from waiting, thus produce latency 
reduced significantly

Besides performance, it can also make the metadata cache immutable from 
unexpected modification, reduce occurrence of code bugs due to incorrect 
synchronization 

 

{*}Test result{*}:

Environment: Kafka-rest-proxy

Client version: 2.8.0

Number of topic partitions: 250k

test result show 90%+ latency reduction on test cluster

!image-2024-02-14-12-11-07-366.png!

P99 produce latency on deployed instances reduced from 200ms -> 5ms (upper part 
show latency after the improvement, lower part show before improvement)

Thread dump show details of the synchronization contention

Threads acquiring lock
 Kafka-rest-proxy-jetty-thread-pool-199waiting to acquire [ 0x7f77d70121a0 ]
 Kafka-rest-proxy-jetty-thread-pool-200waiting to acquire [ 0x7f77d70121a0 ]
 Kafka-rest-proxy-jetty-thread-pool-202waiting to acquire [ 0x7f77d70121a0 ]
 Kafka-rest-proxy-jetty-thread-pool-203waiting to acquire [ 0x7f77d70121a0 ]
 Kafka-rest-proxy-jetty-thread-pool-204waiting to acquire [ 0x7f77d70121a0 ]
 Kafka-rest-proxy-jetty-thread-pool-205waiting to acquire [ 0x7f77d70121a0 ]
 Kafka-rest-proxy-jetty-thread-pool-207waiting to acquire [ 0x7f77d70121a0 ]
 Kafka-rest-proxy-jetty-thread-pool-212waiting to acquire [ 0x7f77d70121a0 ]
 Kafka-rest-proxy-jetty-thread-pool-214waiting to acquire [ 0x7f77d70121a0 ]
 Kafka-rest-proxy-jetty-thread-pool-215waiting to acquire [ 0x7f77d70121a0 ]
 Kafka-rest-proxy-jetty-thread-pool-217waiting to acquire [ 0x7f77d70121a0 ]
 Kafka-rest-proxy-jetty-thread-pool-218waiting to acquire [ 0x7f77d70121a0 ]
 Kafka-rest-proxy-jetty-thread-pool-219waiting to acquire [ 0x7f77d70121a0 ]
 Kafka-rest-proxy-jetty-thread-pool-222waiting to acquire [ 0x7f77d70121a0 ]
 ...
at org.apache.kafka.clients.Metadata.fetch(Metadata.java:111)
at 
org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(KafkaProducer.java:1019)
at 
org.apache.kafka.clients.producer.KafkaProducer.partitionsFor(KafkaProducer.java:1144)
at 
io.confluent.kafkarest.producer.internal.MetadataImpl.maybeUpdate(MetadataImpl.java:39)
at 
io.confluent.kafkarest.producer.ResilientProducer.send(ResilientProducer.java:117)

Threads hodl the lock
 kafka-producer-network-thread | kafka-rest-proxyrunning , holding [ 
0x7f77d70121a0 ]
at 
java.util.stream.ReferencePipeline$3$1.accept(java.base@11.0.18/ReferencePipeline.java:195)
at 
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(java.base@11.0.18/ArrayList.java:1655)
at 
java.util.stream.AbstractPipeline.copyInto(java.base@11.0.18/AbstractPipeline.java:484)
at 
org.apache.kafka.common.requests.MetadataResponse.convertToNodeArray(MetadataResponse.java:162)
at 
org.apache.kafka.common.requests.MetadataResponse.toPartitionInfo(MetadataResponse.java:152)
at 
org.apache.kafka.clients.MetadataCache.lambda$computeClusterView$1(MetadataCache.java:177)
at 
org.apache.kafka.clients.MetadataCache$$Lambda$695/0x7f75da3ddcb0.apply(Unknown
 Source)
at 
java.util.stream.ReferencePipeline$3$1.accept(java.base@11.0.18/ReferencePipeline.java:195)
at 
org.apache.kafka.clients.MetadataCache.computeClusterView(MetadataCache.java:178)
at java.lang.Thread.run(java.base@11.0.18/Thread.java:829)

  was:
TL;DR, A Kafka client produce latency issue is identified caused by 
synchronized lock contention of metadata cache read/write in 

[PR] KAFKA-16259 Immutable MetadataCache to improve client performance [kafka]

2024-02-14 Thread via GitHub


ericzhifengchen opened a new pull request, #15376:
URL: https://github.com/apache/kafka/pull/15376

   *More detailed description of your change,
   current MetadataCache is partially (but not fully) immutable, thus 
read/write requires synchronization and led to high produce latency with large 
number of topics in a kafka cluster
   This change improves performance of metadata cache read operation of native 
kafka producer with copy-on-write strategy
   
   What is copy-on-write strategy
   It’s a solution to reduce synchronized lock contention by making the object 
immutable, and always create a new instance when updating, but since the object 
is immutable, read operation will be free from waiting, thus produce latency 
reduced significantly
   
   Besides performance, it can also make the metadata cache immutable from 
unexpected modification, reduce occurrence of code bugs due to incorrect 
synchronization 
   
   *Summary of testing strategy (including rationale)
   Add unit test to cover concurrent read/write
   Run in production, with large scale for over 3months 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16165: Fix invalid transition on poll timer expiration [kafka]

2024-02-14 Thread via GitHub


AndrewJSchofield commented on code in PR #15375:
URL: https://github.com/apache/kafka/pull/15375#discussion_r1490086839


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -731,10 +729,29 @@ private boolean targetAssignmentReconciled() {
 return currentAssignment.equals(currentTargetAssignment);
 }
 
+/**
+ * @return True if the member should not send heartbeats, which would be 
one of the following
+ * cases:
+ * 
+ * Member is not subscribed to any topics
+ * Member has received a fatal error in a previous heartbeat 
response
+ * Member is stale, meaning that it has left the group due to expired 
poll timer
+ * 
+ */
 @Override
 public boolean shouldSkipHeartbeat() {
 MemberState state = state();
-return state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL;
+return state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL 
|| state == MemberState.STALE;
+}
+
+/**
+ * @return True if the member is preparing to leave the group (waiting for 
callbacks), or
+ * leaving (sending last heartbeat). This is used to skip proactively 
leaving the group when
+ * the consumer poll timer expires.
+ */
+public boolean isLeavingGroup() {
+MemberState state = state();
+return state == MemberState.PREPARE_LEAVING || state == 
MemberState.LEAVING;
 }
 
 /**

Review Comment:
   I think that technically, it doesn't take the *user* to poll in order to 
rejoin. The code polls more frequently is the user's poll timeout is long 
enough.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -188,18 +188,18 @@ public HeartbeatRequestManager(
 @Override
 public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
 if (!coordinatorRequestManager.coordinator().isPresent() ||
-membershipManager.shouldSkipHeartbeat() ||
-pollTimer.isExpired()) {
+membershipManager.shouldSkipHeartbeat()) {
 membershipManager.onHeartbeatRequestSkipped();
 return NetworkClientDelegate.PollResult.EMPTY;
 }
 pollTimer.update(currentTimeMs);
-if (pollTimer.isExpired()) {
-logger.warn("consumer poll timeout has expired. This means the 
time between subsequent calls to poll() " +
-"was longer than the configured max.poll.interval.ms, which 
typically implies that " +
-"the poll loop is spending too much time processing messages. 
You can address this " +
-"either by increasing max.poll.interval.ms or by reducing the 
maximum size of batches " +
-"returned in poll() with max.poll.records.");
+if (pollTimer.isExpired() && !membershipManager.isLeavingGroup()) {
+logger.warn("Consumer poll timeout has expired. This means the 
time between " +
+"subsequent calls to poll() was longer than the configured 
max.poll.interval.ms, " +
+"which typically implies that the poll loop is spending too 
much time processing " +
+"messages. You can address this either by increasing 
max.poll.interval.ms or by " +
+"reducing the maximum size of batches returned in poll() with 
max.poll.records.");
+

Review Comment:
   I think the logic as written is correct.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16260) Remove window.size.ms from StreamsConfig

2024-02-14 Thread Lucia Cerchie (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucia Cerchie updated KAFKA-16260:
--
Description: {{window.size.ms}}  is not a true KafkaStreams config, and 
results in an error when set from a KStreams application. It belongs on the 
client.

> Remove window.size.ms from StreamsConfig
> 
>
> Key: KAFKA-16260
> URL: https://issues.apache.org/jira/browse/KAFKA-16260
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Lucia Cerchie
>Priority: Major
>
> {{window.size.ms}}  is not a true KafkaStreams config, and results in an 
> error when set from a KStreams application. It belongs on the client.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16260) Remove window.size.ms from StreamsConfig

2024-02-14 Thread Lucia Cerchie (Jira)
Lucia Cerchie created KAFKA-16260:
-

 Summary: Remove window.size.ms from StreamsConfig
 Key: KAFKA-16260
 URL: https://issues.apache.org/jira/browse/KAFKA-16260
 Project: Kafka
  Issue Type: Improvement
Reporter: Lucia Cerchie






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16260) Remove window.size.ms from StreamsConfig

2024-02-14 Thread Lucia Cerchie (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17817530#comment-17817530
 ] 

Lucia Cerchie commented on KAFKA-16260:
---

see discussion https://github.com/apache/kafka/pull/14360

> Remove window.size.ms from StreamsConfig
> 
>
> Key: KAFKA-16260
> URL: https://issues.apache.org/jira/browse/KAFKA-16260
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Lucia Cerchie
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15333) Flaky build failure throwing Connect Exception: Could not connect to server....

2024-02-14 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17817529#comment-17817529
 ] 

Greg Harris commented on KAFKA-15333:
-

Caused by this Gradle bug: https://github.com/gradle/gradle/issues/27801

> Flaky build failure throwing Connect Exception: Could not connect to 
> server
> ---
>
> Key: KAFKA-15333
> URL: https://issues.apache.org/jira/browse/KAFKA-15333
> Project: Kafka
>  Issue Type: Test
>  Components: connect, unit tests
>Reporter: Philip Nee
>Priority: Major
>
> We frequently observe flaky build failure with the following message.  The is 
> from the most recent PR post 3.5.0:
>  
> {code:java}
> > Task :generator:testClasses UP-TO-DATE
> Unexpected exception thrown.
> org.gradle.internal.remote.internal.MessageIOException: Could not read 
> message from '/127.0.0.1:38354'.
>   at 
> org.gradle.internal.remote.internal.inet.SocketConnection.receive(SocketConnection.java:94)
>   at 
> org.gradle.internal.remote.internal.hub.MessageHub$ConnectionReceive.run(MessageHub.java:270)
>   at 
> org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:64)
>   at 
> org.gradle.internal.concurrent.AbstractManagedExecutor$1.run(AbstractManagedExecutor.java:47)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:750)
> Caused by: java.lang.IllegalArgumentException
>   at 
> org.gradle.internal.remote.internal.hub.InterHubMessageSerializer$MessageReader.read(InterHubMessageSerializer.java:72)
>   at 
> org.gradle.internal.remote.internal.hub.InterHubMessageSerializer$MessageReader.read(InterHubMessageSerializer.java:52)
>   at 
> org.gradle.internal.remote.internal.inet.SocketConnection.receive(SocketConnection.java:81)
>   ... 6 more
> > Task :streams:upgrade-system-tests-26:unitTest
> org.gradle.internal.remote.internal.ConnectException: Could not connect to 
> server [3156f144-9a89-4c47-91ad-88a8378ec726 port:37889, 
> addresses:[/127.0.0.1]]. Tried addresses: [/127.0.0.1].
>   at 
> org.gradle.internal.remote.internal.inet.TcpOutgoingConnector.connect(TcpOutgoingConnector.java:67)
>   at 
> org.gradle.internal.remote.internal.hub.MessageHubBackedClient.getConnection(MessageHubBackedClient.java:36)
>   at 
> org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:103)
>   at 
> org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:65)
>   at 
> worker.org.gradle.process.internal.worker.GradleWorkerMain.run(GradleWorkerMain.java:69)
>   at 
> worker.org.gradle.process.internal.worker.GradleWorkerMain.main(GradleWorkerMain.java:74)
> Caused by: java.net.ConnectException: Connection refused
>   at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>   at 
> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:716)
>   at sun.nio.ch.SocketAdaptor.connect(SocketAdaptor.java:122)
>   at 
> org.gradle.internal.remote.internal.inet.TcpOutgoingConnector.tryConnect(TcpOutgoingConnector.java:81)
>   at 
> org.gradle.internal.remote.internal.inet.TcpOutgoingConnector.connect(TcpOutgoingConnector.java:54)
>   ... 5 more {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15407) Not able to connect to kafka from the Private NLB from outside the VPC account

2024-02-14 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris resolved KAFKA-15407.
-
Resolution: Invalid

> Not able to connect to kafka from the Private NLB from outside the VPC 
> account 
> ---
>
> Key: KAFKA-15407
> URL: https://issues.apache.org/jira/browse/KAFKA-15407
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, connect, consumer, producer , protocol
> Environment: Staging, PROD
>Reporter: Shivakumar
>Priority: Blocker
> Attachments: image-2023-08-28-12-37-33-100.png
>
>
> !image-2023-08-28-12-37-33-100.png|width=768,height=223!
> Problem statement : 
> We are trying to connect Kafka from another account/VPC account
> Our kafka is in EKS cluster , we have service pointing to these pods for 
> connection
> We tried to create private link endpoint form Account B to connect to our NLB 
> to connect to our Kafka in Account A
> We see the connection reset from both client and target(kafka) in the NLB 
> monitoring tab of AWS.
> We tried various combo of listeners and advertised listeners which did not 
> help us.
> We are assuming we are missing some combination of Listeners and Network 
> level configs with which this connection can be made 
> Can you please guide us with this as we are blocked with a major migration. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15407) Not able to connect to kafka from the Private NLB from outside the VPC account

2024-02-14 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17817524#comment-17817524
 ] 

Greg Harris commented on KAFKA-15407:
-

Hi [~shivakumar] I don't think this is an appropriate place to request this 
form of support.
You can reference the public documentation: 
[https://kafka.apache.org/documentation/#brokerconfigs] and the networking 
documentation for your cloud provider.

> Not able to connect to kafka from the Private NLB from outside the VPC 
> account 
> ---
>
> Key: KAFKA-15407
> URL: https://issues.apache.org/jira/browse/KAFKA-15407
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, connect, consumer, producer , protocol
> Environment: Staging, PROD
>Reporter: Shivakumar
>Priority: Blocker
> Attachments: image-2023-08-28-12-37-33-100.png
>
>
> !image-2023-08-28-12-37-33-100.png|width=768,height=223!
> Problem statement : 
> We are trying to connect Kafka from another account/VPC account
> Our kafka is in EKS cluster , we have service pointing to these pods for 
> connection
> We tried to create private link endpoint form Account B to connect to our NLB 
> to connect to our Kafka in Account A
> We see the connection reset from both client and target(kafka) in the NLB 
> monitoring tab of AWS.
> We tried various combo of listeners and advertised listeners which did not 
> help us.
> We are assuming we are missing some combination of Listeners and Network 
> level configs with which this connection can be made 
> Can you please guide us with this as we are blocked with a major migration. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16165: Fix invalid transition on poll timer expiration [kafka]

2024-02-14 Thread via GitHub


kirktrue commented on code in PR #15375:
URL: https://github.com/apache/kafka/pull/15375#discussion_r1490029659


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -188,18 +188,18 @@ public HeartbeatRequestManager(
 @Override
 public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
 if (!coordinatorRequestManager.coordinator().isPresent() ||
-membershipManager.shouldSkipHeartbeat() ||
-pollTimer.isExpired()) {
+membershipManager.shouldSkipHeartbeat()) {
 membershipManager.onHeartbeatRequestSkipped();
 return NetworkClientDelegate.PollResult.EMPTY;
 }
 pollTimer.update(currentTimeMs);
-if (pollTimer.isExpired()) {
-logger.warn("consumer poll timeout has expired. This means the 
time between subsequent calls to poll() " +
-"was longer than the configured max.poll.interval.ms, which 
typically implies that " +
-"the poll loop is spending too much time processing messages. 
You can address this " +
-"either by increasing max.poll.interval.ms or by reducing the 
maximum size of batches " +
-"returned in poll() with max.poll.records.");
+if (pollTimer.isExpired() && !membershipManager.isLeavingGroup()) {
+logger.warn("Consumer poll timeout has expired. This means the 
time between " +
+"subsequent calls to poll() was longer than the configured 
max.poll.interval.ms, " +
+"which typically implies that the poll loop is spending too 
much time processing " +
+"messages. You can address this either by increasing 
max.poll.interval.ms or by " +
+"reducing the maximum size of batches returned in poll() with 
max.poll.records.");
+

Review Comment:
   Should the `!membershipManager.isLeavingGroup()` if clause be a nested `if` 
_inside_ the `if (pollTimer.isExpired()`? That is, do we really want to 
continue down to line 212 if the timer is expired but it's not already leaving 
the group?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -335,8 +335,13 @@ public int memberEpoch() {
 return memberEpoch;
 }
 
+/**
+ * @return True if there hasn't been a call to consumer.poll() withing the 
max.poll.interval.
+ * In that case, it is expected that the member will leave the group and 
rejoin on the next
+ * call to consumer.poll().
+ */
 @Override
-public boolean isStaled() {
+public boolean isStale() {

Review Comment:
   Thank you!  



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -335,8 +335,13 @@ public int memberEpoch() {
 return memberEpoch;
 }
 
+/**
+ * @return True if there hasn't been a call to consumer.poll() withing the 
max.poll.interval.
+ * In that case, it is expected that the member will leave the group and 
rejoin on the next
+ * call to consumer.poll().
+ */
 @Override
-public boolean isStaled() {
+public boolean isStale() {

Review Comment:
   Any reason we don't just expose the inner state via a `state()` method so 
that we don't have to write `isStateA`, `isStateB`, `isStateC`,  etc.?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -685,13 +690,6 @@ public boolean shouldHeartbeatNow() {
 @Override
 public void onHeartbeatRequestSent() {
 MemberState state = state();
-if (isStaled()) {
-log.debug("Member {} is staled and is therefore leaving the group. 
 It will rejoin upon the next poll.", memberEpoch);
-// TODO: Integrate partition revocation/loss callback
-transitionToJoining();
-return;
-}
-

Review Comment:
   For the uninitiated (like myself), would you consider adding a really brief 
comment that provides pointers to where some of the other states (e.g. `STALE`) 
are handled?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java:
##
@@ -64,9 +64,11 @@ public interface MembershipManager extends RequestManager {
 MemberState state();
 
 /**
- * @return True if the member is staled due to expired poll timer.
+ * @return True if the poll timer expired, indicating that there hasn't 
been a call to
+ * consumer poll within the max poll interval. In this case, the member 
will proactively
+ * leave the group, and rejoin on the next call to poll.
  */
-boolean isStaled();
+boolean isStale();

Review Comment:
   Thank you! 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific 

[jira] [Updated] (KAFKA-16165) Consumer invalid transition on expired poll interval

2024-02-14 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16165:
---
Description: 
Running system tests with the new async consumer revealed an invalid transition 
related to the consumer not being polled on the interval in some kind of 
scenario (maybe relates to consumer close, as the transition is leaving->stale)

Log trace:

[2024-01-17 19:45:07,379] WARN [Consumer 
clientId=consumer.6aa7cd1c-c83f-47e1-8f8f-b38a459a05d8-0, 
groupId=consumer-groups-test-2] consumer poll timeout has expired. This means 
the time between subsequent calls to poll() was longer than the configured 
max.poll.interval.ms, which typically implies that the poll loop is spending 
too much time processing messages. You can address this either by increasing 
max.poll.interval.ms or by reducing the maximum size of batches returned in 
poll() with max.poll.records. 
(org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188)
[2024-01-17 19:45:07,379] ERROR [Consumer 
clientId=consumer.6aa7cd1c-c83f-47e1-8f8f-b38a459a05d8-0, 
groupId=consumer-groups-test-2] Unexpected error caught in consumer network 
thread (org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread:91)
java.lang.IllegalStateException: Invalid state transition from LEAVING to STALE
at 
org.apache.kafka.clients.consumer.internals.MembershipManagerImpl.transitionTo(MembershipManagerImpl.java:303)
at 
org.apache.kafka.clients.consumer.internals.MembershipManagerImpl.transitionToStale(MembershipManagerImpl.java:739)
at 
org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager.poll(HeartbeatRequestManager.java:194)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.lambda$runOnce$0(ConsumerNetworkThread.java:137)
at 
java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
at 
java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
at 
java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
at 
java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1708)
at 
java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
at 
java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
at 
java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
at 
java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at 
java.base/java.util.stream.ReferencePipeline.reduce(ReferencePipeline.java:657)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.runOnce(ConsumerNetworkThread.java:139)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.run(ConsumerNetworkThread.java:88)


We should review the poll expiration logic that triggers a leave group 
operation. That is currently applied in the HB Manager poll, without any 
validation, and given it depends on the consumer poll timer, it could happen at 
any time, regardless of the state of the member. Ex. poll timer could expire 
when the member is leaving, leading to this leaving->stale invalid transition. 
We should probably consider that this pro-active leave should only apply when 
the consumer is not leaving (prepare leaving or leaving)

  was:
Running system tests with the new async consumer revealed an invalid transition 
related to the consumer not being polled on the interval in some kind of 
scenario (maybe relates to consumer close, as the transition is leaving->stale)

Log trace:

[2024-01-17 19:45:07,379] WARN [Consumer 
clientId=consumer.6aa7cd1c-c83f-47e1-8f8f-b38a459a05d8-0, 
groupId=consumer-groups-test-2] consumer poll timeout has expired. This means 
the time between subsequent calls to poll() was longer than the configured 
max.poll.interval.ms, which typically implies that the poll loop is spending 
too much time processing messages. You can address this either by increasing 
max.poll.interval.ms or by reducing the maximum size of batches returned in 
poll() with max.poll.records. 
(org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188)
[2024-01-17 19:45:07,379] ERROR [Consumer 
clientId=consumer.6aa7cd1c-c83f-47e1-8f8f-b38a459a05d8-0, 
groupId=consumer-groups-test-2] Unexpected error caught in consumer network 
thread (org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread:91)
java.lang.IllegalStateException: Invalid state transition from LEAVING to STALE
at 
org.apache.kafka.clients.consumer.internals.MembershipManagerImpl.transitionTo(MembershipManagerImpl.java:303)
at 
org.apache.kafka.clients.consumer.internals.MembershipManagerImpl.transitionToStale(MembershipManagerImpl.java:739)
at 

[jira] [Commented] (KAFKA-15841) Add Support for Topic-Level Partitioning in Kafka Connect

2024-02-14 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17817522#comment-17817522
 ] 

Greg Harris commented on KAFKA-15841:
-

Hi [~henriquemota]!

Can you share more details about your setup? Do you have a single JDBC 
connector, or multiple in the same cluster? Are connectors configured to read 
from one topic, or multiple? What is your tasks.max configuration, and what 
consumer assignor are you using?

Consumers support distributing reading from multiple topics/topic-partitions 
across consumers in a consumer group, and Connect supports distributing 
multiple tasks across multiple workers, both of which allow you to map multiple 
topic-partitions of work onto multiple workers.

For example, say you have N topic-paritions, and M connect workers, with N > M. 
Any one of the following could be used to distribute the work:
1. You could run one-connector-per-topic, and configure each connector with 
tasks.max=1, and have the worker distribute the N tasks across M workers.
2. You could add all N topics to a single connector with tasks.max=M, and have 
the consumer group distribute the N topic-partitions among those M tasks.
3. You could manually group the N topics into M groups, and create M connectors 
with tasks.max=1, giving each connector one group of topics.

> Add Support for Topic-Level Partitioning in Kafka Connect
> -
>
> Key: KAFKA-15841
> URL: https://issues.apache.org/jira/browse/KAFKA-15841
> Project: Kafka
>  Issue Type: Improvement
>  Components: connect
>Reporter: Henrique Mota
>Priority: Trivial
>
> In our organization, we utilize JDBC sink connectors to consume data from 
> various topics, where each topic is dedicated to a specific tenant with a 
> single partition. Recently, we developed a custom sink based on the standard 
> JDBC sink, enabling us to pause consumption of a topic when encountering 
> problematic records.
> However, we face limitations within Kafka Connect, as it doesn't allow for 
> appropriate partitioning of topics among workers. We attempted a workaround 
> by breaking down the topics list within the 'topics' parameter. 
> Unfortunately, Kafka Connect overrides this parameter after invoking the 
> {{taskConfigs(int maxTasks)}} method from the 
> {{org.apache.kafka.connect.connector.Connector}} class.
> We request the addition of support in Kafka Connect to enable the 
> partitioning of topics among workers without requiring a fork. This 
> enhancement would facilitate better load distribution and allow for more 
> flexible configurations, particularly in scenarios where topics are dedicated 
> to different tenants.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] KAFKA-16165: Fix invalid transition on poll timer expiration [kafka]

2024-02-14 Thread via GitHub


lianetm opened a new pull request, #15375:
URL: https://github.com/apache/kafka/pull/15375

   This fixes an invalid transition (leaving->stale) that was discovered in the 
system tests. The underlying issue was that the poll timer expiration logic was 
blindly forcing a transition to stale and sending a leave group, without 
considering that the member could be already leaving. 
   The fix included in this PR ensures that the poll timer expiration logic, 
whose purpose is to leave the group, is only applied if the member is not 
already leaving. Note that it also fixes the transition out of the STALE state, 
that should only happen when the poll timer is reset. 
   
   As a result of this changes:
   - If the poll timer expires while the member is not leaving, the poll timer 
expiration logic is applied: it will transition to stale, send a leave group, 
and remain in STALE state until the timer is reset. At that point the member 
will transition to JOINING to rejoin the group. 
   - If the poll timer expires while the member is already leaving, the poll 
timer expiration logic does not apply, and just lets the HB continue. Not that 
this would be the case of member in PREPARE_LEAVING waiting for callbacks to 
complete (needs to continue sending HB), or LEAVING (needs to send the last HB 
to leave).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16259) Immutable MetadataCache to improve client performance

2024-02-14 Thread Zhifeng Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16259?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhifeng Chen updated KAFKA-16259:
-
Description: 
TL;DR, A Kafka client produce latency issue is identified caused by 
synchronized lock contention of metadata cache read/write in the native kafka 
producer.

Trigger Condition: A producer need to produce to large number of topics. such 
as in kafka rest-proxy

 

 

What is producer metadata cache

Kafka producer maintains a in-memory copy of cluster metadata, and it avoided 
fetch metadata every time when produce message to reduce latency

 

What’s the synchronized lock contention problem

Kafka producer metadata cache is a *mutable* object, read/write are isolated by 
a synchronized lock. Which means when the metadata cache is being updated, all 
read requests are blocked. 

Topic metadata expiration frequency increase liner with number of topics. In a 
kafka cluster with large number of topic partitions, topic metadata expiration 
and refresh triggers high frequent metadata update. When read operation blocked 
by update, producer threads are blocked and caused high produce latency issue.

 

*Proposed solution*

TL;DR Optimize performance of metadata cache read operation of native kafka 
producer with copy-on-write strategy

What is copy-on-write strategy

It’s a solution to reduce synchronized lock contention by making the object 
immutable, and always create a new instance when updating, but since the object 
is immutable, read operation will be free from waiting, thus produce latency 
reduced significantly

Besides performance, it can also make the metadata cache immutable from 
unexpected modification, reduce occurrence of code bugs due to incorrect 
synchronization 

 

{*}Test result{*}:

Environment: Kafka-rest-proxy

Client version: 2.8.0

Number of topic partitions: 250k

test result show 90%+ latency reduction on test cluster

!image-2024-02-14-12-11-07-366.png!

P99 produce latency on deployed instances reduced from 200ms -> 5ms (upper part 
show latency after the improvement, lower part show before improvement)

  was:
TL;DR, A Kafka client produce latency issue is identified caused by 
synchronized lock contention of metadata cache read/write in the native kafka 
producer.

Trigger Condition: A producer need to produce to large number of topics. such 
as in kafka rest-proxy

 

 

What is producer metadata cache

Kafka producer maintains a in-memory copy of cluster metadata, and it avoided 
fetch metadata every time when produce message to reduce latency

 

What’s the synchronized lock contention problem

Kafka producer metadata cache is a *mutable* object, read/write are isolated by 
a synchronized lock. Which means when the metadata cache is being updated, all 
read requests are blocked. 

Topic metadata expiration frequency increase liner with number of topics. In a 
kafka cluster with large number of topic partitions, topic metadata expiration 
and refresh triggers high frequent metadata update. When read operation blocked 
by update, producer threads are blocked and caused high produce latency issue.

 

*Proposed solution*

TL;DR Optimize performance of metadata cache read operation of native kafka 
producer with copy-on-write strategy

What is copy-on-write strategy

It’s a solution to reduce synchronized lock contention by making the object 
immutable, and always create a new instance when updating, but since the object 
is immutable, read operation will be free from waiting, thus produce latency 
reduced significantly

Besides performance, it can also make the metadata cache immutable from 
unexpected modification, reduce occurrence of code bugs due to incorrect 
synchronization 

 

Test result:

Environment: Kafka-rest-proxy

Client version: 2.8.0

Number of topic partitions: 250k

test result show 90%+ latency reduction on test cluster

!image-2024-02-14-12-11-07-366.png!

P99 produce latency on deployed instances reduced from 200ms -> 5ms (upper part 
show latency after the improvement, lower part show before improvement)


> Immutable MetadataCache to improve client performance
> -
>
> Key: KAFKA-16259
> URL: https://issues.apache.org/jira/browse/KAFKA-16259
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 2.8.0
>Reporter: Zhifeng Chen
>Priority: Major
> Attachments: image-2024-02-14-12-11-07-366.png
>
>
> TL;DR, A Kafka client produce latency issue is identified caused by 
> synchronized lock contention of metadata cache read/write in the native kafka 
> producer.
> Trigger Condition: A producer need to produce to large number of topics. such 
> as in kafka rest-proxy
>  
>  
> What is producer metadata cache
> Kafka producer maintains a in-memory copy of cluster metadata, and 

[jira] [Updated] (KAFKA-16145) Windows Kafka Shutdown

2024-02-14 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16145?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-16145:

Component/s: core
 (was: connect)

> Windows Kafka Shutdown
> --
>
> Key: KAFKA-16145
> URL: https://issues.apache.org/jira/browse/KAFKA-16145
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.6.0
> Environment: windows, openjdk-21, kafka_2.12-3.6.0
>Reporter: user017
>Priority: Major
> Fix For: 3.6.0
>
>
> ERROR Error while deleting segments for test.public.testtable-0 in dir 
> C:\tmp\kafka-logs 
> (org.apache.kafka.storage.internals.log.LogDirFailureChannel)
> java.nio.file.FileSystemException: 
> C:\tmp\kafka-logs\test.public.testtable-0\02043576.timeindex -> 
> C:\tmp\kafka-logs\test.public.testtable-0\02043576.timeindex.deleted:
>  The process cannot access the file because it is being used by another 
> process
>         at 
> java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:92)
>         at 
> java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103)
>         at java.base/sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:414)
>         at 
> java.base/sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:291)
>         at java.base/java.nio.file.Files.move(Files.java:1430)
>         at 
> org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:982)
>         at 
> org.apache.kafka.storage.internals.log.AbstractIndex.renameTo(AbstractIndex.java:227)
>         at 
> org.apache.kafka.storage.internals.log.LazyIndex$IndexValue.renameTo(LazyIndex.java:122)
>         at 
> org.apache.kafka.storage.internals.log.LazyIndex.renameTo(LazyIndex.java:202)
>         at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:495)
>         at 
> kafka.log.LocalLog$.$anonfun$deleteSegmentFiles$1(LocalLog.scala:917)
>         at 
> kafka.log.LocalLog$.$anonfun$deleteSegmentFiles$1$adapted(LocalLog.scala:915)
>         at scala.collection.immutable.List.foreach(List.scala:333)
>         at kafka.log.LocalLog$.deleteSegmentFiles(LocalLog.scala:915)
>         at kafka.log.LocalLog.removeAndDeleteSegments(LocalLog.scala:317)
>         at 
> kafka.log.UnifiedLog.$anonfun$deleteSegments$2(UnifiedLog.scala:1469)
>         at kafka.log.UnifiedLog.deleteSegments(UnifiedLog.scala:1845)
>         at 
> kafka.log.UnifiedLog.deleteRetentionMsBreachedSegments(UnifiedLog.scala:1443)
>         at kafka.log.UnifiedLog.deleteOldSegments(UnifiedLog.scala:1487)
>         at kafka.log.LogManager.$anonfun$cleanupLogs$3(LogManager.scala:1282)
>         at 
> kafka.log.LogManager.$anonfun$cleanupLogs$3$adapted(LogManager.scala:1279)
>         at scala.collection.immutable.List.foreach(List.scala:333)
>         at kafka.log.LogManager.cleanupLogs(LogManager.scala:1279)
>         at 
> kafka.log.LogManager.$anonfun$startupWithConfigOverrides$2(LogManager.scala:562)
>         at 
> org.apache.kafka.server.util.KafkaScheduler.lambda$schedule$1(KafkaScheduler.java:150)
>         at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
>         at 
> java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:358)
>         at 
> java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
>         at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
>         at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
>         at java.base/java.lang.Thread.run(Thread.java:1583)
>         Suppressed: java.nio.file.FileSystemException: 
> C:\tmp\kafka-logs\test.public.testtable-0\02043576.timeindex -> 
> C:\tmp\kafka-logs\test.public.testtable-0\02043576.timeindex.deleted:
>  The process cannot access the file because it is being used by another 
> process
>                 at 
> java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:92)
>                 at 
> java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103)
>                 at 
> java.base/sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:328)
>                 at 
> java.base/sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:291)
>                 at java.base/java.nio.file.Files.move(Files.java:1430)
>                 at 
> org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:978)
>                 ... 25 more
>  
>  
> ERROR Shutdown broker because all log dirs in C:\tmp\kafka-logs have failed 
> (kafka.log.LogManager)
>  
> {color:#172b4d}*Run in administrator mode, no processes 

[jira] [Created] (KAFKA-16259) Immutable MetadataCache to improve client performance

2024-02-14 Thread Zhifeng Chen (Jira)
Zhifeng Chen created KAFKA-16259:


 Summary: Immutable MetadataCache to improve client performance
 Key: KAFKA-16259
 URL: https://issues.apache.org/jira/browse/KAFKA-16259
 Project: Kafka
  Issue Type: Improvement
  Components: clients
Affects Versions: 2.8.0
Reporter: Zhifeng Chen
 Attachments: image-2024-02-14-12-11-07-366.png

TL;DR, A Kafka client produce latency issue is identified caused by 
synchronized lock contention of metadata cache read/write in the native kafka 
producer.

Trigger Condition: A producer need to produce to large number of topics. such 
as in kafka rest-proxy

 

 

What is producer metadata cache

Kafka producer maintains a in-memory copy of cluster metadata, and it avoided 
fetch metadata every time when produce message to reduce latency

 

What’s the synchronized lock contention problem

Kafka producer metadata cache is a *mutable* object, read/write are isolated by 
a synchronized lock. Which means when the metadata cache is being updated, all 
read requests are blocked. 

Topic metadata expiration frequency increase liner with number of topics. In a 
kafka cluster with large number of topic partitions, topic metadata expiration 
and refresh triggers high frequent metadata update. When read operation blocked 
by update, producer threads are blocked and caused high produce latency issue.

 

*Proposed solution*

TL;DR Optimize performance of metadata cache read operation of native kafka 
producer with copy-on-write strategy

What is copy-on-write strategy

It’s a solution to reduce synchronized lock contention by making the object 
immutable, and always create a new instance when updating, but since the object 
is immutable, read operation will be free from waiting, thus produce latency 
reduced significantly

Besides performance, it can also make the metadata cache immutable from 
unexpected modification, reduce occurrence of code bugs due to incorrect 
synchronization 

 

Test result:

Environment: Kafka-rest-proxy

Client version: 2.8.0

Number of topic partitions: 250k

test result show 90%+ latency reduction on test cluster

!image-2024-02-14-12-11-07-366.png!

P99 produce latency on deployed instances reduced from 200ms -> 5ms (upper part 
show latency after the improvement, lower part show before improvement)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-9790) Wrong metric bean name for connect worker metrics

2024-02-14 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-9790:
---
Component/s: connect

> Wrong metric bean name for connect worker metrics
> -
>
> Key: KAFKA-9790
> URL: https://issues.apache.org/jira/browse/KAFKA-9790
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
> Environment: Tested with this docker image:  
> cnfldemos/cp-server-connect-datagen:0.2.0-5.4.0
>Reporter: Alexandre YANG
>Priority: Major
> Attachments: image-2020-03-31-15-11-51-031.png, 
> image-2020-03-31-15-16-37-573.png
>
>
> The connect worker metrics implemented here: 
>  * [https://github.com/apache/kafka/pull/6843/files]
>  * 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-475%3A+New+Metrics+to+Measure+Number+of+Tasks+on+a+Connector]
> should have this bean name:
>  
> {code:java}
> kafka.connect:type=connect-worker-metrics,connector="{connector}"
> {code}
> but it's currently (see screenshot)
>  
> {code:java}
> kafka.connect:type=connector-metrics,connector=datagen2
> {code}
>  
> !image-2020-03-31-15-11-51-031.png|width=673,height=371!
> !image-2020-03-31-15-16-37-573.png|width=688,height=458!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-10735) Kafka producer producing corrupted avro values when confluent cluster is recreated and producer application is not restarted

2024-02-14 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10735?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris resolved KAFKA-10735.
-
Resolution: Cannot Reproduce

Hi, since this doesn't contain any Apache Kafka code, and doesn't provide any 
configurations, we are unable to reproduce this failure. Closing as cannot 
reproduce.

> Kafka producer producing corrupted avro values when confluent cluster is 
> recreated and producer application is not restarted
> 
>
> Key: KAFKA-10735
> URL: https://issues.apache.org/jira/browse/KAFKA-10735
> Project: Kafka
>  Issue Type: Bug
>Reporter: Tim Tattersall
>Priority: Major
>
> *Our Environment (AWS):*
> 1 x EC2 instance running 4 docker containers (using docker-compose)
>  * cp-kafka 5.5.1
>  * cp-zookeeper 5.5.1
>  * cp-schema-registry 5.5.1
>  * cp-enterprise-control-center 5.5.1
> 1 x ECS service running a single java application with spring-kafka producer
> Topics are using String key and Avro value
>  
> *Problem:*
>  * Avro values published after confluent cluster is recreated are corrupted. 
> Expecting Avro json structure, received string value with corrupted Avro 
> details
>  ** Expected: {"metadata":{"nabEventVersion":"1.0","type":"Kafka IBMMQ sink 
> connector","schemaUrl": ...*ongoing*
>  ** Actual: 1.08Kafka IBMMQ source 
> connector^kafka-conector-ibm-mq-source-entitlements-check\Kafka IBMMQ source 
> connector - sourced*ongoing*
>  
> *How to Reproduce*
>  # Using an existing confluent cluster
>  # Start a kafka producer java application (ours running with spring-kafka)
>  # Destroy the existing confluent cluster (using docker-compose down)
>  # Recreate the confluent cluster (using docker-compose up)
>  # Add the topic back onto the new cluster
>  # Trigger a message to be produced by the running Kafka producer
>  
> *Current Workaround*
>  * Killing running tasks on ECS service and allowing AWS to start new ones



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-12309) The revocation algorithm produces uneven distributions

2024-02-14 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-12309:

Component/s: connect

> The revocation algorithm produces uneven distributions
> --
>
> Key: KAFKA-12309
> URL: https://issues.apache.org/jira/browse/KAFKA-12309
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
>
> Assignments:
> "W0" -> 8 connectors/tasks
> "W1" -> 8 connectors/tasks
> (New) "W2" -> 0 connectors/tasks
> Revoked (trunk)
> "W0" -> 2 connectors/tasks
> "W1" -> 2 connectors/tasks
> Revoked (expected)
> "W0" -> 2 connectors/tasks
> "W1" -> 3 connectors/tasks



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-10719) MirrorMaker2 fails to update its runtime configuration

2024-02-14 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10719?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-10719:

Component/s: mirrormaker

> MirrorMaker2 fails to update its runtime configuration
> --
>
> Key: KAFKA-10719
> URL: https://issues.apache.org/jira/browse/KAFKA-10719
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.6.0
>Reporter: Peter Sinoros-Szabo
>Priority: Major
> Fix For: 3.7.0, 3.6.2
>
>
> I was running successfully the MM2 cluster with the following configuration, 
> I simplified it a little: {code:java} clusters = main, backup 
> main.bootstrap.servers = kafkaA:9202,kafkaB:9092,kafkaB:9092 
> backup.bootstrap.servers = backupA:9092,backupB:9092,backupC:9092 
> main->backup.enabled = true main->backup.topics = .*{code} I wanted to change 
> the bootstrap.address list of the destination cluster to a different list 
> that is pointing to the *same* cluster, just a different listener with a 
> different routing. So I changed it to: {code:java} backup.bootstrap.servers = 
> backupSecA:1234,backupSecB:1234,backupSecC:1234{code} I did a rolling restart 
> on the MM2 nodes and say that some tasks were still using the old bootstrap 
> addresses, some of them were using the new one. I don't have the logs, so 
> unfortunately I don't know which one picked up the good values and which 
> didn't. I even stopped the cluster completely, but it didn't help. Ryanne 
> adviced to delete the mm2-config and mm2-status topics, so I did delete those 
> on the destination cluster, that solved this issue.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-10719) MirrorMaker2 fails to update its runtime configuration

2024-02-14 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10719?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris resolved KAFKA-10719.
-
Fix Version/s: 3.7.0
   3.6.2
   Resolution: Fixed

> MirrorMaker2 fails to update its runtime configuration
> --
>
> Key: KAFKA-10719
> URL: https://issues.apache.org/jira/browse/KAFKA-10719
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.6.0
>Reporter: Peter Sinoros-Szabo
>Priority: Major
> Fix For: 3.7.0, 3.6.2
>
>
> I was running successfully the MM2 cluster with the following configuration, 
> I simplified it a little: {code:java} clusters = main, backup 
> main.bootstrap.servers = kafkaA:9202,kafkaB:9092,kafkaB:9092 
> backup.bootstrap.servers = backupA:9092,backupB:9092,backupC:9092 
> main->backup.enabled = true main->backup.topics = .*{code} I wanted to change 
> the bootstrap.address list of the destination cluster to a different list 
> that is pointing to the *same* cluster, just a different listener with a 
> different routing. So I changed it to: {code:java} backup.bootstrap.servers = 
> backupSecA:1234,backupSecB:1234,backupSecC:1234{code} I did a rolling restart 
> on the MM2 nodes and say that some tasks were still using the old bootstrap 
> addresses, some of them were using the new one. I don't have the logs, so 
> unfortunately I don't know which one picked up the good values and which 
> didn't. I even stopped the cluster completely, but it didn't help. Ryanne 
> adviced to delete the mm2-config and mm2-status topics, so I did delete those 
> on the destination cluster, that solved this issue.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-10266) Fix connector configs in docs to mention the correct default value inherited from worker configs

2024-02-14 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-10266:

Component/s: connect

> Fix connector configs in docs to mention the correct default value inherited 
> from worker configs
> 
>
> Key: KAFKA-10266
> URL: https://issues.apache.org/jira/browse/KAFKA-10266
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: Konstantine Karantasis
>Assignee: Luke Chen
>Priority: Major
>
>  
> Example: 
> [https://kafka.apache.org/documentation/#header.converter]
> has the correct default when it is mentioned as a worker property. 
> But under the section of source connector configs, it's default value is said 
> to be `null`. 
> Though that is correct in terms of implementation, it's confusing for users. 
> We should surface the correct defaults for configs that inherit (or otherwise 
> override) worker configs. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14773) Make MirrorMaker startup synchronous

2024-02-14 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14773?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-14773:

Component/s: mirrormaker

> Make MirrorMaker startup synchronous
> 
>
> Key: KAFKA-14773
> URL: https://issues.apache.org/jira/browse/KAFKA-14773
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Reporter: Divij Vaidya
>Priority: Major
>  Labels: mirror-maker
>
> Currently, MirrorMaker is started using `
> ./bin/connect-mirror-maker.sh mm2.properties` shell command. However, even if 
> the shell command has exited and a log with `Kafka MirrorMaker started` has 
> been printed, it is likely that the underlying connectors and tasks have not 
> been configured.
> This tasks aims to make the MirrorMaker startup synchronous by either waiting 
> for connections & tasks to move to running state before exiting the 
> `MirrorMaker#start()` function or by blocking completion of `main()`.
> A conversation about this was done in 
> [https://github.com/apache/kafka/pull/13284] 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-13656) Connect Transforms support for nested structures

2024-02-14 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13656?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-13656:

Component/s: connect

> Connect Transforms support for nested structures
> 
>
> Key: KAFKA-13656
> URL: https://issues.apache.org/jira/browse/KAFKA-13656
> Project: Kafka
>  Issue Type: Improvement
>  Components: connect
>Reporter: Jorge Esteban Quilcate Otoya
>Assignee: Jorge Esteban Quilcate Otoya
>Priority: Major
>  Labels: connect-transformation, needs-kip
>
> Single Message Transforms (SMT), 
> [KIP-66|https://cwiki.apache.org/confluence/display/KAFKA/KIP-66%3A+Single+Message+Transforms+for+Kafka+Connect],
>  have greatly improved Connector's usability by enabling processing 
> input/output data without the need for additional streaming applications. 
> These benefits have been limited as most SMT implementations are limited to 
> fields available on the root structure:
>  * https://issues.apache.org/jira/browse/KAFKA-7624
>  * https://issues.apache.org/jira/browse/KAFKA-10640
> Therefore, this KIP is aimed to include support for nested structures on the 
> existing SMTs — where this make sense —, and to include the abstractions to 
> reuse this in future SMTs.
>  
> KIP: https://cwiki.apache.org/confluence/x/BafkCw



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14652) Improve MM2 logging by adding the flow information to the context (KIP-916)

2024-02-14 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-14652:

Component/s: mirrormaker

> Improve MM2 logging by adding the flow information to the context (KIP-916)
> ---
>
> Key: KAFKA-14652
> URL: https://issues.apache.org/jira/browse/KAFKA-14652
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Reporter: Daniel Urban
>Assignee: Daniel Urban
>Priority: Major
>
> MirrorMaker2 runs multiple Connect worker instances in a single process. In 
> Connect, the logging is based on the assumption that Connector names are 
> unique. But in MM2, the same Connector names are being used in each flow 
> (Connect group). This means that there is no way to differentiate between the 
> logs of MirrorSourceConnector in A->B and in B->A.
> This can be improved by adding the flow to the logging context and the names 
> of the Connect framework threads.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15892) Flaky test: testAlterSinkConnectorOffsets – org.apache.kafka.connect.integration.OffsetsApiIntegrationTest

2024-02-14 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15892?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-15892:

Component/s: connect

> Flaky test: testAlterSinkConnectorOffsets – 
> org.apache.kafka.connect.integration.OffsetsApiIntegrationTest
> --
>
> Key: KAFKA-15892
> URL: https://issues.apache.org/jira/browse/KAFKA-15892
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: Apoorv Mittal
>Priority: Major
>  Labels: flaky-test
>
> h4. Error
> org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Could not 
> alter connector offsets. Error response: \{"error_code":500,"message":"Failed 
> to alter consumer group offsets for connector test-connector either because 
> its tasks haven't stopped completely yet or the connector was resumed before 
> the request to alter its offsets could be successfully completed. If the 
> connector is in a stopped state, this operation can be safely retried. If it 
> doesn't eventually succeed, the Connect cluster may need to be restarted to 
> get rid of the zombie sink tasks."}
> h4. Stacktrace
> org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Could not 
> alter connector offsets. Error response: \{"error_code":500,"message":"Failed 
> to alter consumer group offsets for connector test-connector either because 
> its tasks haven't stopped completely yet or the connector was resumed before 
> the request to alter its offsets could be successfully completed. If the 
> connector is in a stopped state, this operation can be safely retried. If it 
> doesn't eventually succeed, the Connect cluster may need to be restarted to 
> get rid of the zombie sink tasks."}
>  at 
> app//org.apache.kafka.connect.util.clusters.EmbeddedConnect.alterConnectorOffsets(EmbeddedConnect.java:614)
>  at 
> app//org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.alterConnectorOffsets(EmbeddedConnectCluster.java:48)
>  at 
> app//org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.alterAndVerifySinkConnectorOffsets(OffsetsApiIntegrationTest.java:363)
>  at 
> app//org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testAlterSinkConnectorOffsets(OffsetsApiIntegrationTest.java:287)
>  at 
> java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
>  Method)
>  at 
> java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at 
> java.base@11.0.16.1/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.base@11.0.16.1/java.lang.reflect.Method.invoke(Method.java:566)
>  at 
> app//org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>  at 
> app//org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>  at 
> app//org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>  at 
> app//org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>  at 
> app//org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>  at 
> app//org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>  at app//org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>  at 
> app//org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>  at app//org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>  at 
> app//org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>  at 
> app//org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>  at app//org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>  at app//org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>  at app//org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>  at app//org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>  at app//org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>  at app//org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>  at app//org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>  at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:112)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15914) Flaky test - testAlterSinkConnectorOffsetsOverriddenConsumerGroupId - OffsetsApiIntegrationTest

2024-02-14 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-15914:

Component/s: connect

> Flaky test - testAlterSinkConnectorOffsetsOverriddenConsumerGroupId - 
> OffsetsApiIntegrationTest
> ---
>
> Key: KAFKA-15914
> URL: https://issues.apache.org/jira/browse/KAFKA-15914
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: Lucas Brutschy
>Priority: Major
>  Labels: flaky-test
>
> Test intermittently gives the following result:
> {code}
> org.opentest4j.AssertionFailedError: Condition not met within timeout 3. 
> Sink connector consumer group offsets should catch up to the topic end 
> offsets ==> expected:  but was: 
>   at 
> org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>   at 
> org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
>   at org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
>   at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
>   at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:210)
>   at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:331)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:379)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:328)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:312)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:302)
>   at 
> org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.verifyExpectedSinkConnectorOffsets(OffsetsApiIntegrationTest.java:917)
>   at 
> org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.alterAndVerifySinkConnectorOffsets(OffsetsApiIntegrationTest.java:396)
>   at 
> org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testAlterSinkConnectorOffsetsOverriddenConsumerGroupId(OffsetsApiIntegrationTest.java:297)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14823) Clean up ConfigProvider API

2024-02-14 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14823?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-14823:

Component/s: connect

> Clean up ConfigProvider API
> ---
>
> Key: KAFKA-14823
> URL: https://issues.apache.org/jira/browse/KAFKA-14823
> Project: Kafka
>  Issue Type: Improvement
>  Components: connect
>Reporter: Mickael Maison
>Priority: Major
>
> The ConfigProvider interface exposes several methods that are not used:
> - ConfigData get(String path)
> - default void subscribe(String path, Set keys, ConfigChangeCallback 
> callback)
> - default void unsubscribe(String path, Set keys, 
> ConfigChangeCallback callback)
> - default void unsubscribeAll()
> We should either build mechanisms to support them or deprecate them.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15891) Flaky test: testResetSinkConnectorOffsetsOverriddenConsumerGroupId – org.apache.kafka.connect.integration.OffsetsApiIntegrationTest

2024-02-14 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15891?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-15891:

Component/s: connect

> Flaky test: testResetSinkConnectorOffsetsOverriddenConsumerGroupId – 
> org.apache.kafka.connect.integration.OffsetsApiIntegrationTest
> ---
>
> Key: KAFKA-15891
> URL: https://issues.apache.org/jira/browse/KAFKA-15891
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: Apoorv Mittal
>Priority: Major
>  Labels: flaky-test
>
> h4. Error
> org.opentest4j.AssertionFailedError: Condition not met within timeout 3. 
> Sink connector consumer group offsets should catch up to the topic end 
> offsets ==> expected:  but was: 
> h4. Stacktrace
> org.opentest4j.AssertionFailedError: Condition not met within timeout 3. 
> Sink connector consumer group offsets should catch up to the topic end 
> offsets ==> expected:  but was: 
>  at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>  at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
>  at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
>  at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
>  at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:210)
>  at 
> app//org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:331)
>  at 
> app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:379)
>  at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:328)
>  at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:312)
>  at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:302)
>  at 
> app//org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.verifyExpectedSinkConnectorOffsets(OffsetsApiIntegrationTest.java:917)
>  at 
> app//org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.resetAndVerifySinkConnectorOffsets(OffsetsApiIntegrationTest.java:725)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15918) Flaky test - OffsetsApiIntegrationTest.testResetSinkConnectorOffsets

2024-02-14 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15918?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-15918:

Component/s: connect

> Flaky test - OffsetsApiIntegrationTest.testResetSinkConnectorOffsets
> 
>
> Key: KAFKA-15918
> URL: https://issues.apache.org/jira/browse/KAFKA-15918
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: Haruki Okada
>Priority: Major
>  Labels: flaky-test
> Attachments: stdout.log
>
>
> [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14242/14/tests/]
>  
> {code:java}
> Error
> org.opentest4j.AssertionFailedError: Condition not met within timeout 3. 
> Sink connector consumer group offsets should catch up to the topic end 
> offsets ==> expected:  but was: 
> Stacktrace
> org.opentest4j.AssertionFailedError: Condition not met within timeout 3. 
> Sink connector consumer group offsets should catch up to the topic end 
> offsets ==> expected:  but was: 
> at 
> org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
> at 
> org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
> at org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
> at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
> at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:210)
> at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:331)
> at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:379)
> at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:328)
> at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:312)
> at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:302)
> at 
> org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.verifyExpectedSinkConnectorOffsets(OffsetsApiIntegrationTest.java:917)
> at 
> org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.resetAndVerifySinkConnectorOffsets(OffsetsApiIntegrationTest.java:725)
> at 
> org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testResetSinkConnectorOffsets(OffsetsApiIntegrationTest.java:672)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
> at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
> at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
> at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
> at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
> at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:112)
> at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
> at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:40)
> at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:60)
> at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:52)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 

[jira] [Updated] (KAFKA-13756) Connect validate endpoint should return proper response for invalid connector class

2024-02-14 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13756?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-13756:

Component/s: connect

> Connect validate endpoint should return proper response for invalid connector 
> class
> ---
>
> Key: KAFKA-13756
> URL: https://issues.apache.org/jira/browse/KAFKA-13756
> Project: Kafka
>  Issue Type: Improvement
>  Components: connect
>Reporter: Daniel Urban
>Assignee: Daniel Urban
>Priority: Major
>
> Currently, if there is an issue with  the connector class, the validate 
> endpoint returns a 400 or a 500 response.
> Instead, it should return a well formatted response containing a proper 
> validation error message.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-8314) Managing the doc field in case of schema projection - kafka connect

2024-02-14 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8314?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-8314:
---
Component/s: connect

> Managing the doc field in case of schema projection - kafka connect
> ---
>
> Key: KAFKA-8314
> URL: https://issues.apache.org/jira/browse/KAFKA-8314
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: kaushik srinivas
>Priority: Major
>
> Doc field change in the schema while writing to hdfs using hdfs sink 
> connector via connect framework would cause failures in schema projection.
>  
> java.lang.RuntimeException: 
> org.apache.kafka.connect.errors.SchemaProjectorException: Schema parameters 
> not equal. source parameters: \{connect.record.doc=xxx} and target 
> parameters: \{connect.record.doc=yyy} 
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16258) Stale member should trigger onPartitionsLost when leaving group

2024-02-14 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16258:
---
Labels: kip-848-client-support  (was: )

> Stale member should trigger onPartitionsLost when leaving group
> ---
>
> Key: KAFKA-16258
> URL: https://issues.apache.org/jira/browse/KAFKA-16258
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> When the poll timer expires, the new consumer proactively leaves the group 
> and clears its assignments, but it should also invoke the onPartitionsLost 
> callback. The legacy coordinator does the following sequence on poll timer 
> expiration: send leave group request 
> ([here|https://github.com/apache/kafka/blob/e8c70fce26626ed2ab90f2728a45f6e55e907ec1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1517]),
>  invoke onPartitionsLost, and when it completes it clears the assignment 
> (onJoinPrepare 
> [here|https://github.com/apache/kafka/blob/e8c70fce26626ed2ab90f2728a45f6e55e907ec1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L779]).
> This issue is most probably what is causing the failures in the integration 
> tests that fail expecting callbacks when the poll interval expires (like 
> https://issues.apache.org/jira/browse/KAFKA-16008)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-16008) Fix PlaintextConsumerTest.testMaxPollIntervalMs

2024-02-14 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17817500#comment-17817500
 ] 

Lianet Magrans edited comment on KAFKA-16008 at 2/14/24 7:25 PM:
-

[~kirktrue] I just added the links to the issue that most probably is 
causing/related to this failure


was (Author: JIRAUSER300183):
[~kirktrue] I just added the links to the issue that most probably is causing 
this failure

> Fix PlaintextConsumerTest.testMaxPollIntervalMs
> ---
>
> Key: KAFKA-16008
> URL: https://issues.apache.org/jira/browse/KAFKA-16008
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Critical
>  Labels: consumer-threading-refactor, integration-tests, timeout
> Fix For: 3.8.0
>
>
> The integration test {{PlaintextConsumerTest.testMaxPollIntervalMs}} is 
> failing when using the {{AsyncKafkaConsumer}}.
> The error is:
> {code}
> org.opentest4j.AssertionFailedError: Timed out before expected rebalance 
> completed
>     at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38)
> at org.junit.jupiter.api.Assertions.fail(Assertions.java:134)
> at 
> kafka.api.AbstractConsumerTest.awaitRebalance(AbstractConsumerTest.scala:317)
> at 
> kafka.api.PlaintextConsumerTest.testMaxPollIntervalMs(PlaintextConsumerTest.scala:194)
> {code}
> The logs include this line:
>  
> {code}
> [2023-12-13 15:11:16,134] WARN [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] consumer poll timeout has expired. This means the time 
> between subsequent calls to poll() was longer than the configured 
> max.poll.interval.ms, which typically implies that the poll loop is spending 
> too much time processing messages. You can address this either by increasing 
> max.poll.interval.ms or by reducing the maximum size of batches returned in 
> poll() with max.poll.records. 
> (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188)
> {code} 
> I don't know if that's related or not.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16008) Fix PlaintextConsumerTest.testMaxPollIntervalMs

2024-02-14 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17817500#comment-17817500
 ] 

Lianet Magrans commented on KAFKA-16008:


[~kirktrue] I just added the links to the issue that most probably is causing 
this failure

> Fix PlaintextConsumerTest.testMaxPollIntervalMs
> ---
>
> Key: KAFKA-16008
> URL: https://issues.apache.org/jira/browse/KAFKA-16008
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Critical
>  Labels: consumer-threading-refactor, integration-tests, timeout
> Fix For: 3.8.0
>
>
> The integration test {{PlaintextConsumerTest.testMaxPollIntervalMs}} is 
> failing when using the {{AsyncKafkaConsumer}}.
> The error is:
> {code}
> org.opentest4j.AssertionFailedError: Timed out before expected rebalance 
> completed
>     at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38)
> at org.junit.jupiter.api.Assertions.fail(Assertions.java:134)
> at 
> kafka.api.AbstractConsumerTest.awaitRebalance(AbstractConsumerTest.scala:317)
> at 
> kafka.api.PlaintextConsumerTest.testMaxPollIntervalMs(PlaintextConsumerTest.scala:194)
> {code}
> The logs include this line:
>  
> {code}
> [2023-12-13 15:11:16,134] WARN [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] consumer poll timeout has expired. This means the time 
> between subsequent calls to poll() was longer than the configured 
> max.poll.interval.ms, which typically implies that the poll loop is spending 
> too much time processing messages. You can address this either by increasing 
> max.poll.interval.ms or by reducing the maximum size of batches returned in 
> poll() with max.poll.records. 
> (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188)
> {code} 
> I don't know if that's related or not.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16258) Stale member should trigger onPartitionsLost when leaving group

2024-02-14 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16258:
---
Description: 
When the poll timer expires, the new consumer proactively leaves the group and 
clears its assignments, but it should also invoke the onPartitionsLost 
callback. The legacy coordinator does the following sequence on poll timer 
expiration: send leave group request 
([here|https://github.com/apache/kafka/blob/e8c70fce26626ed2ab90f2728a45f6e55e907ec1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1517]),
 invoke onPartitionsLost, and when it completes it clears the assignment 
(onJoinPrepare 
[here|https://github.com/apache/kafka/blob/e8c70fce26626ed2ab90f2728a45f6e55e907ec1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L779]).

This issue is most probably what is causing the failures in the integration 
tests that fail expecting callbacks when the poll interval expires (like 
https://issues.apache.org/jira/browse/KAFKA-16008)

  was:When the poll timer expires, the new consumer proactively leaves the 
group and clears its assignments, but it should also invoke the 
onPartitionsLost callback. The legacy coordinator does the following sequence 
on poll timer expiration: send leave group request 
([here|https://github.com/apache/kafka/blob/e8c70fce26626ed2ab90f2728a45f6e55e907ec1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1517]),
 invoke onPartitionsLost, and when it completes it clears the assignment 
(onJoinPrepare 
[here|https://github.com/apache/kafka/blob/e8c70fce26626ed2ab90f2728a45f6e55e907ec1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L779]).


> Stale member should trigger onPartitionsLost when leaving group
> ---
>
> Key: KAFKA-16258
> URL: https://issues.apache.org/jira/browse/KAFKA-16258
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Priority: Major
> Fix For: 3.8.0
>
>
> When the poll timer expires, the new consumer proactively leaves the group 
> and clears its assignments, but it should also invoke the onPartitionsLost 
> callback. The legacy coordinator does the following sequence on poll timer 
> expiration: send leave group request 
> ([here|https://github.com/apache/kafka/blob/e8c70fce26626ed2ab90f2728a45f6e55e907ec1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1517]),
>  invoke onPartitionsLost, and when it completes it clears the assignment 
> (onJoinPrepare 
> [here|https://github.com/apache/kafka/blob/e8c70fce26626ed2ab90f2728a45f6e55e907ec1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L779]).
> This issue is most probably what is causing the failures in the integration 
> tests that fail expecting callbacks when the poll interval expires (like 
> https://issues.apache.org/jira/browse/KAFKA-16008)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16258) Stale member should trigger onPartitionsLost when leaving group

2024-02-14 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-16258:
--

 Summary: Stale member should trigger onPartitionsLost when leaving 
group
 Key: KAFKA-16258
 URL: https://issues.apache.org/jira/browse/KAFKA-16258
 Project: Kafka
  Issue Type: Sub-task
  Components: clients, consumer
Affects Versions: 3.7.0
Reporter: Lianet Magrans
 Fix For: 3.8.0


When the poll timer expires, the new consumer proactively leaves the group and 
clears its assignments, but it should also invoke the onPartitionsLost 
callback. The legacy coordinator does the following sequence on poll timer 
expiration: send leave group request 
([here|https://github.com/apache/kafka/blob/e8c70fce26626ed2ab90f2728a45f6e55e907ec1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1517]),
 invoke onPartitionsLost, and when it completes it clears the assignment 
(onJoinPrepare 
[here|https://github.com/apache/kafka/blob/e8c70fce26626ed2ab90f2728a45f6e55e907ec1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L779]).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16206 Fix unnecessary topic config deletion during ZK migration [kafka]

2024-02-14 Thread via GitHub


ahuang98 commented on PR #14206:
URL: https://github.com/apache/kafka/pull/14206#issuecomment-193746

   @mimaison @mumrah I moved unrelated test changes over to 
https://github.com/apache/kafka/pull/15373. The latest commit 
[1c1eb7d](https://github.com/apache/kafka/pull/15373/commits/1c1eb7d1a866c56188a97f6fa41940933e456c03)
 addresses the comments left here (mostly imports)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [DRAFT] [KAFKA-16069] Source Tasks re-transform records after Retriable exceptions [kafka]

2024-02-14 Thread via GitHub


wasniksudesh opened a new pull request, #15374:
URL: https://github.com/apache/kafka/pull/15374

   If producer.send() throws a retriableException, exactly one single record 
would've been transformed (but not delivered). 
   Simply storing SourceRecord and ProducerRecord (obtained after 
transformations and conversion) of this iteration should be enough. 
   
   In next "sendRecords()", this stored information is used to potentially skip 
re-transformation and re-conversion of this record. 
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] Image test improvements [kafka]

2024-02-14 Thread via GitHub


ahuang98 opened a new pull request, #15373:
URL: https://github.com/apache/kafka/pull/15373

   More commit history and comments from 
https://github.com/apache/kafka/pull/14206 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-13505) Kafka Connect should respect Avro 1.10.X enum defaults spec

2024-02-14 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17817494#comment-17817494
 ] 

Greg Harris edited comment on KAFKA-13505 at 2/14/24 7:01 PM:
--

Hi [~twbecker] Thanks for bringing this issue to our attention. There's a 
couple of reasons this hasn't been addressed:
 * The ticket has been inactive, and other more active tickets are getting 
attention from contributors.
 * There appears to be a (third-party) workaround as described above by 
[~jcustenborder], and that repository appears to still be maintained and 
conduct releases.
 * The SchemaProjector is a utility class which is included in the connect API, 
but is not used by the framework anywhere, so only some users of some plugins 
experience this fault.
 * Enums are not a first-class object in the Connect Schema, and are instead 
implemented by plugins using so-called "Logical Types", which are a first-class 
type (e.g. STRING), combined with metadata that the plugins understand.

The last one is the most relevant: Connect is doing an "enum-unaware" 
comparison in the SchemaProjector, because to Connect, the enum is just a 
strange looking string. Rather than do something incorrect, the SchemaProjector 
declares that it does not know how to handle the type, and errors out.
What is needed is an "enum-aware" schema projector, which can only be 
implemented by a project that knows what the "enum" is. For Apache Kafka to 
solve this problem as-described, it would involve adding a special case for 
this one type, which is not a good solution.

An alternative would be to add a first-class ENUM type, but that would present 
a migration challenge for the existing enum infrastructure. Another alternative 
is to make SchemaProjector extensible to projecting logical types. This would 
allow the implementors of the ENUM type (or any higher type) to inform the 
SchemaProjector class of how it should perform projection. I've opened a ticket 
here: KAFKA-16257


was (Author: gharris1727):
Hi [~twbecker] Thanks for bringing this issue to our attention. There's a 
couple of reasons this hasn't been addressed:

* The ticket has been inactive, and other more active tickets are getting 
attention from contributors.
* There appears to be a (third-party) workaround as described above by 
[~jcustenborder], and that repository appears to still be maintained and 
conduct releases.
* The SchemaProjector is a utility class which is included in the connect API, 
but is not used by the framework anywhere, so only some users of some plugins 
experience this fault.
* Enums are not a first-class object in the Connect Schema, and are instead 
implemented by plugins using so-called "Logical Types", which are a first-class 
type (e.g. STRING), combined with metadata that the plugins understand.

The last one is the most relevant: Connect is doing an "enum-unaware" 
comparison in the SchemaProjector, because to Connect, the enum is just a 
strange looking string. Rather than do something incorrect, the SchemaProjector 
declares that it does not know how to handle the type, and errors out.
What is needed is an "enum-aware" schema projector, which can only be 
implemented by a project that knows what the "enum" is. For Apache Kafka to 
solve this problem as-described, it would involve adding a special case for 
this one type, which is not a good solution.

An alternative would be to add a first-class ENUM type, but that would present 
a migration challenge for the existing enum infrastructure. Another alternative 
is to deprecate and remove this implementation of SchemaProjector, and replace 
it with a SchemaProjector which is extensible to projecting logical types. This 
would allow the implementors of the ENUM type (or any higher type) to inform 
the SchemaProjector class of how it should perform projection. I've opened a 
ticket here: KAFKA-16257

> Kafka Connect should respect Avro 1.10.X enum defaults spec
> ---
>
> Key: KAFKA-13505
> URL: https://issues.apache.org/jira/browse/KAFKA-13505
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: Guus De Graeve
>Priority: Major
>
> We are using Kafka Connect to pipe data from Kafka topics into parquet files 
> on S3. Our Kafka data is serialised using Avro (schema registry). We use the 
> Amazon S3 Sink Connector for this.
> Up until recently we would set "schema.compatibility" to "NONE" in our 
> connectors, but this had the pain-full side-effect that during deploys of our 
> application we got huge file explosions (lots of very small files in HDFS / 
> S3). This happens because kafka connect will create a new file every time the 
> schema id of a log changes compared to the previous log. During deploys of 
> our applications (which can take up 

[jira] [Commented] (KAFKA-13505) Kafka Connect should respect Avro 1.10.X enum defaults spec

2024-02-14 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17817494#comment-17817494
 ] 

Greg Harris commented on KAFKA-13505:
-

Hi [~twbecker] Thanks for bringing this issue to our attention. There's a 
couple of reasons this hasn't been addressed:

* The ticket has been inactive, and other more active tickets are getting 
attention from contributors.
* There appears to be a (third-party) workaround as described above by 
[~jcustenborder], and that repository appears to still be maintained and 
conduct releases.
* The SchemaProjector is a utility class which is included in the connect API, 
but is not used by the framework anywhere, so only some users of some plugins 
experience this fault.
* Enums are not a first-class object in the Connect Schema, and are instead 
implemented by plugins using so-called "Logical Types", which are a first-class 
type (e.g. STRING), combined with metadata that the plugins understand.

The last one is the most relevant: Connect is doing an "enum-unaware" 
comparison in the SchemaProjector, because to Connect, the enum is just a 
strange looking string. Rather than do something incorrect, the SchemaProjector 
declares that it does not know how to handle the type, and errors out.
What is needed is an "enum-aware" schema projector, which can only be 
implemented by a project that knows what the "enum" is. For Apache Kafka to 
solve this problem as-described, it would involve adding a special case for 
this one type, which is not a good solution.

An alternative would be to add a first-class ENUM type, but that would present 
a migration challenge for the existing enum infrastructure. Another alternative 
is to deprecate and remove this implementation of SchemaProjector, and replace 
it with a SchemaProjector which is extensible to projecting logical types. This 
would allow the implementors of the ENUM type (or any higher type) to inform 
the SchemaProjector class of how it should perform projection. I've opened a 
ticket here: KAFKA-16257

> Kafka Connect should respect Avro 1.10.X enum defaults spec
> ---
>
> Key: KAFKA-13505
> URL: https://issues.apache.org/jira/browse/KAFKA-13505
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: Guus De Graeve
>Priority: Major
>
> We are using Kafka Connect to pipe data from Kafka topics into parquet files 
> on S3. Our Kafka data is serialised using Avro (schema registry). We use the 
> Amazon S3 Sink Connector for this.
> Up until recently we would set "schema.compatibility" to "NONE" in our 
> connectors, but this had the pain-full side-effect that during deploys of our 
> application we got huge file explosions (lots of very small files in HDFS / 
> S3). This happens because kafka connect will create a new file every time the 
> schema id of a log changes compared to the previous log. During deploys of 
> our applications (which can take up to 20 minutes) multiple logs of mixed 
> schema ids are inevitable and given the huge amounts of logs file explosions 
> of up to a million files weren't uncommon.
> To solve this problem we switched all our connectors "schema.compatibility" 
> to "BACKWARD", which should only create a new file when a higher schema id is 
> detected and deserialise all logs with the latest known schema id. Which 
> should only create one new file during deploys.
> An example connector config:
> {code:java}
> {
> "name": "hdfs-Project_Test_Subject",
> "config": {
> "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
> "partition.duration.ms": "8640",
> "topics.dir": "/user/kafka/Project",
> "hadoop.conf.dir": "/opt/hadoop/conf",
> "flush.size": "100",
> "schema.compatibility": "BACKWARD",
> "topics": "Project_Test_Subject",
> "timezone": "UTC",
> "hdfs.url": "hdfs://hadoophost:9000",
> "value.converter.value.subject.name.strategy": 
> "io.confluent.kafka.serializers.subject.TopicNameStrategy",
> "rotate.interval.ms": "720",
> "locale": "C",
> "hadoop.home": "/opt/hadoop",
> "logs.dir": "/user/kafka/_logs",
> "format.class": "io.confluent.connect.hdfs.parquet.ParquetFormat",
> "partitioner.class": 
> "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
> "name": "hdfs-Project_Test_Subject",
> "errors.tolerance": "all",
> "storage.class": "io.confluent.connect.hdfs.storage.HdfsStorage",
> "path.format": "/MM/dd"
> }
> }{code}
> However, we have lots of enum fields in our data records (avro schemas) to 
> which subjects get added frequently, and this is causing issues with our 
> Kafka Connect connectors FAILING with these kinds of errors:
> 

[jira] [Created] (KAFKA-16257) SchemaProjector should be extensible to logical types

2024-02-14 Thread Greg Harris (Jira)
Greg Harris created KAFKA-16257:
---

 Summary: SchemaProjector should be extensible to logical types
 Key: KAFKA-16257
 URL: https://issues.apache.org/jira/browse/KAFKA-16257
 Project: Kafka
  Issue Type: New Feature
  Components: connect
Reporter: Greg Harris


The SchemaProjector currently only supports projecting primitive Number types, 
and cannot handle common logical types as have proliferated in the Connect 
ecosystem.

The SchemaProjector or a replacement should have the ability to extend it's 
functionality to support these logical types.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16206 Fix unnecessary topic config deletion during ZK migration [kafka]

2024-02-14 Thread via GitHub


ahuang98 commented on PR #14206:
URL: https://github.com/apache/kafka/pull/14206#issuecomment-1944388740

   @mimaison the PR was originally meant to introduce additional test changes, 
I believe @mumrah renamed it after I added the migration fix. I'll move the 
tests out and apply your suggestions, thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] 16069 - prevent re-transform of source-records after retriable exceptions [kafka]

2024-02-14 Thread via GitHub


wasniksudesh closed pull request #15140: 16069 - prevent re-transform of 
source-records after retriable exceptions
URL: https://github.com/apache/kafka/pull/15140


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: update leaderAndEpoch before initializing metadata publishers [kafka]

2024-02-14 Thread via GitHub


mumrah commented on code in PR #15366:
URL: https://github.com/apache/kafka/pull/15366#discussion_r1489893040


##
metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java:
##
@@ -197,13 +209,15 @@ private MetadataLoader(
 String threadNamePrefix,
 FaultHandler faultHandler,
 MetadataLoaderMetrics metrics,
-Supplier highWaterMarkAccessor
+Supplier highWaterMarkAccessor,
+Supplier leaderAndEpochAccessor
 ) {
 this.log = logContext.logger(MetadataLoader.class);
 this.time = time;
 this.faultHandler = faultHandler;
 this.metrics = metrics;
 this.highWaterMarkAccessor = highWaterMarkAccessor;
+this.leaderAndEpochAccessor = leaderAndEpochAccessor;

Review Comment:
   Should we call this "initialLeaderAndEpochAccessor" since we only read from 
it during startup? Otherwise it might be confusing that we have this supplier, 
but update the `currentLeaderAndEpoch` in a different way



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]

2024-02-14 Thread via GitHub


CalvinConfluent commented on code in PR #14706:
URL: https://github.com/apache/kafka/pull/14706#discussion_r1489894351


##
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##
@@ -336,10 +342,14 @@ public ControllerResult 
registerBroker(
 ", but got cluster ID " + request.clusterId());
 }
 int brokerId = request.brokerId();
+List records = new ArrayList<>();
 BrokerRegistration existing = brokerRegistrations.get(brokerId);
 if (version < 2 || existing == null || request.previousBrokerEpoch() 
!= existing.epoch()) {
-// TODO(KIP-966): Update the ELR if the broker has an unclean 
shutdown.
-log.debug("Received an unclean shutdown request");
+if (handleBrokerUncleanShutdownHelper == null) {
+log.warn("No handleBrokerUncleanShutdownHelper provided");

Review Comment:
   Thanks for the advice! Updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]

2024-02-14 Thread via GitHub


CalvinConfluent commented on code in PR #14706:
URL: https://github.com/apache/kafka/pull/14706#discussion_r1489894070


##
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##
@@ -780,4 +789,9 @@ public Entry> next() {
 }
 };
 }
+
+@FunctionalInterface
+interface HandleBrokerUncleanShutdownHelper {

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [Draft] KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]

2024-02-14 Thread via GitHub


RamanVerma commented on PR #14731:
URL: https://github.com/apache/kafka/pull/14731#issuecomment-1944354242

   > Hey @RamanVerma what is left here?
   
   We decided to bump the API version in the KIP. That and some testing.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [Draft] KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]

2024-02-14 Thread via GitHub


jolshan commented on PR #14731:
URL: https://github.com/apache/kafka/pull/14731#issuecomment-1944349380

   Hey @RamanVerma what is left here?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]

2024-02-14 Thread via GitHub


msn-tldr commented on code in PR #15323:
URL: https://github.com/apache/kafka/pull/15323#discussion_r1489860692


##
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##
@@ -279,7 +286,7 @@ synchronized Optional 
partitionMetadataIfCur
  * @return a mapping from topic names to topic IDs for all topics with 
valid IDs in the cache
  */
 public synchronized Map topicIds() {

Review Comment:
   done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-16256) Update ConsumerConfig to validate use of group.remote.assignor and partition.assignment.strategy based on group.protocol

2024-02-14 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16256?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-16256:
-

Assignee: Kirk True

> Update ConsumerConfig to validate use of group.remote.assignor and 
> partition.assignment.strategy based on group.protocol
> 
>
> Key: KAFKA-16256
> URL: https://issues.apache.org/jira/browse/KAFKA-16256
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-client-support
> Fix For: 3.8.0
>
>
> {{ConsumerConfig}} supports both the {{group.remote.assignor}} and 
> {{partition.assignment.strategy}} configuration options. These, however, 
> should not be used together; the former is applicable only when the 
> {{group.protocol}} is set to {{consumer}} and the latter when the 
> {{group.protocol}} is set to {{{}classic{}}}. We should emit a warning if the 
> user specifies the incorrect configuration based on the value of 
> {{{}group.protocol{}}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16256) Update ConsumerConfig to validate use of group.remote.assignor and partition.assignment.strategy based on group.protocol

2024-02-14 Thread Kirk True (Jira)
Kirk True created KAFKA-16256:
-

 Summary: Update ConsumerConfig to validate use of 
group.remote.assignor and partition.assignment.strategy based on group.protocol
 Key: KAFKA-16256
 URL: https://issues.apache.org/jira/browse/KAFKA-16256
 Project: Kafka
  Issue Type: Improvement
  Components: clients, consumer
Affects Versions: 3.7.0
Reporter: Kirk True
 Fix For: 3.8.0


{{ConsumerConfig}} supports both the {{group.remote.assignor}} and 
{{partition.assignment.strategy}} configuration options. These, however, should 
not be used together; the former is applicable only when the {{group.protocol}} 
is set to {{consumer}} and the latter when the {{group.protocol}} is set to 
{{{}classic{}}}. We should emit a warning if the user specifies the incorrect 
configuration based on the value of {{{}group.protocol{}}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]

2024-02-14 Thread via GitHub


msn-tldr commented on code in PR #15323:
URL: https://github.com/apache/kafka/pull/15323#discussion_r1489832782


##
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##
@@ -279,7 +286,7 @@ synchronized Optional 
partitionMetadataIfCur
  * @return a mapping from topic names to topic IDs for all topics with 
valid IDs in the cache
  */
 public synchronized Map topicIds() {

Review Comment:
   That's a good call out, i will check other methods reading from this 
snapshot.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16255) AsyncKafkaConsumer should not use partition.assignment.strategy

2024-02-14 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16255?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16255:
--
Description: The {{partition.assignment.strategy}} configuration is used to 
specify a list of zero or more {{ConsumerPartitionAssignor}} instances. 
However, that interface is not applicable for the KIP-848-based protocol on top 
of which {{AsyncKafkaConsumer}} is built. Therefore, the use of 
{{ConsumerPartitionAssignor}} is inappropriate and should be removed from 
{{{}AsyncKafkaConsumer{}}}.  (was: The partition.assignment.strategy 
configuration is used to specify a list of zero or more 
ConsumerPartitionAssignor instances. However, that interface is not applicable 
for the KIP-848-based protocol on top of which AsyncKafkaConsumer is built. 
Therefore, the use of ConsumerPartitionAssignor is in appropriate and should be 
removed.)

> AsyncKafkaConsumer should not use partition.assignment.strategy
> ---
>
> Key: KAFKA-16255
> URL: https://issues.apache.org/jira/browse/KAFKA-16255
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-client-support
> Fix For: 3.8.0
>
>
> The {{partition.assignment.strategy}} configuration is used to specify a list 
> of zero or more {{ConsumerPartitionAssignor}} instances. However, that 
> interface is not applicable for the KIP-848-based protocol on top of which 
> {{AsyncKafkaConsumer}} is built. Therefore, the use of 
> {{ConsumerPartitionAssignor}} is inappropriate and should be removed from 
> {{{}AsyncKafkaConsumer{}}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16255) AsyncKafkaConsumer should not use partition.assignment.strategy

2024-02-14 Thread Kirk True (Jira)
Kirk True created KAFKA-16255:
-

 Summary: AsyncKafkaConsumer should not use 
partition.assignment.strategy
 Key: KAFKA-16255
 URL: https://issues.apache.org/jira/browse/KAFKA-16255
 Project: Kafka
  Issue Type: Bug
  Components: clients, consumer
Affects Versions: 3.7.0
Reporter: Kirk True
Assignee: Kirk True
 Fix For: 3.8.0


The partition.assignment.strategy configuration is used to specify a list of 
zero or more 
ConsumerPartitionAssignor instances. However, that interface is not applicable 
for the KIP-848-based protocol on top of which AsyncKafkaConsumer is built. 
Therefore, the use of ConsumerPartitionAssignor is in appropriate and should be 
removed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15665: Enforce partition reassignment should complete when all target replicas are in ISR [kafka]

2024-02-14 Thread via GitHub


CalvinConfluent commented on PR #15359:
URL: https://github.com/apache/kafka/pull/15359#issuecomment-1944225912

   testIOExceptionDuringCheckpoint failure is irrelevant to the PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16254) Allow MM2 to fully disable offset sync feature

2024-02-14 Thread Omnia Ibrahim (Jira)
Omnia Ibrahim created KAFKA-16254:
-

 Summary: Allow MM2 to fully disable offset sync feature
 Key: KAFKA-16254
 URL: https://issues.apache.org/jira/browse/KAFKA-16254
 Project: Kafka
  Issue Type: Bug
  Components: mirrormaker
Affects Versions: 3.6.0, 3.5.0, 3.7.0
Reporter: Omnia Ibrahim
Assignee: Omnia Ibrahim


*Background:* 
At the moment syncing offsets feature in MM2 is broken to 2 parts
 # One is in `MirrorSourceTask` where we store the new recored's offset on 
target cluster to {{offset_syncs}} internal topic after mirroring the record. 
Before KAFKA-14610 in 3.5 MM2 used to just queue the offsets and publish them 
later but since 3.5 this behaviour changed we now publish any offset syncs that 
we've queued up, but have not yet been able to publish when 
`MirrorSourceTask.commit` get invoked. This introduced an over head to commit 
process.
 # The second part is in checkpoints source task where we use the new record 
offsets from {{offset_syncs}} and update {{checkpoints}} and 
{{__consumer_offsets}} topics.

*Problem:*
For customers who only use MM2 for mirroring data and not interested in syncing 
offsets feature they now can disable the second part of this feature which is 
by disabling {{emit.checkpoints.enabled}} and/or {{sync.group.offsets.enabled}} 
to disable emitting {{__consumer_offsets}} topic but nothing disabling 1st part 
of the feature. 

The problem get worse if they disabled MM2 from creating offset syncs internal 
topic as 
1. this will increase throughput as MM2 will try to force trying to update the 
offset with every mirrored batch which impacting the performance of our MM2.
2. Get too many error logs because they don't create the sync offset topic as 
they don't use the feature.

*Possible solution:*
Allow customers to fully disable the feature if they don't really need it 
similar to how we fully can disable other MM2 features like heartbeat feature 
by adding a new config.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change [kafka]

2024-02-14 Thread via GitHub


mumrah commented on code in PR #15265:
URL: https://github.com/apache/kafka/pull/15265#discussion_r1489713255


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2129,63 +2167,183 @@ private Map> 
handleDescribeTopicsByNames(f
 }
 }
 final long now = time.milliseconds();
-Call call = new Call("describeTopics", calcDeadlineMs(now, 
options.timeoutMs()),
-new LeastLoadedNodeProvider()) {
 
-private boolean supportsDisablingTopicCreation = true;
+if (options.useDescribeTopicsApi()) {
+RecurringCall call = new RecurringCall("DescribeTopics-Recurring", 
calcDeadlineMs(now, options.timeoutMs()), runnable) {
+Map pendingTopics =
+topicNames.stream().map(topicName -> new 
TopicRequest().setName(topicName))
+.collect(Collectors.toMap(topicRequest -> 
topicRequest.name(), topicRequest -> topicRequest, (t1, t2) -> t1, TreeMap::new)
+);
 
-@Override
-MetadataRequest.Builder createRequest(int timeoutMs) {
-if (supportsDisablingTopicCreation)
-return new MetadataRequest.Builder(new 
MetadataRequestData()
-
.setTopics(convertToMetadataRequestTopic(topicNamesList))
-.setAllowAutoTopicCreation(false)
-
.setIncludeTopicAuthorizedOperations(options.includeAuthorizedOperations()));
-else
-return MetadataRequest.Builder.allTopics();
-}
+String partiallyFinishedTopicName = "";
+int partiallyFinishedTopicNextPartitionId = -1;
+TopicDescription partiallyFinishedTopicDescription = null;
 
-@Override
-void handleResponse(AbstractResponse abstractResponse) {
-MetadataResponse response = (MetadataResponse) 
abstractResponse;
-// Handle server responses for particular topics.
-Cluster cluster = response.buildCluster();
-Map errors = response.errors();
-for (Map.Entry> 
entry : topicFutures.entrySet()) {
-String topicName = entry.getKey();
-KafkaFutureImpl future = 
entry.getValue();
-Errors topicError = errors.get(topicName);
-if (topicError != null) {
-future.completeExceptionally(topicError.exception());
-continue;
+@Override
+Call generateCall() {
+return new Call("describeTopics", this.deadlineMs, new 
LeastLoadedNodeProvider()) {
+@Override
+DescribeTopicPartitionsRequest.Builder 
createRequest(int timeoutMs) {
+DescribeTopicPartitionsRequestData request = new 
DescribeTopicPartitionsRequestData()
+
.setTopics(pendingTopics.values().stream().collect(Collectors.toList()))
+
.setResponsePartitionLimit(options.partitionSizeLimitPerResponse());
+if (!partiallyFinishedTopicName.isEmpty()) {
+request.setCursor(new 
DescribeTopicPartitionsRequestData.Cursor()
+.setTopicName(partiallyFinishedTopicName)
+
.setPartitionIndex(partiallyFinishedTopicNextPartitionId)
+);
+}
+return new 
DescribeTopicPartitionsRequest.Builder(request);
+}
+
+@Override
+void handleResponse(AbstractResponse abstractResponse) 
{
+DescribeTopicPartitionsResponse response = 
(DescribeTopicPartitionsResponse) abstractResponse;
+String cursorTopicName = "";
+int cursorPartitionId = -1;
+if (response.data().nextCursor() != null) {
+DescribeTopicPartitionsResponseData.Cursor 
cursor = response.data().nextCursor();
+cursorTopicName = cursor.topicName();
+cursorPartitionId = cursor.partitionIndex();
+}
+
+for (DescribeTopicPartitionsResponseTopic topic : 
response.data().topics()) {
+String topicName = topic.name();
+Errors error = 
Errors.forCode(topic.errorCode());
+
+KafkaFutureImpl future = 
topicFutures.get(topicName);
+if (error != Errors.NONE) {
+
future.completeExceptionally(error.exception());
+ 

Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]

2024-02-14 Thread via GitHub


mumrah commented on code in PR #14706:
URL: https://github.com/apache/kafka/pull/14706#discussion_r1489685922


##
metadata/src/main/java/org/apache/kafka/controller/BrokersToElrs.java:
##
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.metadata.Replicas;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+
+import static org.apache.kafka.metadata.Replicas.NONE;
+
+public class BrokersToElrs {
+private final SnapshotRegistry snapshotRegistry;
+
+// It maps from the broker id to the topic id partitions if the partition 
has ELR.
+private final TimelineHashMap> 
elrMembers;
+
+BrokersToElrs(SnapshotRegistry snapshotRegistry) {
+this.snapshotRegistry = snapshotRegistry;
+this.elrMembers = new TimelineHashMap<>(snapshotRegistry, 0);
+}
+
+/**
+ * Update our records of a partition's ELR.
+ *
+ * @param topicId   The topic ID of the partition.
+ * @param partitionId   The partition ID of the partition.
+ * @param prevElr   The previous ELR, or null if the partition is new.
+ * @param nextElr   The new ELR, or null if the partition is being 
removed.
+ */
+
+void update(Uuid topicId, int partitionId, int[] prevElr, int[] nextElr) {
+int[] prev;
+if (prevElr == null) {
+prev = NONE;
+} else {
+prev = Replicas.clone(prevElr);
+Arrays.sort(prev);
+}
+int[] next;
+if (nextElr == null) {
+next = NONE;
+} else {
+next = Replicas.clone(nextElr);
+Arrays.sort(next);
+}
+
+int i = 0, j = 0;
+while (true) {
+if (i == prev.length) {
+if (j == next.length) {
+break;
+}
+int newReplica = next[j];
+add(newReplica, topicId, partitionId);
+j++;
+} else if (j == next.length) {
+int prevReplica = prev[i];
+remove(prevReplica, topicId, partitionId);
+i++;
+} else {
+int prevReplica = prev[i];
+int newReplica = next[j];
+if (prevReplica < newReplica) {
+remove(prevReplica, topicId, partitionId);
+i++;
+} else if (prevReplica > newReplica) {
+add(newReplica, topicId, partitionId);
+j++;
+} else {
+i++;
+j++;
+}
+}
+}
+}
+
+void removeTopicEntryForBroker(Uuid topicId, int brokerId) {
+Map topicMap = elrMembers.get(brokerId);
+if (topicMap != null) {
+topicMap.remove(topicId);
+}
+}
+
+private void add(int brokerId, Uuid topicId, int newPartition) {
+TimelineHashMap topicMap = elrMembers.get(brokerId);
+if (topicMap == null) {
+topicMap = new TimelineHashMap<>(snapshotRegistry, 0);
+elrMembers.put(brokerId, topicMap);
+}
+int[] partitions = topicMap.get(topicId);
+int[] newPartitions;
+if (partitions == null) {
+newPartitions = new int[1];
+} else {
+newPartitions = new int[partitions.length + 1];
+System.arraycopy(partitions, 0, newPartitions, 0, 
partitions.length);
+}
+newPartitions[newPartitions.length - 1] = newPartition;
+topicMap.put(topicId, newPartitions);
+}
+
+private void remove(int brokerId, Uuid topicId, int removedPartition) {
+TimelineHashMap topicMap = elrMembers.get(brokerId);
+if (topicMap == null) {
+throw new RuntimeException("Broker " + brokerId + " has no 
elrMembers " +
+"entry, so we can't remove " + topicId + ":" + 
removedPartition);
+}
+int[] partitions = topicMap.get(topicId);
+ 

[jira] [Commented] (KAFKA-16212) Cache partitions by TopicIdPartition instead of TopicPartition

2024-02-14 Thread Omnia Ibrahim (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17817425#comment-17817425
 ] 

Omnia Ibrahim commented on KAFKA-16212:
---

Actually there is another proposal 4# which is to wait till we drop ZK before 
handling this Jira this will get us to simply replace TopicPartition by 
TopicIdPrtition as no were in the code we will have situation where topic id 
might be none. 

> Cache partitions by TopicIdPartition instead of TopicPartition
> --
>
> Key: KAFKA-16212
> URL: https://issues.apache.org/jira/browse/KAFKA-16212
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 3.7.0
>Reporter: Gaurav Narula
>Assignee: Omnia Ibrahim
>Priority: Major
>
> From the discussion in [PR 
> 15263|https://github.com/apache/kafka/pull/15263#discussion_r1471075201], it 
> would be better to cache {{allPartitions}} by {{TopicIdPartition}} instead of 
> {{TopicPartition}} to avoid ambiguity.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16206 Fix unnecessary topic config deletion during ZK migration [kafka]

2024-02-14 Thread via GitHub


mimaison commented on code in PR #14206:
URL: https://github.com/apache/kafka/pull/14206#discussion_r1489636848


##
core/src/main/scala/kafka/zk/migration/ZkConfigMigrationClient.scala:
##
@@ -226,8 +226,8 @@ class ZkConfigMigrationClient(
   val (migrationZkVersion, responses) = 
zkClient.retryMigrationRequestsUntilConnected(requests, state)
 
   if (responses.head.resultCode.equals(Code.NONODE)) {
-// Not fatal.
-error(s"Did not delete $configResource since the node did not exist.")
+// Not fatal. This is expected in the case this is a topic config and 
we delete the topic (KAFKA-16206)

Review Comment:
   We usually don't include mention to Jiras. The explanation seems sufficient. 



##
metadata/src/test/java/org/apache/kafka/image/ClientQuotasImageTest.java:
##
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.image;
 
+import java.util.Collections;

Review Comment:
   Can we move this with the other `java.util` imports?



##
metadata/src/test/java/org/apache/kafka/image/ClusterImageTest.java:
##
@@ -64,6 +71,12 @@ public class ClusterImageTest {
 
 final static ClusterImage IMAGE2;
 
+static final List DELTA2_RECORDS;
+
+static final ClusterDelta DELTA2;
+
+final static ClusterImage IMAGE3;

Review Comment:
   nit: `static final` is the preferred syntax



##
metadata/src/test/java/org/apache/kafka/image/ClusterImageTest.java:
##
@@ -17,6 +17,11 @@
 
 package org.apache.kafka.image;
 
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;

Review Comment:
   Can we move these imports with the other `org.apache.kafka.common.metadata` 
imports below?



##
metadata/src/test/java/org/apache/kafka/image/ConfigurationsImageTest.java:
##
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.image;
 
+import java.util.Collections;

Review Comment:
   Let's move this below



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14589: [2/3] Tests of ConsoleGroupCommand rewritten in java [kafka]

2024-02-14 Thread via GitHub


nizhikov commented on PR #15363:
URL: https://github.com/apache/kafka/pull/15363#issuecomment-1944040321

   Hello @showuon 
   
   Can you, please, take a look?
   I've checked CI results and it seems OK for me.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14589: [3/3] Tests of ConsoleGroupCommand rewritten in java [kafka]

2024-02-14 Thread via GitHub


nizhikov commented on PR #15365:
URL: https://github.com/apache/kafka/pull/15365#issuecomment-1944038174

   Hello @showuon 
   
   Can you, please, take a look?
   I've checked CI results and it seems OK for me.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16157: fix topic recreation handling with offline disks [kafka]

2024-02-14 Thread via GitHub


ijuma commented on code in PR #15263:
URL: https://github.com/apache/kafka/pull/15263#discussion_r1489643087


##
core/src/main/scala/kafka/cluster/Partition.scala:
##
@@ -874,7 +874,13 @@ class Partition(val topicPartition: TopicPartition,
   private def createLogInAssignedDirectoryId(partitionState: 
LeaderAndIsrPartitionState, highWatermarkCheckpoints: OffsetCheckpoints, 
topicId: Option[Uuid], targetLogDirectoryId: Option[Uuid]): Unit = {
 targetLogDirectoryId match {
   case Some(directoryId) =>
-createLogIfNotExists(directoryId == DirectoryId.UNASSIGNED, 
isFutureReplica = false, highWatermarkCheckpoints, topicId, 
targetLogDirectoryId)
+if (logManager.onlineLogDirId(directoryId) || 
!logManager.hasOfflineLogDirs() || directoryId == DirectoryId.UNASSIGNED) {
+  createLogIfNotExists(partitionState.isNew, isFutureReplica = false, 
highWatermarkCheckpoints, topicId, targetLogDirectoryId)
+} else {
+  warn(s"Skipping creation of log because there are potentially 
offline log " +

Review Comment:
   What is the operator supposed to do when they see this warning? Generally, 
we should be very careful about warning logs - most times they are an 
anti-pattern as they are scary but not actionable.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16155: Re-enable testAutoCommitIntercept [kafka]

2024-02-14 Thread via GitHub


lucasbru merged PR #15334:
URL: https://github.com/apache/kafka/pull/15334


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16212) Cache partitions by TopicIdPartition instead of TopicPartition

2024-02-14 Thread Omnia Ibrahim (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17817412#comment-17817412
 ] 

Omnia Ibrahim commented on KAFKA-16212:
---

The moment the cache in `ReplicaManager.allPartitions` is represented as 
`Pool[TopicPartition, HostedPartition]` which is a wrapper around 
`ConcurrentHashMap[opicPartition, HostedPartition]` update this to use 
`TopicIdPartition` as a key turn out to be tricky as 
 # not all APIs that interact with `ReplicaManager` in order to fetch/update 
partition cache are aware of topicId like consumer coordinator, handling some 
requests in KafkaApi where the request schema doesn't have topicId, etc .
 # TopicId is represented as Optional in many places which means we might endup 
populate it with null or dummy uuid multiple times to construct 
TopicIdPartition. 



I have 3 proposals at the moment:
 * *Proposal #1 :* Update TopicIdPartitions to have constructor with topicId as 
optional. And change `ReplicaManager.allPartitions` to be 
`LinkedBlockingQueue[TopicIdPartition, HostedPartition]`. _*This might be the 
simplest one as far as I can see.*_ 
 ** any API that is not topic id aware will just get the last entry that match 
topicIdPartition.topicPartition.
 ** The code will need to make sure that we don't have duplicates by 
`TopicIdPartition` in the `LinkedBlockingQueue`.
 ** We will need to revert having topic Id as optional in TopicIdPartitions 
once everywhere in Kafka is topic-id aware.


 * *Proposal #2 :* change `ReplicaManager.allPartitions` to `new 
Pool[TopicPartition, LinkedBlockingQueue[(Option[Uuid], HostedPartition)]]` 
where `Option[Uuid]` represent topic id. This make the cache scheme bit 
complex. The proposal will 

 ** consider the last entry in `LinkedBlockingQueue` is the current value.
 ** The code will make sure that `LinkedBlockingQueue` has only entry for the 
same topic id 
 ** Topic Id aware APIs that need to fetch/update the partition will be updated 
to use `TopicPartition` and topic Id
 ** Topic Id non-aware APIs will remain using topic partitions and the 
replicaManager will assume that these APIs referring to the last entry in 
`LinkedBlockingQueue`


 * *Proposal#3:* The other option is to keep two separate caches one 
`Pool[TopicIdPartition, HostedPartition]` for partitions and another one 
`Pool[TopicPartition, Uuid]` for the last assigned topic id for each partition 
in order to form `TopicIdPartition`. This is the least favourite as having 2 
caches will risk that one of them can go out of data at any time.


[~jolshan] Do you have any strong preferences? I am leaning toward 1st as it is 
less messy than the others. WDYT?

> Cache partitions by TopicIdPartition instead of TopicPartition
> --
>
> Key: KAFKA-16212
> URL: https://issues.apache.org/jira/browse/KAFKA-16212
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 3.7.0
>Reporter: Gaurav Narula
>Assignee: Omnia Ibrahim
>Priority: Major
>
> From the discussion in [PR 
> 15263|https://github.com/apache/kafka/pull/15263#discussion_r1471075201], it 
> would be better to cache {{allPartitions}} by {{TopicIdPartition}} instead of 
> {{TopicPartition}} to avoid ambiguity.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]

2024-02-14 Thread via GitHub


ijuma commented on code in PR #15323:
URL: https://github.com/apache/kafka/pull/15323#discussion_r1489615650


##
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##
@@ -279,7 +286,7 @@ synchronized Optional 
partitionMetadataIfCur
  * @return a mapping from topic names to topic IDs for all topics with 
valid IDs in the cache
  */
 public synchronized Map topicIds() {

Review Comment:
   Why is this synchronized? We should avoid synchronized for methods that are 
simply reading a snapshot.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]

2024-02-14 Thread via GitHub


ijuma commented on code in PR #15323:
URL: https://github.com/apache/kafka/pull/15323#discussion_r1489615650


##
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##
@@ -279,7 +286,7 @@ synchronized Optional 
partitionMetadataIfCur
  * @return a mapping from topic names to topic IDs for all topics with 
valid IDs in the cache
  */
 public synchronized Map topicIds() {

Review Comment:
   Why is this synchronized? We should avoid synchronized for methods that are 
simply reading a snapshot. Also, are there other methods like this one that 
don't need to be synchronized?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16243: Make sure that we do not exceed max poll interval inside poll [kafka]

2024-02-14 Thread via GitHub


lucasbru opened a new pull request, #15372:
URL: https://github.com/apache/kafka/pull/15372

   The consumer keeps a poll timer, which is used to ensure liveness of the 
application thread. The poll timer automatically updates while the 
`Consumer.poll(Duration)` method is blocked, while the newer consumer only 
updates the poll timer when a new call to `Consumer.poll(Duration)` is issued. 
This means that the kafka-console-consumer.sh tools, which uses a very long 
timeout by default, works differently with the new consumer, with the consumer 
proactively rejoining the group during long poll timeouts.
   
   This change solves the problem by (a) repeatedly sending 
`PollApplicationEvents` to the background thread, not just on the first call of 
`poll` and (b) making sure that the application thread doesn't block for so 
long that it runs out of `max.poll.interval`.
   
   An integration test is added to make sure that we do not rejoin the group 
when a long poll timeout is used with a low `max.poll.interval`.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Cleanup log.dirs in ReplicaManagerTest on JVM exit [kafka]

2024-02-14 Thread via GitHub


soarez commented on code in PR #15289:
URL: https://github.com/apache/kafka/pull/15289#discussion_r1489605994


##
core/src/test/scala/unit/kafka/utils/TestUtils.scala:
##
@@ -137,7 +137,18 @@ object TestUtils extends Logging {
 val parentFile = new File(parent)
 parentFile.mkdirs()
 
-JTestUtils.tempDirectory(parentFile.toPath, null)
+val result = JTestUtils.tempDirectory(parentFile.toPath, null)
+
+parentFile.deleteOnExit()
+Exit.addShutdownHook("delete-temp-log-dir-on-exit", {
+  try {
+Utils.delete(parentFile)

Review Comment:
   Do we need both `parentFile.deleteOnExit()` and `Utils.delete(parentFile)` 
on the shutdown hook? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16225: Set `metadata.log.dir` to broker in KRAFT mode in integration test [kafka]

2024-02-14 Thread via GitHub


OmniaGM commented on code in PR #15354:
URL: https://github.com/apache/kafka/pull/15354#discussion_r1489575856


##
core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala:
##
@@ -168,6 +169,10 @@ class LogDirFailureTest extends IntegrationTestHarness {
   }
 
   def testProduceAfterLogDirFailureOnLeader(failureType: LogDirFailureType, 
quorum: String): Unit = {
+if (isKRaftTest()) {
+  val value = configs.map(c => c.brokerId -> 
c.logDirs.contains(c.metadataLogDir))
+  logger.warn(s">> ${value.mkString(",")}")
+}

Review Comment:
   https://github.com/apache/kafka/pull/15371



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINIOR: Remove accidentally logs [kafka]

2024-02-14 Thread via GitHub


OmniaGM commented on PR #15371:
URL: https://github.com/apache/kafka/pull/15371#issuecomment-1943925566

   cc: @jolshan can you merge this when you have a moment please?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINIOR: Remove accidentally logs [kafka]

2024-02-14 Thread via GitHub


OmniaGM opened a new pull request, #15371:
URL: https://github.com/apache/kafka/pull/15371

   This logs was added during the troubleshooting for some flaky tests and was 
pushed by mistake in #15354
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16225: Set `metadata.log.dir` to broker in KRAFT mode in integration test [kafka]

2024-02-14 Thread via GitHub


OmniaGM commented on code in PR #15354:
URL: https://github.com/apache/kafka/pull/15354#discussion_r1489567388


##
core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala:
##
@@ -168,6 +169,10 @@ class LogDirFailureTest extends IntegrationTestHarness {
   }
 
   def testProduceAfterLogDirFailureOnLeader(failureType: LogDirFailureType, 
quorum: String): Unit = {
+if (isKRaftTest()) {
+  val value = configs.map(c => c.brokerId -> 
c.logDirs.contains(c.metadataLogDir))
+  logger.warn(s">> ${value.mkString(",")}")
+}

Review Comment:
   will raise a minor pr to remove it. I removed it but didn't push the commit 
ops



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16225: Set `metadata.log.dir` to broker in KRAFT mode in integration test [kafka]

2024-02-14 Thread via GitHub


OmniaGM commented on code in PR #15354:
URL: https://github.com/apache/kafka/pull/15354#discussion_r1489565890


##
core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala:
##
@@ -168,6 +169,10 @@ class LogDirFailureTest extends IntegrationTestHarness {
   }
 
   def testProduceAfterLogDirFailureOnLeader(failureType: LogDirFailureType, 
quorum: String): Unit = {
+if (isKRaftTest()) {
+  val value = configs.map(c => c.brokerId -> 
c.logDirs.contains(c.metadataLogDir))
+  logger.warn(s">> ${value.mkString(",")}")
+}

Review Comment:
   I thought I removed this one! 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16225: Set `metadata.log.dir` to broker in KRAFT mode in integration test [kafka]

2024-02-14 Thread via GitHub


soarez commented on code in PR #15354:
URL: https://github.com/apache/kafka/pull/15354#discussion_r1489560264


##
core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala:
##
@@ -168,6 +169,10 @@ class LogDirFailureTest extends IntegrationTestHarness {
   }
 
   def testProduceAfterLogDirFailureOnLeader(failureType: LogDirFailureType, 
quorum: String): Unit = {
+if (isKRaftTest()) {
+  val value = configs.map(c => c.brokerId -> 
c.logDirs.contains(c.metadataLogDir))
+  logger.warn(s">> ${value.mkString(",")}")
+}

Review Comment:
   Did you mean to include this change? The logging format and lack of 
description seems odd.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: Fix KafkaAdminClientTest.testClientInstanceId [kafka]

2024-02-14 Thread via GitHub


dajac opened a new pull request, #15370:
URL: https://github.com/apache/kafka/pull/15370

   This patch tries to address the 
[flakiness](https://ge.apache.org/scans/tests?search.rootProjectNames=kafka=Europe%2FZurich=org.apache.kafka.clients.admin.KafkaAdminClientTest=FLAKY=testClientInstanceId())
 of `KafkaAdminClientTest.testClientInstanceId`. The test fails with 
`org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node 
assignment.`. I believe that it is because 10ms is not enough when our CI is 
busy. Let's try with 1s.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-13505) Kafka Connect should respect Avro 1.10.X enum defaults spec

2024-02-14 Thread Tommy Becker (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17817385#comment-17817385
 ] 

Tommy Becker commented on KAFKA-13505:
--

I'm wondering if this bug is being overlooked because it's described in terms 
of third party libraries like Confluent's S3 connector and Avro. But the issue 
is actually in the SchemaProjector class, which is part of Kafka Connect 
itself. Confluent's AvroConverter represents Avro enums in Connect Schema as 
Strings with "parameters" corresponding to each enum value. Even though these 
parameters seem like they are just intended to be metadata, 
SchemaProjector.checkMaybeCompatible() requires the parameters of each field to 
*exactly* match, so when values are added to an Avro enum, new parameters are 
added to the generated Connect Schema, breaking this test. Intuitively, it 
feels like this check could be less strict, but there is no specification for 
Connect Schema that I can find, so I don't know if requiring this parameter 
equality is correct or not.

> Kafka Connect should respect Avro 1.10.X enum defaults spec
> ---
>
> Key: KAFKA-13505
> URL: https://issues.apache.org/jira/browse/KAFKA-13505
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: Guus De Graeve
>Priority: Major
>
> We are using Kafka Connect to pipe data from Kafka topics into parquet files 
> on S3. Our Kafka data is serialised using Avro (schema registry). We use the 
> Amazon S3 Sink Connector for this.
> Up until recently we would set "schema.compatibility" to "NONE" in our 
> connectors, but this had the pain-full side-effect that during deploys of our 
> application we got huge file explosions (lots of very small files in HDFS / 
> S3). This happens because kafka connect will create a new file every time the 
> schema id of a log changes compared to the previous log. During deploys of 
> our applications (which can take up to 20 minutes) multiple logs of mixed 
> schema ids are inevitable and given the huge amounts of logs file explosions 
> of up to a million files weren't uncommon.
> To solve this problem we switched all our connectors "schema.compatibility" 
> to "BACKWARD", which should only create a new file when a higher schema id is 
> detected and deserialise all logs with the latest known schema id. Which 
> should only create one new file during deploys.
> An example connector config:
> {code:java}
> {
> "name": "hdfs-Project_Test_Subject",
> "config": {
> "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
> "partition.duration.ms": "8640",
> "topics.dir": "/user/kafka/Project",
> "hadoop.conf.dir": "/opt/hadoop/conf",
> "flush.size": "100",
> "schema.compatibility": "BACKWARD",
> "topics": "Project_Test_Subject",
> "timezone": "UTC",
> "hdfs.url": "hdfs://hadoophost:9000",
> "value.converter.value.subject.name.strategy": 
> "io.confluent.kafka.serializers.subject.TopicNameStrategy",
> "rotate.interval.ms": "720",
> "locale": "C",
> "hadoop.home": "/opt/hadoop",
> "logs.dir": "/user/kafka/_logs",
> "format.class": "io.confluent.connect.hdfs.parquet.ParquetFormat",
> "partitioner.class": 
> "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
> "name": "hdfs-Project_Test_Subject",
> "errors.tolerance": "all",
> "storage.class": "io.confluent.connect.hdfs.storage.HdfsStorage",
> "path.format": "/MM/dd"
> }
> }{code}
> However, we have lots of enum fields in our data records (avro schemas) to 
> which subjects get added frequently, and this is causing issues with our 
> Kafka Connect connectors FAILING with these kinds of errors:
> {code:java}
> Schema parameters not equal. source parameters: 
> {io.confluent.connect.avro.enum.default.testfield=null, 
> io.confluent.connect.avro.Enum=Ablo.testfield, 
> io.confluent.connect.avro.Enum.null=null, 
> io.confluent.connect.avro.Enum.value1=value1, 
> io.confluent.connect.avro.Enum.value2=value2} and target parameters: 
> {io.confluent.connect.avro.enum.default.testfield=null, 
> io.confluent.connect.avro.Enum=Ablo.testfield, 
> io.confluent.connect.avro.Enum.null=null, 
> io.confluent.connect.avro.Enum.value1=value1, 
> io.confluent.connect.avro.Enum.value2=value2, 
> io.confluent.connect.avro.Enum.value3=value3}{code}
> Since Avro 1.10.X specification, enum values support defaults, which makes 
> schema evolution possible even when adding subjects (values) to an enum. When 
> testing our schemas for compatibility using the Schema Registry api we always 
> get "is_compatible" => true. So schema evolution should in theory not be a 
> problem.
> The error above is thrown in 

[jira] [Updated] (KAFKA-16224) Fix handling of deleted topic when auto-committing before revocation

2024-02-14 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16224?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16224:
---
Labels: client-transitions-issues kip-848-client-support  (was: 
kip-848-client-support)

> Fix handling of deleted topic when auto-committing before revocation
> 
>
> Key: KAFKA-16224
> URL: https://issues.apache.org/jira/browse/KAFKA-16224
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: client-transitions-issues, kip-848-client-support
>
> Current logic for auto-committing offsets when partitions are revoked will 
> retry continuously when getting UNKNOWN_TOPIC_OR_PARTITION, leading to the 
> member not completing the revocation in time. We should consider this as an 
> indication of the topic being deleted, and in the context of committing 
> offsets to revoke partitions, we should abort the commit attempt and move on 
> to complete and ack the revocation (effectively considering 
> UnknownTopicOrPartitionException as non-retriable in this context) 
> Note that legacy coordinator behaviour around this seems to be the same as 
> the new consumer currently has.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16155: Re-enable testAutoCommitIntercept [kafka]

2024-02-14 Thread via GitHub


cadonna commented on code in PR #15334:
URL: https://github.com/apache/kafka/pull/15334#discussion_r1489442547


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -1378,6 +1377,14 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 // after rebalancing, we should have reset to the committed positions
 assertEquals(10, testConsumer.committed(Set(tp).asJava).get(tp).offset)
 assertEquals(20, testConsumer.committed(Set(tp2).asJava).get(tp2).offset)
+
+// In both CLASSIC and CONSUMER protocols, interceptors are executed in 
poll and close.
+// However, in the CONSUMER protocol, the assignment may be changed 
outside of a poll, so
+// we need to poll once to ensure the interceptor is called.
+if (groupProtocol.toUpperCase == GroupProtocol.CONSUMER.name) {

Review Comment:
   I fine either way.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-8977: Remove MockStreamsMetrics since it is not a mock [kafka]

2024-02-14 Thread via GitHub


cadonna commented on PR #13931:
URL: https://github.com/apache/kafka/pull/13931#issuecomment-1943728460

   @joobisb Could you please fix the compilation errors?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-16247) replica keep out-of-sync after migrating broker to KRaft

2024-02-14 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16247?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-16247.
---
Resolution: Fixed

Fixed in 3.7.0 RC4

> replica keep out-of-sync after migrating broker to KRaft
> 
>
> Key: KAFKA-16247
> URL: https://issues.apache.org/jira/browse/KAFKA-16247
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Luke Chen
>Priority: Major
> Attachments: KAFKA-16247.zip
>
>
> We are deploying 3 controllers and 3 brokers, and following the steps in 
> [doc|https://kafka.apache.org/documentation/#kraft_zk_migration]. When we're 
> moving from "Enabling the migration on the brokers" state to "Migrating 
> brokers to KRaft" state, the first rolled broker becomes out-of-sync and 
> never become in-sync. 
> From the log, we can see some "reject alterPartition" errors, but it just 
> happen 2 times. Theoretically, the leader should add the follower  into ISR 
> as long as the follower is fetching since we don't have client writing data. 
> But can't figure out why it didn't fetch. 
> Logs: https://gist.github.com/showuon/64c4dcecb238a317bdbdec8db17fd494
> ===
> update Feb. 14
> After further investigating the logs, I think the reason why the replica is 
> not added into ISR is because the alterPartition request got non-retriable 
> error from controller:
> {code:java}
> Failed to alter partition to PendingExpandIsr(newInSyncReplicaId=0, 
> sentLeaderAndIsr=LeaderAndIsr(leader=1, leaderEpoch=4, 
> isrWithBrokerEpoch=List(BrokerState(brokerId=1, brokerEpoch=-1), 
> BrokerState(brokerId=2, brokerEpoch=-1), BrokerState(brokerId=0, 
> brokerEpoch=-1)), leaderRecoveryState=RECOVERED, partitionEpoch=7), 
> leaderRecoveryState=RECOVERED, 
> lastCommittedState=CommittedPartitionState(isr=Set(1, 2), 
> leaderRecoveryState=RECOVERED)) because the partition epoch is invalid. 
> Partition state may be out of sync, awaiting new the latest metadata. 
> (kafka.cluster.Partition) 
> [zk-broker-1-to-controller-alter-partition-channel-manager]
> {code}
> Since it's a non-retriable error, we'll keep the state as pending, and 
> waiting for later leaderAndISR update as described 
> [here|https://github.com/apache/kafka/blob/d24abe0edebad37e554adea47408c3063037f744/core/src/main/scala/kafka/cluster/Partition.scala#L1876C1-L1876C41].
> Log analysis: https://gist.github.com/showuon/5514cbb995fc2ae6acd5858f69c137bb
> So the question becomes:
> 1. Why does the controller increase the partition epoch?
> 2. When the leader receives the leaderAndISR request from the controller, it 
> ignored the request because the leader epoch is identical, even though the 
> partition epoch is updated. Is the behavior expected? Will it impact the 
> alterPartition request later?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16061) KRaft JBOD follow-ups and improvements

2024-02-14 Thread Igor Soarez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16061?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Igor Soarez reassigned KAFKA-16061:
---

Assignee: Igor Soarez

> KRaft JBOD follow-ups and improvements
> --
>
> Key: KAFKA-16061
> URL: https://issues.apache.org/jira/browse/KAFKA-16061
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Assignee: Igor Soarez
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]

2024-02-14 Thread via GitHub


msn-tldr commented on PR #15323:
URL: https://github.com/apache/kafka/pull/15323#issuecomment-1943675980

   Went through the test failures across all jdk/scala combos, they are 
unrelated, and have been failing before this PR as well
   https://ge.apache.org/s/wftzjb3q6slyc/tests/overview?outcome=FAILED
   https://ge.apache.org/s/lyqs6eqs4mtny/tests/overview?outcome=FAILED
   https://ge.apache.org/s/x6x27oapk6qsa/tests/overview?outcome=FAILED
   https://ge.apache.org/s/sleegbh5pfyfo/tests/overview?outcome=FAILED 
   
   
   The test failure belong to test `kafka.server.LogDirFailureTest`, they are 
being fixed here https://issues.apache.org/jira/browse/KAFKA-16225
   
   @hachikuji I believe this good to be merged, what do you think?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



  1   2   >