Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]

2024-02-19 Thread via GitHub


florin-akermann commented on PR #15189:
URL: https://github.com/apache/kafka/pull/15189#issuecomment-1951981784

   >  Replied in-line -- in the end I don't have a strong opinion -- if you 
think it's better to use grace=MAX (instead of just a "large enough" 150) and 
add a new test method just for grace, I am also happy with it.
   
   @mjsax 
   Thank you for the feedback. I opted for 'large enough' and new test methods 
just for grace.
   
   I realized that if we only want to assert that late records get dropped, 
and not look at the join result, then we could even reuse the same test case 
for all three involved operators (inner, left, outer), as shown in 
`KStreamKStreamWindowCloseTest`.
   
   If you agree then I would remove the 
`.recordsArrivingPostWindowCloseShouldBeDropped()` method from 
`KStreamKStreamJoinTest`, `KStreamKStreamLeftJoinTest`, and 
`KStreamKStreamOuterJoinTest`.
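   
   For illustration, a rough sketch of such a shared window-close test (window 
and grace sizes, topic names, and the class name here are made up, not taken 
from the PR):
   
   ```java
   import java.time.Duration;
   import java.time.Instant;
   import org.apache.kafka.common.serialization.Serdes;
   import org.apache.kafka.common.serialization.StringDeserializer;
   import org.apache.kafka.common.serialization.StringSerializer;
   import org.apache.kafka.streams.*;
   import org.apache.kafka.streams.kstream.*;
   
   public class WindowCloseSketch {
       public static void main(String[] args) {
           StreamsBuilder builder = new StreamsBuilder();
           KStream<String, String> left =
               builder.stream("left", Consumed.with(Serdes.String(), Serdes.String()));
           KStream<String, String> right =
               builder.stream("right", Consumed.with(Serdes.String(), Serdes.String()));
           // Same topology shape for inner/left/outer; only the join call would differ.
           left.join(right, (l, r) -> l + r,
                   JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(100), Duration.ofMillis(50)),
                   StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String()))
               .to("out", Produced.with(Serdes.String(), Serdes.String()));
   
           try (TopologyTestDriver driver = new TopologyTestDriver(builder.build())) {
               TestInputTopic<String, String> l =
                   driver.createInputTopic("left", new StringSerializer(), new StringSerializer());
               TestInputTopic<String, String> r =
                   driver.createInputTopic("right", new StringSerializer(), new StringSerializer());
               TestOutputTopic<String, String> out =
                   driver.createOutputTopic("out", new StringDeserializer(), new StringDeserializer());
               r.pipeInput("k", "r", Instant.ofEpochMilli(1000)); // advances stream time well past close
               l.pipeInput("k", "l", Instant.ofEpochMilli(0));    // late: 0 + 100ms window + 50ms grace < 1000ms
               if (!out.readKeyValuesToList().isEmpty())
                   throw new AssertionError("late record was not dropped");
           }
       }
   }
   ```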
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Clean up core metrics, migration, network, raft, security, serializer, tools, utils, and zookeeper modules [kafka]

2024-02-19 Thread via GitHub


jlprat commented on code in PR #15279:
URL: https://github.com/apache/kafka/pull/15279#discussion_r1494224391


##
core/src/main/scala/kafka/tools/ConsoleConsumer.scala:
##
@@ -37,6 +37,7 @@ import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
 
+import java.util

Review Comment:
   Rebased against trunk. Feel free to review whenever you have time @mimaison. 
Thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-16009) Fix PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation

2024-02-19 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16009?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy reassigned KAFKA-16009:
--

Assignee: Lucas Brutschy  (was: Lucas Brutschy)

> Fix PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation
> 
>
> Key: KAFKA-16009
> URL: https://issues.apache.org/jira/browse/KAFKA-16009
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Lucas Brutschy
>Priority: Critical
>  Labels: consumer-threading-refactor, integration-tests, timeout
> Fix For: 3.8.0
>
>
> The integration test 
> {{PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation}} is failing 
> when using the {{AsyncKafkaConsumer}}.
> The error is:
> {code}
> org.opentest4j.AssertionFailedError: Timed out before expected rebalance 
> completed
>   at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38)
>   at org.junit.jupiter.api.Assertions.fail(Assertions.java:134)
>   at 
> kafka.api.AbstractConsumerTest.awaitRebalance(AbstractConsumerTest.scala:317)
>   at 
> kafka.api.PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation(PlaintextConsumerTest.scala:235)
> {code}
> The logs include this line:
>  
> {code}
> [2023-12-13 15:20:27,824] WARN [Consumer clientId=ConsumerTestConsumer, 
> groupId=my-test] consumer poll timeout has expired. This means the time 
> between subsequent calls to poll() was longer than the configured 
> max.poll.interval.ms, which typically implies that the poll loop is spending 
> too much time processing messages. You can address this either by increasing 
> max.poll.interval.ms or by reducing the maximum size of batches returned in 
> poll() with max.poll.records. 
> (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188)
> {code} 
> I don't know if that's related or not.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16155) Investigate testAutoCommitIntercept

2024-02-19 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16155?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy updated KAFKA-16155:
---
Reporter: Lucas Brutschy  (was: Lucas Brutschy)

> Investigate testAutoCommitIntercept
> ---
>
> Key: KAFKA-16155
> URL: https://issues.apache.org/jira/browse/KAFKA-16155
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Lucas Brutschy
>Assignee: Lucas Brutschy
>Priority: Critical
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> Even with KAFKA-15942, the test PlaintextConsumerTest.testAutoCommitIntercept 
> flakes on the initial setup (before using interceptors, so interceptors 
> are unrelated here, except for being used later in the test).
> The problem is that we are seeking two topic partitions to offset 10 and 20, 
> respectively, but when we commit, we seem to have lost one of the offsets, 
> likely due to a race condition. 
> When I output `subscriptionState.allConsumed` repeatedly, I get this output:
> {noformat}
> allConsumed: {topic-0=OffsetAndMetadata{offset=100, leaderEpoch=0, 
> metadata=''}, topic-1=OffsetAndMetadata{offset=0, leaderEpoch=null, 
> metadata=''}} 
> seeking topic-0 to FetchPosition{offset=10, offsetEpoch=Optional.empty, 
> currentLeader=LeaderAndEpoch{leader=Optional[localhost:58298 (id: 0 rack: 
> null)], epoch=0}} 
> seeking topic-1 to FetchPosition{offset=20, offsetEpoch=Optional.empty, 
> currentLeader=LeaderAndEpoch{leader=Optional[localhost:58301 (id: 1 rack: 
> null)], epoch=0}} 
> allConsumed: {topic-0=OffsetAndMetadata{offset=10, leaderEpoch=null, 
> metadata=''}, topic-1=OffsetAndMetadata{offset=20, leaderEpoch=null, 
> metadata=''}} 
> allConsumed: {topic-0=OffsetAndMetadata{offset=10, leaderEpoch=null, 
> metadata=''}} 
> allConsumed: {topic-0=OffsetAndMetadata{offset=10, leaderEpoch=null, 
> metadata=''}, topic-1=OffsetAndMetadata{offset=0, leaderEpoch=null, 
> metadata=''}}
> autocommit start {topic-0=OffsetAndMetadata{offset=10, leaderEpoch=null, 
> metadata=''}, topic-1=OffsetAndMetadata{offset=0, leaderEpoch=null, 
> metadata=''}}
> {noformat}
> So after we seek to 10 / 20, we lose one of the offsets, maybe because we 
> haven't reconciled the assignment yet. Later, we get the second topic 
> partition assigned, but the offset is initialized to 0.
> We should investigate whether this can be made more like the behavior in the 
> original consumer.
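> For reference, a minimal sketch of the sequence described above (topic names 
> and offsets follow the log output; this is an illustration, not the actual 
> test code):
> {code:java}
> TopicPartition tp0 = new TopicPartition("topic", 0);
> TopicPartition tp1 = new TopicPartition("topic", 1);
> // After the rebalance, the test seeks both partitions:
> consumer.seek(tp0, 10);
> consumer.seek(tp1, 20);
> // Expected commit: {tp0=10, tp1=20}. Observed: tp1 drops out of allConsumed
> // and later reappears at offset 0, so the autocommit records {tp0=10, tp1=0}
> // -- consistent with the assignment not yet being reconciled.
> consumer.commitSync();
> {code}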



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16243) Idle kafka-console-consumer with new consumer group protocol preemptively leaves group

2024-02-19 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16243?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy reassigned KAFKA-16243:
--

Assignee: Lucas Brutschy  (was: Lucas Brutschy)

> Idle kafka-console-consumer with new consumer group protocol preemptively 
> leaves group
> --
>
> Key: KAFKA-16243
> URL: https://issues.apache.org/jira/browse/KAFKA-16243
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.7.0
>Reporter: Andrew Schofield
>Assignee: Lucas Brutschy
>Priority: Critical
>
> Using the new consumer group protocol with kafka-console-consumer.sh, I find 
> that if I leave the consumer with no records to process for 5 minutes 
> (max.poll.interval.ms = 300000 ms), the tool logs the following warning 
> message and leaves the group.
> "consumer poll timeout has expired. This means the time between subsequent 
> calls to poll() was longer than the configured max.poll.interval.ms, which 
> typically implies that the poll loop is spending too much time processing 
> messages. You can address this either by increasing max.poll.interval.ms or 
> by reducing the maximum size of batches returned in poll() with 
> max.poll.records."
> With the older consumer, this did not occur.
> The reason is that the consumer keeps a poll timer, which is used to ensure 
> liveness of the application thread. In the classic consumer, the poll timer 
> automatically updates while the `Consumer.poll(Duration)` method is blocked, 
> whereas the new consumer only updates the poll timer when a new call to 
> `Consumer.poll(Duration)` is issued. This means that the 
> kafka-console-consumer.sh tool, which uses a very long poll timeout by 
> default, works differently with the new consumer.
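> For illustration, the loop shape in question (a sketch of the tool's 
> behavior, not the actual code):
> {code:java}
> // kafka-console-consumer.sh effectively polls with a very long timeout:
> while (running) {
>     ConsumerRecords<byte[], byte[]> records =
>         consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
>     records.forEach(record -> System.out.println(record.value()));
>     // Classic consumer: the liveness timer is updated while poll() blocks.
>     // New consumer: the timer is only reset on entry to poll(), so an idle
>     // consumer exceeds max.poll.interval.ms and proactively leaves the group.
> }
> {code}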



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16155) Investigate testAutoCommitIntercept

2024-02-19 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16155?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy reassigned KAFKA-16155:
--

Assignee: Lucas Brutschy  (was: Lucas Brutschy)

> Investigate testAutoCommitIntercept
> ---
>
> Key: KAFKA-16155
> URL: https://issues.apache.org/jira/browse/KAFKA-16155
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Lucas Brutschy
>Assignee: Lucas Brutschy
>Priority: Critical
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> Even with KAFKA-15942, the test PlaintextConsumerTest.testAutoCommitIntercept 
> flakes on the initial setup (before using interceptors, so interceptors 
> are unrelated here, except for being used later in the test).
> The problem is that we are seeking two topic partitions to offset 10 and 20, 
> respectively, but when we commit, we seem to have lost one of the offsets, 
> likely due to a race condition. 
> When I output `subscriptionState.allConsumed` repeatedly, I get this output:
> {noformat}
> allConsumed: {topic-0=OffsetAndMetadata{offset=100, leaderEpoch=0, 
> metadata=''}, topic-1=OffsetAndMetadata{offset=0, leaderEpoch=null, 
> metadata=''}} 
> seeking topic-0 to FetchPosition{offset=10, offsetEpoch=Optional.empty, 
> currentLeader=LeaderAndEpoch{leader=Optional[localhost:58298 (id: 0 rack: 
> null)], epoch=0}} 
> seeking topic-1 to FetchPosition{offset=20, offsetEpoch=Optional.empty, 
> currentLeader=LeaderAndEpoch{leader=Optional[localhost:58301 (id: 1 rack: 
> null)], epoch=0}} 
> allConsumed: {topic-0=OffsetAndMetadata{offset=10, leaderEpoch=null, 
> metadata=''}, topic-1=OffsetAndMetadata{offset=20, leaderEpoch=null, 
> metadata=''}} 
> allConsumed: {topic-0=OffsetAndMetadata{offset=10, leaderEpoch=null, 
> metadata=''}} 
> allConsumed: {topic-0=OffsetAndMetadata{offset=10, leaderEpoch=null, 
> metadata=''}, topic-1=OffsetAndMetadata{offset=0, leaderEpoch=null, 
> metadata=''}}
> autocommit start {topic-0=OffsetAndMetadata{offset=10, leaderEpoch=null, 
> metadata=''}, topic-1=OffsetAndMetadata{offset=0, leaderEpoch=null, 
> metadata=''}}
> {noformat}
> So after we seek to 10 / 20, we lose one of the offsets, maybe because we 
> haven't reconciled the assignment yet. Later, we get the second topic 
> partition assigned, but the offset is initialized to 0.
> We should investigate whether this can be made more like the behavior in the 
> original consumer.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16077) Streams fails to close task after restoration when input partitions are updated

2024-02-19 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16077?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy reassigned KAFKA-16077:
--

Assignee: Lucas Brutschy  (was: Lucas Brutschy)

> Streams fails to close task after restoration when input partitions are 
> updated
> ---
>
> Key: KAFKA-16077
> URL: https://issues.apache.org/jira/browse/KAFKA-16077
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Lucas Brutschy
>Assignee: Lucas Brutschy
>Priority: Blocker
>  Labels: streams
>
> There is a race condition in the state updater that can cause the following:
>  # We have an active task in the state updater
>  # We get fenced. We recreate the producer, so transactions are now 
> uninitialized. We ask the state updater to give back the task and add a 
> pending action to close the task clean once it’s handed back
>  # We get a new assignment with updated input partitions. The task is still 
> owned by the state updater, so we ask the state updater again to hand it back 
> and add a pending action to update its input partitions
>  # The task is handed back by the state updater. We update its input 
> partitions but forget to close it clean (the pending action was overwritten)
>  # Now the task is in an initialized state, but the underlying producer does 
> not have transactions initialized
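> A hypothetical illustration of how steps 2-4 can lose the pending action, 
> assuming one pending action is stored per task (names made up; the real 
> TaskManager bookkeeping differs):
> {code:java}
> enum PendingAction { CLOSE_CLEAN, UPDATE_INPUT_PARTITIONS }
> 
> Map<String, PendingAction> pendingActions = new HashMap<>();
> pendingActions.put("1_0", PendingAction.CLOSE_CLEAN);             // step 2
> pendingActions.put("1_0", PendingAction.UPDATE_INPUT_PARTITIONS); // step 3 silently overwrites step 2
> // step 4: on hand-back, only UPDATE_INPUT_PARTITIONS runs; the clean close
> // (and with it the producer re-initialization) is lost.
> {code}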
> This can lead to an exception like this:
> {code:java}
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log-org.apache.kafka.streams.errors.StreamsException:
>  Exception caught in process. taskId=1_0, 
> processor=KSTREAM-SOURCE-05, topic=node-name-repartition, 
> partition=0, offset=618798, stacktrace=java.lang.IllegalStateException: 
> TransactionalId stream-soak-test-d647640a-12e5-4e74-a0af-e105d0d0cb67-2: 
> Invalid transition attempted from state UNINITIALIZED to state IN_TRANSACTION
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:999)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:985)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.clients.producer.internals.TransactionManager.beginTransaction(TransactionManager.java:311)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.clients.producer.KafkaProducer.beginTransaction(KafkaProducer.java:660)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.processor.internals.StreamsProducer.maybeBeginTransaction(StreamsProducer.java:240)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:258)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:253)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:175)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:85)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:291)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:270)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:229)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.doJoin(KStreamKTableJoinProcessor.java:130)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:99)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:152)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:291)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.k

[jira] [Assigned] (KAFKA-9545) Flaky Test `RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted`

2024-02-19 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy reassigned KAFKA-9545:
-

Assignee: Lucas Brutschy  (was: Lucas Brutschy)

> Flaky Test `RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted`
> --
>
> Key: KAFKA-9545
> URL: https://issues.apache.org/jira/browse/KAFKA-9545
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Jason Gustafson
>Assignee: Lucas Brutschy
>Priority: Major
>
> https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/4678/testReport/org.apache.kafka.streams.integration/RegexSourceIntegrationTest/testRegexMatchesTopicsAWhenDeleted/
> {code}
> java.lang.AssertionError: Condition not met within timeout 15000. Stream 
> tasks not updated
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
>   at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$5(TestUtils.java:367)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:415)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:383)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:366)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:337)
>   at 
> org.apache.kafka.streams.integration.RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted(RegexSourceIntegrationTest.java:224)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-12935) Flaky Test RestoreIntegrationTest.shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore

2024-02-19 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy reassigned KAFKA-12935:
--

Assignee: Lucas Brutschy  (was: Lucas Brutschy)

> Flaky Test 
> RestoreIntegrationTest.shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore
> 
>
> Key: KAFKA-12935
> URL: https://issues.apache.org/jira/browse/KAFKA-12935
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Assignee: Lucas Brutschy
>Priority: Critical
>  Labels: flaky-test
> Fix For: 3.4.0
>
> Attachments: 
> RestoreIntegrationTest#shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore[true].rtf
>
>
> {quote}java.lang.AssertionError: Expected: <0L> but: was <5005L> at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6) at 
> org.apache.kafka.streams.integration.RestoreIntegrationTest.shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore(RestoreIntegrationTest.java:374)
> {quote}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14299) Benchmark and stabilize state updater

2024-02-19 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy reassigned KAFKA-14299:
--

Assignee: Lucas Brutschy  (was: Lucas Brutschy)

> Benchmark and stabilize state updater
> -
>
> Key: KAFKA-14299
> URL: https://issues.apache.org/jira/browse/KAFKA-14299
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: Lucas Brutschy
>Assignee: Lucas Brutschy
>Priority: Major
>
> We need to benchmark and stabilize the separate state restoration code path.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15319) Upgrade rocksdb to fix CVE-2022-37434

2024-02-19 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15319?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy reassigned KAFKA-15319:
--

Assignee: Lucas Brutschy  (was: Lucas Brutschy)

> Upgrade rocksdb to fix CVE-2022-37434
> -
>
> Key: KAFKA-15319
> URL: https://issues.apache.org/jira/browse/KAFKA-15319
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.4.1
>Reporter: Maruthi
>Assignee: Lucas Brutschy
>Priority: Critical
> Fix For: 3.6.0, 3.5.2
>
> Attachments: compat_report.html.zip
>
>
> rocksdbjni < 7.9.2 is vulnerable to CVE-2022-37434 because it bundles zlib 
> 1.2.12. Upgrading picks up zlib 1.2.13, which contains the fix: 
> https://github.com/facebook/rocksdb/commit/0993c9225f8086bab6c4c0a2d7206897d1cc688c



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14278) Convert INVALID_PRODUCER_EPOCH into PRODUCER_FENCED TxnOffsetCommit

2024-02-19 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy reassigned KAFKA-14278:
--

Assignee: Lucas Brutschy  (was: Lucas Brutschy)

> Convert INVALID_PRODUCER_EPOCH into PRODUCER_FENCED TxnOffsetCommit
> ---
>
> Key: KAFKA-14278
> URL: https://issues.apache.org/jira/browse/KAFKA-14278
> Project: Kafka
>  Issue Type: Sub-task
>  Components: producer , streams
>Reporter: Lucas Brutschy
>Assignee: Lucas Brutschy
>Priority: Major
> Fix For: 3.6.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15865) Ensure consumer.poll() execute autocommit callback

2024-02-19 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15865?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy reassigned KAFKA-15865:
--

Assignee: Lucas Brutschy  (was: Lucas Brutschy)

> Ensure consumer.poll() execute autocommit callback
> --
>
> Key: KAFKA-15865
> URL: https://issues.apache.org/jira/browse/KAFKA-15865
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Lucas Brutschy
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-preview
> Fix For: 3.7.0
>
>
> When the network thread completes an autocommit, we need to send an event to 
> the application thread to notify it to execute the callback. In KAFKA-15327, 
> the network thread sends an AutoCommitCompletionBackgroundEvent to the 
> polling thread. The polling thread should trigger the OffsetCommitCallback 
> upon receiving it.
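> A sketch of that handoff (illustrative names and shapes, not the actual 
> internals):
> {code:java}
> BlockingQueue<Runnable> backgroundEvents = new LinkedBlockingQueue<>();
> 
> // Network thread, once the async commit response arrives:
> backgroundEvents.add(() -> callback.onComplete(committedOffsets, null));
> 
> // Application thread, inside Consumer.poll():
> Runnable event;
> while ((event = backgroundEvents.poll()) != null) {
>     event.run(); // runs the user's OffsetCommitCallback
> }
> {code}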



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16220) KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest#shouldThrowIllegalArgumentExceptionWhenCustomPartionerReturnsMultiplePartitions is flaky

2024-02-19 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy reassigned KAFKA-16220:
--

Assignee: Lucas Brutschy  (was: Lucas Brutschy)

> KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest#shouldThrowIllegalArgumentExceptionWhenCustomPartionerReturnsMultiplePartitions
>  is flaky
> 
>
> Key: KAFKA-16220
> URL: https://issues.apache.org/jira/browse/KAFKA-16220
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Lucas Brutschy
>Assignee: Lucas Brutschy
>Priority: Major
>  Labels: flaky, flaky-test
>
> This test has seen significant flakiness
>  
> https://ge.apache.org/s/fac7lploprvuu/tests/task/:streams:test/details/org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest/shouldThrowIllegalArgumentExceptionWhenCustomPartitionerReturnsMultiplePartitions()?top-execution=1



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15798) Flaky Test NamedTopologyIntegrationTest.shouldAddAndRemoveNamedTopologiesBeforeStartingAndRouteQueriesToCorrectTopology()

2024-02-19 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15798?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy reassigned KAFKA-15798:
--

Assignee: Lucas Brutschy  (was: Lucas Brutschy)

> Flaky Test 
> NamedTopologyIntegrationTest.shouldAddAndRemoveNamedTopologiesBeforeStartingAndRouteQueriesToCorrectTopology()
> -
>
> Key: KAFKA-15798
> URL: https://issues.apache.org/jira/browse/KAFKA-15798
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Justine Olshan
>Assignee: Lucas Brutschy
>Priority: Major
>  Labels: flaky-test
>
> I saw a few examples recently. Two have the same error, but the third is 
> different:
> [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14629/22/testReport/junit/org.apache.kafka.streams.integration/NamedTopologyIntegrationTest/Build___JDK_8_and_Scala_2_12___shouldAddAndRemoveNamedTopologiesBeforeStartingAndRouteQueriesToCorrectTopology___2/]
> [https://ci-builds.apache.org/job/Kafka/job/kafka/job/trunk/2365/testReport/junit/org.apache.kafka.streams.integration/NamedTopologyIntegrationTest/Build___JDK_21_and_Scala_2_13___shouldAddAndRemoveNamedTopologiesBeforeStartingAndRouteQueriesToCorrectTopology__/]
>  
> The failure looks like:
> {code:java}
> java.lang.AssertionError: Did not receive all 5 records from topic 
> output-stream-1 within 6 ms, currently accumulated data is [] Expected: 
> is a value equal to or greater than <5> but: <0> was less than <5>{code}
> The other failure was
> [https://ci-builds.apache.org/job/Kafka/job/kafka/job/trunk/2365/testReport/junit/org.apache.kafka.streams.integration/NamedTopologyIntegrationTest/Build___JDK_8_and_Scala_2_12___shouldAddAndRemoveNamedTopologiesBeforeStartingAndRouteQueriesToCorrectTopology__/]
> {code:java}
> java.lang.AssertionError: Expected: <[0, 1]> but: was <[0]>{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16097) State updater removes task without pending action in EOSv2

2024-02-19 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16097?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy reassigned KAFKA-16097:
--

Assignee: Lucas Brutschy  (was: Lucas Brutschy)

> State updater removes task without pending action in EOSv2
> --
>
> Key: KAFKA-16097
> URL: https://issues.apache.org/jira/browse/KAFKA-16097
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.8.0
>Reporter: Lucas Brutschy
>Assignee: Lucas Brutschy
>Priority: Major
>
> A long-running soak encountered the following exception:
>  
> {code:java}
> [2024-01-08 03:06:00,586] ERROR [i-081c089d2ed054443-StreamThread-3] Thread 
> encountered an error processing soak test 
> (org.apache.kafka.streams.StreamsSoakTest)
> java.lang.IllegalStateException: Got a removed task 1_0 from the state 
> updater that is not for recycle, closing, or updating input partitions; this 
> should not happen
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.handleRemovedTasksFromStateUpdater(TaskManager.java:939)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.checkStateUpdater(TaskManager.java:788)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.checkStateUpdater(StreamThread.java:1141)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:949)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:686)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:645)
> [2024-01-08 03:06:00,587] ERROR [i-081c089d2ed054443-StreamThread-3] 
> stream-client [i-081c089d2ed054443] Encountered the following exception 
> during processing and sent shutdown request for the entire application. 
> (org.apache.kafka.streams.KafkaStreams)
> org.apache.kafka.streams.errors.StreamsException: 
> java.lang.IllegalStateException: Got a removed task 1_0 from the state 
> updater that is not for recycle, closing, or updating input partitions; this 
> should not happen
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:729)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:645)
> Caused by: java.lang.IllegalStateException: Got a removed task 1_0 from the 
> state updater that is not for recycle, closing, or updating input partitions; 
> this should not happen
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.handleRemovedTasksFromStateUpdater(TaskManager.java:939)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.checkStateUpdater(TaskManager.java:788)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.checkStateUpdater(StreamThread.java:1141)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:949)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:686)
>     ... 1 more{code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15942) Implement ConsumerInterceptor

2024-02-19 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15942?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy reassigned KAFKA-15942:
--

Assignee: Lucas Brutschy  (was: Lucas Brutschy)

> Implement ConsumerInterceptor
> -
>
> Key: KAFKA-15942
> URL: https://issues.apache.org/jira/browse/KAFKA-15942
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Lucas Brutschy
>Priority: Blocker
>  Labels: consumer-threading-refactor, interceptors
> Fix For: 3.8.0
>
>
> As the title says, we need to implement ConsumerInterceptor in the 
> AsyncKafkaConsumer.
>  
> This is the current code; the implementation would be very similar:
> {code:java}
> if (interceptors != null)
>     interceptors.onCommit(offsets); {code}
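> The poll path would presumably be symmetric; a sketch (assuming the same 
> interceptors wrapper as in the classic consumer):
> {code:java}
> // Wrap fetched records before returning them from poll():
> if (interceptors != null)
>     return interceptors.onConsume(records);
> return records; {code}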



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15326) Decouple Processing Thread from Polling Thread

2024-02-19 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15326?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy reassigned KAFKA-15326:
--

Assignee: Lucas Brutschy  (was: Lucas Brutschy)

> Decouple Processing Thread from Polling Thread
> --
>
> Key: KAFKA-15326
> URL: https://issues.apache.org/jira/browse/KAFKA-15326
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Lucas Brutschy
>Assignee: Lucas Brutschy
>Priority: Critical
>
> As part of an ongoing effort to implement a better threading architecture in 
> Kafka Streams, we decouple N stream threads into N polling threads and N 
> processing threads. The effort to consolidate the N polling threads into a 
> single thread is a follow-up to this ticket. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15803) Update last seen epoch during commit

2024-02-19 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15803?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy reassigned KAFKA-15803:
--

Assignee: Lucas Brutschy  (was: Lucas Brutschy)

> Update last seen epoch during commit
> 
>
> Key: KAFKA-15803
> URL: https://issues.apache.org/jira/browse/KAFKA-15803
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Lucas Brutschy
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-client-support, 
> kip-848-e2e, kip-848-preview
> Fix For: 3.7.0
>
>
> At the time we implemented commitAsync in the PrototypeAsyncConsumer, 
> metadata was not available. The ask here is to investigate whether we need to 
> add the following function to the commit code:
>  
> {code:java}
> private void updateLastSeenEpochIfNewer(TopicPartition topicPartition, 
>                                         OffsetAndMetadata offsetAndMetadata) {
>     if (offsetAndMetadata != null)
>         offsetAndMetadata.leaderEpoch().ifPresent(epoch -> 
>             metadata.updateLastSeenEpochIfNewer(topicPartition, epoch));
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14309) Kafka Streams upgrade tests do not cover for FK-joins

2024-02-19 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy reassigned KAFKA-14309:
--

Assignee: Lucas Brutschy  (was: Lucas Brutschy)

> Kafka Streams upgrade tests do not cover for FK-joins
> -
>
> Key: KAFKA-14309
> URL: https://issues.apache.org/jira/browse/KAFKA-14309
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Lucas Brutschy
>Assignee: Lucas Brutschy
>Priority: Major
>
> The current streams upgrade system test for FK joins includes the production 
> of foreign-key data and an actual foreign-key join in every version of 
> SmokeTestDriver except for the latest. The effect is that FK-join upgrades 
> are not tested at all, since no FK-join code is executed after the bounce in 
> the system test.
> We should enable the FK-join code in the system test.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14532) Correctly handle failed fetch when partitions unassigned

2024-02-19 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14532?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy reassigned KAFKA-14532:
--

Assignee: Lucas Brutschy  (was: Lucas Brutschy)

> Correctly handle failed fetch when partitions unassigned
> 
>
> Key: KAFKA-14532
> URL: https://issues.apache.org/jira/browse/KAFKA-14532
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Lucas Brutschy
>Assignee: Lucas Brutschy
>Priority: Blocker
> Fix For: 3.4.0, 3.3.2
>
>
> On master, all our long-running test jobs are running into this exception: 
> {code:java}
> java.lang.IllegalStateException: No current assignment for partition 
> stream-soak-test-KSTREAM-OUTERTHIS-86-store-changelog-1
>     at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:370)
>     at org.apache.kafka.clients.consumer.internals.SubscriptionState.clearPreferredReadReplica(SubscriptionState.java:623)
>     at java.util.LinkedHashMap$LinkedKeySet.forEach(LinkedHashMap.java:559)
>     at org.apache.kafka.clients.consumer.internals.Fetcher$1.onFailure(Fetcher.java:349)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:179)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:149)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:613)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:427)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:251)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1307)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
>     at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:450)
>     at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:910)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:773)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:613)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:575)
> [2022-12-13 04:01:59,024] ERROR [i-016cf5d2c1889c316-StreamThread-1] 
> stream-client [i-016cf5d2c1889c316] Encountered the following exception 
> during processing and sent shutdown request for the entire application. 
> (org.apache.kafka.streams.KafkaStreams)
> org.apache.kafka.streams.errors.StreamsException: 
> java.lang.IllegalStateException: No current assignment for partition 
> stream-soak-test-KSTREAM-OUTERTHIS-86-store-changelog-1
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:653)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:575)
> Caused by: java.lang.IllegalStateException: No current assignment for 
> partition stream-soak-test-KSTREAM-OUTERTHIS-86-store-changelog-1
>     at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:370)
>     at org.apache.kafka.clients.consumer.internals.SubscriptionState.clearPreferredReadReplica(SubscriptionState.java:623)
>     at java.util.LinkedHashMap$LinkedKeySet.forEach(LinkedHashMap.java:559)
>     at org.apache.kafka.clients.consumer.internals.Fetcher$1.onFailure(Fetcher.java:349)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:179)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:149)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:613)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:427)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:251)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1307)
>     at org.apache.kafka.clients.co

[jira] [Assigned] (KAFKA-15690) EosIntegrationTest is flaky.

2024-02-19 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy reassigned KAFKA-15690:
--

Assignee: Lucas Brutschy  (was: Lucas Brutschy)

> EosIntegrationTest is flaky.
> 
>
> Key: KAFKA-15690
> URL: https://issues.apache.org/jira/browse/KAFKA-15690
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Calvin Liu
>Assignee: Lucas Brutschy
>Priority: Major
>  Labels: flaky-test
>
> EosIntegrationTest
> shouldNotViolateEosIfOneTaskGetsFencedUsingIsolatedAppInstances[exactly_once_v2,
>  processing threads = false]
> {code:java}
> org.junit.runners.model.TestTimedOutException: test timed out after 600 
> seconds   at 
> org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.handleServerDisconnect(NetworkClient.java:)
> at 
> org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:821)
>   at 
> org.apache.kafka.clients.NetworkClient.processTimeoutDisconnection(NetworkClient.java:779)
>at 
> org.apache.kafka.clients.NetworkClient.handleTimedOutRequests(NetworkClient.java:837)
> org.apache.kafka.streams.errors.StreamsException: Exception caught in 
> process. taskId=0_1, processor=KSTREAM-SOURCE-00, 
> topic=multiPartitionInputTopic, partition=1, offset=15, 
> stacktrace=java.lang.RuntimeException: Detected we've been interrupted.   
> at 
> org.apache.kafka.streams.integration.EosIntegrationTest$1$1.transform(EosIntegrationTest.java:892)
>at 
> org.apache.kafka.streams.integration.EosIntegrationTest$1$1.transform(EosIntegrationTest.java:867)
>at 
> org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:49)
> at 
> org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:38)
> at 
> org.apache.kafka.streams.kstream.internals.KStreamFlatTransform$KStreamFlatTransformProcessor.process(KStreamFlatTransform.java:66)
>  {code}
>   shouldWriteLatestOffsetsToCheckpointOnShutdown[exactly_once_v2, processing 
> threads = false] 
> {code:java}
> java.lang.RuntimeException: java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.UnknownServerException: The server experienced 
> an unexpected error when processing the request.   at 
> org.apache.kafka.streams.integration.utils.KafkaEmbedded.deleteTopic(KafkaEmbedded.java:204)
>  at 
> org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.deleteTopicsAndWait(EmbeddedKafkaCluster.java:286)
>at 
> org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.deleteTopicsAndWait(EmbeddedKafkaCluster.java:274)
>at 
> org.apache.kafka.streams.integration.EosIntegrationTest.createTopics(EosIntegrationTest.java:174)
> at 
> java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
> org.apache.kafka.streams.errors.StreamsException: Exception caught in 
> process. taskId=0_1, processor=KSTREAM-SOURCE-00, 
> topic=multiPartitionInputTopic, partition=1, offset=15, 
> stacktrace=java.lang.RuntimeException: Detected we've been interrupted.   
> at 
> org.apache.kafka.streams.integration.EosIntegrationTest$1$1.transform(EosIntegrationTest.java:892)
>at 
> org.apache.kafka.streams.integration.EosIntegrationTest$1$1.transform(EosIntegrationTest.java:867)
>at 
> org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:49)
> at 
> org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:38)
> at 
> org.apache.kafka.streams.kstream.internals.KStreamFlatTransform$KStreamFlatTransformProcessor.process(KStreamFlatTransform.java:66)
>  {code}
> shouldWriteLatestOffsetsToCheckpointOnShutdown[exactly_once, processing 
> threads = false] 
> {code:java}
> org.opentest4j.AssertionFailedError: Condition not met within timeout 6. 
> StreamsTasks did not request commit. ==> expected: <true> but was: <false> 
>at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
>at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)   
>  at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) 
> at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:210)
> java.lang.IllegalStateException: Replica 
> [Topic=__transaction_state,Partition=2,Replica=1] should be in the 
> OfflineReplica,ReplicaDeletionStarted states before moving to 
> ReplicaDeletionIneligible state. Instead it is in OnlineReplica state

[jira] [Assigned] (KAFKA-15280) Implement client support for selecting KIP-848 server-side assignor

2024-02-19 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15280?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy reassigned KAFKA-15280:
--

Assignee: Lucas Brutschy  (was: Lucas Brutschy)

> Implement client support for selecting KIP-848 server-side assignor
> ---
>
> Key: KAFKA-15280
> URL: https://issues.apache.org/jira/browse/KAFKA-15280
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Lucas Brutschy
>Priority: Major
>  Labels: kip-848, kip-848-client-support, kip-848-e2e, 
> kip-848-preview
> Fix For: 3.7.0
>
>
> This includes:
>  * Validate the client’s configuration for server-side assignor selection 
> defined in config `group.remote.assignor`
>  * Include the assignor taken from config in the {{ConsumerGroupHeartbeat}} 
> request, in the `ServerAssignor` field 
>  * Properly handle UNSUPPORTED_ASSIGNOR errors that may be returned in the HB 
> response if the server does not support the assignor defined by the consumer. 
> This task is part of the work to implement support for the new KIP-848 
> consumer group protocol.
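> On the configuration side, the selection could look like this (a sketch; 
> "uniform" is one of the built-in server-side assignors):
> {code:java}
> Properties props = new Properties();
> props.put("bootstrap.servers", "localhost:9092");
> props.put("group.id", "my-group");
> props.put("group.protocol", "consumer");       // opt into the KIP-848 protocol
> props.put("group.remote.assignor", "uniform"); // sent in the heartbeat's ServerAssignor field
> {code}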



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14415) ThreadCache is getting slower with every additional state store

2024-02-19 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14415?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy reassigned KAFKA-14415:
--

Assignee: Lucas Brutschy  (was: Lucas Brutschy)

> ThreadCache is getting slower with every additional state store
> ---
>
> Key: KAFKA-14415
> URL: https://issues.apache.org/jira/browse/KAFKA-14415
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Lucas Brutschy
>Assignee: Lucas Brutschy
>Priority: Major
> Fix For: 3.4.0
>
>
> There are a few lines in `ThreadCache` that I think should be optimized. 
> `sizeBytes` is called at least once, and potentially many times, in every 
> `put`, and it is linear in the number of caches (= number of state stores, so 
> typically proportional to the number of tasks). That means, with every 
> additional task, every put gets a little slower. Compare the throughput of 
> TIME_ROCKS on trunk (green graph):
> [http://kstreams-benchmark-results.s3-website-us-west-2.amazonaws.com/experiments/stateheavy-3-5-3-4-0-51b7eb7937-jenkins-20221113214104-streamsbench/]
> The throughput of TIME_ROCKS is 20% higher when a constant-time 
> `sizeBytes` implementation is used:
> [http://kstreams-benchmark-results.s3-website-us-west-2.amazonaws.com/experiments/stateheavy-3-5-LUCASCOMPARE-lucas-20221122140846-streamsbench/]
> The same seems to apply for the MEM backend (initial throughput >8000 instead 
> of 6000), however, I cannot run the same benchmark here because the memory is 
> filled too quickly.
> [http://kstreams-benchmark-results.s3-website-us-west-2.amazonaws.com/experiments/stateheavy-3-5-LUCASSTATE-lucas-20221121231632-streamsbench/]
>  
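> A hypothetical sketch of the constant-time variant (the real ThreadCache 
> tracks NamedCache instances; names here are illustrative):
> {code:java}
> class CacheSizeSketch {
>     private final Map<String, Long> cacheSizes = new HashMap<>();
>     private long totalBytes = 0; // maintained incrementally
> 
>     void resize(String cache, long newSizeBytes) {
>         totalBytes += newSizeBytes - cacheSizes.getOrDefault(cache, 0L);
>         cacheSizes.put(cache, newSizeBytes);
>     }
> 
>     // O(1) instead of summing over all caches on every put().
>     long sizeBytes() { return totalBytes; }
> }
> {code}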



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15832) Trigger client reconciliation based on manager poll

2024-02-19 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15832?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy reassigned KAFKA-15832:
--

Assignee: Lucas Brutschy  (was: Lucas Brutschy)

> Trigger client reconciliation based on manager poll
> ---
>
> Key: KAFKA-15832
> URL: https://issues.apache.org/jira/browse/KAFKA-15832
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lucas Brutschy
>Priority: Critical
>  Labels: kip-848-client-support, reconciliation
> Fix For: 3.8.0
>
>
> Currently the reconciliation logic on the client is triggered when a new 
> target assignment is received and resolved, or when new unresolved target 
> assignments are discovered in metadata.
> This could be improved by triggering the reconciliation logic on each poll 
> iteration, to reconcile whatever is ready to be reconciled. This would 
> require changes to support poll on the MembershipManager, and integrate it 
> with the current polling logic in the background thread. Receiving a new 
> target assignment from the broker, or resolving new topic names via a 
> metadata update, would then only ensure that #assignmentReadyToReconcile is 
> properly updated (currently done), but wouldn't trigger the #reconcile() 
> logic, leaving that to the #poll() operation.
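> A rough sketch of the proposed shape (illustrative, not the actual 
> MembershipManager API):
> {code:java}
> // Called on every iteration of the background thread's poll loop.
> void poll(long currentTimeMs) {
>     // Heartbeat responses and metadata updates only mark assignments as
>     // ready; the reconciliation itself runs here.
>     if (!assignmentReadyToReconcile.isEmpty()) {
>         reconcile();
>     }
> }
> {code}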



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14530) Check state updater more than once in process loops

2024-02-19 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14530?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy reassigned KAFKA-14530:
--

Assignee: Lucas Brutschy  (was: Lucas Brutschy)

> Check state updater more than once in process loops
> ---
>
> Key: KAFKA-14530
> URL: https://issues.apache.org/jira/browse/KAFKA-14530
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: Lucas Brutschy
>Assignee: Lucas Brutschy
>Priority: Minor
> Fix For: 3.5.0
>
>
> In the new state restoration code, the state updater needs to be checked 
> regularly by the main thread to transfer ownership of tasks back to the main 
> thread once the state of the task is restored. The more often we check this, 
> the faster we can start processing the tasks.
> Currently, we only check the state updater once in every iteration of the 
> main processing loop. And while we haven't observed this to be strictly too 
> infrequent, we can increase the number of checks easily by moving the check 
> inside the inner processing loop. This would mean that once we have iterated 
> over `numIterations` records, we can already start processing tasks that have 
> finished restoration in the meantime.
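> Sketched, the change could look like this (hypothetical loop shape and 
> method names):
> {code:java}
> while (running) {
>     transferRestoredTasks(); // existing check, once per outer iteration
>     for (Task task : activeTasks) {
>         processRecords(task, numIterations);
>         transferRestoredTasks(); // new: check again after each batch of
>                                  // numIterations records
>     }
> }
> {code}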



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14432) RocksDBStore relies on finalizers to not leak memory

2024-02-19 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14432?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy reassigned KAFKA-14432:
--

Assignee: Lucas Brutschy  (was: Lucas Brutschy)

> RocksDBStore relies on finalizers to not leak memory
> 
>
> Key: KAFKA-14432
> URL: https://issues.apache.org/jira/browse/KAFKA-14432
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Lucas Brutschy
>Assignee: Lucas Brutschy
>Priority: Blocker
> Fix For: 3.4.0
>
>
> Relying on finalizers in RocksDB has been deprecated for a long time, and 
> starting with rocksdb 7, finalizers are removed completely (see 
> [https://github.com/facebook/rocksdb/pull/9523]). 
> Kafka Streams currently relies in places on finalizers to avoid leaking 
> memory. This needs to be resolved before we can upgrade to RocksDB 7.
> See [https://github.com/apache/kafka/pull/12809].
> This is a native heap profile after running Kafka Streams without finalizers 
> for a few hours:
> {code:java}
> Total: 13547.5 MB
> 12936.3 95.5% 95.5% 12936.3 95.5% rocksdb::port::cacheline_aligned_alloc
> 438.5 3.2% 98.7% 438.5 3.2% rocksdb::BlockFetcher::ReadBlockContents
> 84.0 0.6% 99.3% 84.2 0.6% rocksdb::Arena::AllocateNewBlock
> 45.9 0.3% 99.7% 45.9 0.3% prof_backtrace_impl
> 8.1 0.1% 99.7% 14.6 0.1% rocksdb::BlockBasedTable::PutDataBlockToCache
> 6.4 0.0% 99.8% 12941.4 95.5% Java_org_rocksdb_Statistics_newStatistics___3BJ
> 6.1 0.0% 99.8% 6.9 0.1% rocksdb::LRUCacheShard::Insert@2d8b20
> 5.1 0.0% 99.9% 6.5 0.0% rocksdb::VersionSet::ProcessManifestWrites
> 3.9 0.0% 99.9% 3.9 0.0% rocksdb::WritableFileWriter::WritableFileWriter
> 3.2 0.0% 99.9% 3.2 0.0% std::string::_Rep::_S_create{code}
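> The direction of the fix is presumably to close native handles explicitly 
> instead of relying on GC; RocksDB's Java objects are AutoCloseable, e.g.:
> {code:java}
> import org.rocksdb.Options;
> import org.rocksdb.RocksDB;
> 
> RocksDB.loadLibrary();
> try (Options options = new Options().setCreateIfMissing(true);
>      RocksDB db = RocksDB.open(options, "/tmp/rocksdb-sketch")) {
>     db.put("k".getBytes(), "v".getBytes());
> } // native memory released here, not by a finalizer
> {code}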



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-7000) KafkaConsumer.position should wait for assignment metadata

2024-02-19 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7000?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy reassigned KAFKA-7000:
-

Assignee: Lucas Brutschy  (was: Lucas Brutschy)

> KafkaConsumer.position should wait for assignment metadata
> --
>
> Key: KAFKA-7000
> URL: https://issues.apache.org/jira/browse/KAFKA-7000
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: John Roesler
>Assignee: Lucas Brutschy
>Priority: Blocker
> Fix For: 2.0.0
>
>
> While updating Kafka Streams to stop using the deprecated 
> Consumer.poll(long), I found that this code unexpectedly throws an exception:
> {code:java}
> consumer.subscribe(topics);
> // consumer.poll(0); <- I've removed this line, which shouldn't be necessary 
> // here.
> final Set<TopicPartition> partitions = new HashSet<>();
> for (final String topic : topics) {
>     for (final PartitionInfo partition : consumer.partitionsFor(topic)) {
>         partitions.add(new TopicPartition(partition.topic(), 
>             partition.partition()));
>     }
> }
> for (final TopicPartition tp : partitions) {
>     final long offset = consumer.position(tp);
>     committedOffsets.put(tp, offset);
> }{code}
> Here is the exception:
> {code:java}
> Exception in thread "main" java.lang.IllegalStateException: You can only 
> check the position for partitions assigned to this consumer.
>at 
> org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1620)
>at 
> org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1586)
>at 
> org.apache.kafka.streams.tests.EosTestDriver.getCommittedOffsets(EosTestDriver.java:275)
>at 
> org.apache.kafka.streams.tests.EosTestDriver.verify(EosTestDriver.java:148)
>at 
> org.apache.kafka.streams.tests.StreamsEosTest.main(StreamsEosTest.java:69){code}
>  
> As you can see in the commented code in my snippet, we used to block for 
> assignment with a poll(0), which is now deprecated.
> It seems reasonable to me for position() to do the same thing that poll() 
> does, which is call `coordinator.poll(timeout.toMillis())` early in 
> processing to ensure an up-to-date assignment.
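> With that behavior (and the Duration-based overloads added in 2.0), the 
> snippet above could drop the poll(0) workaround entirely; a sketch:
> {code:java}
> for (final TopicPartition tp : partitions) {
>     // Blocks for assignment metadata (up to the given timeout) instead of
>     // throwing IllegalStateException.
>     final long offset = consumer.position(tp, Duration.ofSeconds(30));
>     committedOffsets.put(tp, offset);
> }
> {code}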



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-13531) Flaky test NamedTopologyIntegrationTest

2024-02-19 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13531?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy reassigned KAFKA-13531:
--

Assignee: Lucas Brutschy  (was: Lucas Brutschy)

> Flaky test NamedTopologyIntegrationTest
> ---
>
> Key: KAFKA-13531
> URL: https://issues.apache.org/jira/browse/KAFKA-13531
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Assignee: Lucas Brutschy
>Priority: Critical
>  Labels: flaky-test
> Attachments: 
> org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldRemoveOneNamedTopologyWhileAnotherContinuesProcessing().test.stdout
>
>
> org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldRemoveNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets
> {quote}java.lang.AssertionError: Did not receive all 3 records from topic 
> output-stream-2 within 6 ms, currently accumulated data is [] Expected: 
> is a value equal to or greater than <3> but: <0> was less than <3> at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:648)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:368)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:336)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:644)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:617)
>  at 
> org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldRemoveNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets(NamedTopologyIntegrationTest.java:439){quote}
> STDERR
> {quote}java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.GroupSubscribedToTopicException: Deleting 
> offsets of a topic is forbidden while the consumer group is actively 
> subscribed to it. at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) 
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
>  at 
> org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper.lambda$removeNamedTopology$3(KafkaStreamsNamedTopologyWrapper.java:213)
>  at 
> org.apache.kafka.common.internals.KafkaFutureImpl.lambda$whenComplete$2(KafkaFutureImpl.java:107)
>  at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>  at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>  at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>  at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) 
> at 
> org.apache.kafka.common.internals.KafkaCompletableFuture.kafkaComplete(KafkaCompletableFuture.java:39)
>  at 
> org.apache.kafka.common.internals.KafkaFutureImpl.complete(KafkaFutureImpl.java:122)
>  at 
> org.apache.kafka.streams.processor.internals.TopologyMetadata.maybeNotifyTopologyVersionWaiters(TopologyMetadata.java:154)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.checkForTopologyUpdates(StreamThread.java:916)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:598)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:575)
>  Caused by: org.apache.kafka.common.errors.GroupSubscribedToTopicException: 
> Deleting offsets of a topic is forbidden while the consumer group is actively 
> subscribed to it. java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.GroupSubscribedToTopicException: Deleting 
> offsets of a topic is forbidden while the consumer group is actively 
> subscribed to it. at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) 
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
>  at 
> org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper.lambda$removeNamedTopology$3(KafkaStreamsNamedTopologyWrapper.java:213)
>  at 
> org.apache.kafka.common.internals.KafkaFutureImpl.lambda$whenComplete$2(KafkaFutureImpl.java:107)
>  at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>  at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>  at 
> jav

[jira] [Assigned] (KAFKA-15977) DelegationTokenEndToEndAuthorizationWithOwnerTest leaks threads

2024-02-19 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15977?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy reassigned KAFKA-15977:
--

Assignee: Lucas Brutschy  (was: Lucas Brutschy)

> DelegationTokenEndToEndAuthorizationWithOwnerTest leaks threads
> ---
>
> Key: KAFKA-15977
> URL: https://issues.apache.org/jira/browse/KAFKA-15977
> Project: Kafka
>  Issue Type: Bug
>Reporter: Lucas Brutschy
>Assignee: Lucas Brutschy
>Priority: Major
>
> [https://ci-builds.apache.org/blue/rest/organizations/jenkins/pipelines/Kafka/pipelines/kafka-pr/branches/PR-14878/runs/8/nodes/11/steps/90/log/?start=0]
>  
> I had an unrelated PR fail with the following thread leak:
>  
> {code:java}
> Gradle Test Run :core:test > Gradle Test Executor 95 > 
> DelegationTokenEndToEndAuthorizationWithOwnerTest > executionError STARTED
> kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest.executionError 
> failed, log available in 
> /home/jenkins/workspace/Kafka_kafka-pr_PR-14878/core/build/reports/testOutput/kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest.executionError.test.stdoutGradle
>  Test Run :core:test > Gradle Test Executor 95 > 
> DelegationTokenEndToEndAuthorizationWithOwnerTest > executionError FAILED
>     org.opentest4j.AssertionFailedError: Found 1 unexpected threads during 
> @AfterAll: `kafka-admin-client-thread | adminclient-483` ==> expected:  
> but was: {code}
>  
> All the following tests on that error fail with initialization errors, 
> because the admin client thread is never closed.
>  
> This is preceded by the following test failure:
>  
> {code:java}
> Gradle Test Run :core:test > Gradle Test Executor 95 > 
> DelegationTokenEndToEndAuthorizationWithOwnerTest > 
> testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl(String, boolean) > [1] 
> quorum=kraft, isIdempotenceEnabled=true FAILED
>     org.opentest4j.AssertionFailedError: expected acls:
>      (principal=User:scram-user2, host=*, operation=CREATE_TOKENS, 
> permissionType=ALLOW)
>      (principal=User:scram-user2, host=*, operation=DESCRIBE_TOKENS, 
> permissionType=ALLOW)
>     but got:
>  
>         at 
> app//org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38)
>         at app//org.junit.jupiter.api.Assertions.fail(Assertions.java:134)
>         at app//kafka.utils.TestUtils$.waitAndVerifyAcls(TestUtils.scala:1142)
>         at 
> app//kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest.$anonfun$configureSecurityAfterServersStart$1(DelegationTokenEndToEndAuthorizationWithOwnerTest.scala:71)
>         at 
> app//kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest.$anonfun$configureSecurityAfterServersStart$1$adapted(DelegationTokenEndToEndAuthorizationWithOwnerTest.scala:70)
>         at 
> app//scala.collection.IterableOnceOps.foreach(IterableOnce.scala:576)
>         at 
> app//scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:574)
>         at app//scala.collection.AbstractIterable.foreach(Iterable.scala:933)
>         at 
> app//kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest.configureSecurityAfterServersStart(DelegationTokenEndToEndAuthorizationWithOwnerTest.scala:70){code}
>  
>  
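Relating to the unclosed admin client noted above, a minimal sketch of scoping the client with try-with-resources so its "kafka-admin-client-thread" cannot outlive the test; the helper name is hypothetical, not an existing test utility:

{code:java}
import java.util.Properties;
import java.util.function.Consumer;

import org.apache.kafka.clients.admin.Admin;

// Hypothetical test helper: try-with-resources closes the admin client
// (and its "kafka-admin-client-thread") even when the test body throws.
final class AdminLifecycle {
    static void withAdmin(Properties props, Consumer<Admin> testBody) {
        try (Admin admin = Admin.create(props)) {
            testBody.accept(admin);
        } // Admin#close joins the client thread before returning
    }
}
{code}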



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15957) ConsistencyVectorIntegrationTest.shouldHaveSamePositionBoundActiveAndStandBy broken

2024-02-19 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15957?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy reassigned KAFKA-15957:
--

Assignee: Lucas Brutschy  (was: Lucas Brutschy)

> ConsistencyVectorIntegrationTest.shouldHaveSamePositionBoundActiveAndStandBy 
> broken
> ---
>
> Key: KAFKA-15957
> URL: https://issues.apache.org/jira/browse/KAFKA-15957
> Project: Kafka
>  Issue Type: Bug
>Reporter: Justine Olshan
>Assignee: Lucas Brutschy
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14014) Flaky test NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets()

2024-02-19 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14014?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy reassigned KAFKA-14014:
--

Assignee: Lucas Brutschy  (was: Lucas Brutschy)

> Flaky test 
> NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets()
> 
>
> Key: KAFKA-14014
> URL: https://issues.apache.org/jira/browse/KAFKA-14014
> Project: Kafka
>  Issue Type: Test
>  Components: streams
>Reporter: Bruno Cadonna
>Assignee: Lucas Brutschy
>Priority: Critical
>  Labels: flaky-test
>
> {code:java}
> java.lang.AssertionError: 
> Expected: <[KeyValue(B, 1), KeyValue(A, 2), KeyValue(C, 2)]>
>  but: was <[KeyValue(B, 1), KeyValue(A, 2), KeyValue(C, 1)]>
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
>   at 
> org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets(NamedTopologyIntegrationTest.java:540)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:568)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
>   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>   at java.base/java.lang.Thread.run(Thread.java:833)
> {code}
> https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12310/2/testReport/junit/org.apache.kafka.streams.integration/NamedTopologyIntegrationTest/Build___JDK_11_and_Scala_2_13___shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets/
> https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12310/2/testReport/junit/org.apache.kafka.streams.integration/NamedTopologyIntegrationTest/Build___JDK_17_and_Scala_2_13___shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets/



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14343) Write upgrade/downgrade tests for enabling the state updater

2024-02-19 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy reassigned KAFKA-14343:
--

Assignee: Lucas Brutschy  (was: Lucas Brutschy)

> Write upgrade/downgrade tests for enabling the state updater 
> -
>
> Key: KAFKA-14343
> URL: https://issues.apache.org/jira/browse/KAFKA-14343
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: Lucas Brutschy
>Assignee: Lucas Brutschy
>Priority: Major
>
> Write a test that verifies the upgrade from a version of Streams with state 
> updater disabled to a version with state updater enabled and vice versa, so 
> that we can offer a safe upgrade path.
>  * upgrade test from a version of Streams with state updater disabled to a 
> version with state updater enabled (probably a system test since the old code 
> path will be removed from the code base)
>  * downgrade test from a version of Streams with state updater enabled to a 
> version with state updater disabled (probably a system test since the old 
> code path will be removed from the code base)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-12679) Rebalancing a restoring or running task may cause directory livelocking with newly created task

2024-02-19 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12679?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy reassigned KAFKA-12679:
--

Assignee: Lucas Brutschy  (was: Lucas Brutschy)

> Rebalancing a restoring or running task may cause directory livelocking with 
> newly created task
> ---
>
> Key: KAFKA-12679
> URL: https://issues.apache.org/jira/browse/KAFKA-12679
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.1
> Environment: Broker and client version 2.6.1
> Multi-node broker cluster
> Multi-node, auto scaling streams app instances
>Reporter: Peter Nahas
>Assignee: Lucas Brutschy
>Priority: Major
> Fix For: 3.7.0
>
> Attachments: Backoff-between-directory-lock-attempts.patch
>
>
> If a task that uses a state store is in the restoring state or in a running 
> state and the task gets rebalanced to a separate thread on the same instance, 
> the newly created task will attempt to lock the state store director while 
> the first thread is continuing to use it. This is totally normal and expected 
> behavior when the first thread is not yet aware of the rebalance. However, 
> that newly created task is effectively running a while loop with no backoff 
> waiting to lock the directory:
>  # TaskManager tells the task to restore in `tryToCompleteRestoration`
>  # The task attempts to lock the directory
>  # The lock attempt fails and throws a 
> `org.apache.kafka.streams.errors.LockException`
>  # TaskManager catches the exception, stops further processing on the task 
> and reports that not all tasks have restored
>  # The StreamThread `runLoop` continues to run.
> I've seen some documentation indicate that there is supposed to be a backoff 
> when this condition occurs, but there does not appear to be any in the code. 
> The result is that if this goes on for long enough, the lock-loop may 
> dominate CPU usage in the process and starve out the old stream thread task 
> processing.
>  
> When in this state, the DEBUG level logging for TaskManager will produce a 
> steady stream of messages like the following:
> {noformat}
> 2021-03-30 20:59:51,098 DEBUG --- [StreamThread-10] o.a.k.s.p.i.TaskManager   
>   : stream-thread [StreamThread-10] Could not initialize 0_34 due 
> to the following exception; will retry
> org.apache.kafka.streams.errors.LockException: stream-thread 
> [StreamThread-10] standby-task [0_34] Failed to lock the state directory for 
> task 0_34
> {noformat}
>  
>  
> I've attached a git-formatted patch to resolve the issue: simply detect the 
> scenario and sleep for the backoff time in the appropriate StreamThread.
>  
>  
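A minimal sketch of the backoff idea, with a plain ReentrantLock standing in for the state-directory lock (illustrative only, not the actual Streams code):

{code:java}
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Sketch: back off between state-directory lock attempts instead of
// spinning, so the retry loop cannot starve the thread still using it.
final class LockWithBackoff {
    private static final long BACKOFF_MS = 100L;
    private final ReentrantLock directoryLock = new ReentrantLock();

    boolean tryLockWithBackoff(long timeoutMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            if (directoryLock.tryLock())
                return true;               // got the lock; caller releases it
            // Previously the loop retried without sleeping; back off instead.
            TimeUnit.MILLISECONDS.sleep(BACKOFF_MS);
        }
        return false;                      // caller retries on the next run-loop pass
    }
}
{code}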



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15941) Flaky test: shouldRestoreNullRecord() – org.apache.kafka.streams.integration.RestoreIntegrationTest

2024-02-19 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15941?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy reassigned KAFKA-15941:
--

Assignee: Lucas Brutschy  (was: Lucas Brutschy)

> Flaky test: shouldRestoreNullRecord() – 
> org.apache.kafka.streams.integration.RestoreIntegrationTest
> ---
>
> Key: KAFKA-15941
> URL: https://issues.apache.org/jira/browse/KAFKA-15941
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Apoorv Mittal
>Assignee: Lucas Brutschy
>Priority: Major
>  Labels: flaky-test
>
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14699/24/tests/
> {code:java}
> org.opentest4j.AssertionFailedError: Condition not met within timeout 6. 
> Did not receive all [KeyValue(2, \x00\x00\x00)] records from topic output 
> (got []) ==> expected:  but was: 
> Stacktraceorg.opentest4j.AssertionFailedError: Condition not met 
> within timeout 6. Did not receive all [KeyValue(2, \x00\x00\x00)] records 
> from topic output (got []) ==> expected:  but was:   at 
> org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
> at 
> org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
> at org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)   
>   at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)  at 
> org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:210) at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:331) 
>at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:379)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:328) 
> at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:312) at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:302) at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(IntegrationTestUtils.java:878)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(IntegrationTestUtils.java:827)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(IntegrationTestUtils.java:790)
>  at 
> org.apache.kafka.streams.integration.RestoreIntegrationTest.shouldRestoreNullRecord(RestoreIntegrationTest.java:244)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14624) State restoration is broken with standby tasks and cache-enabled stores in processor API

2024-02-19 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14624?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy reassigned KAFKA-14624:
--

Assignee: Lucas Brutschy  (was: Lucas Brutschy)

> State restoration is broken with standby tasks and cache-enabled stores in 
> processor API
> 
>
> Key: KAFKA-14624
> URL: https://issues.apache.org/jira/browse/KAFKA-14624
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.3.1
>Reporter: Balaji Rao
>Assignee: Lucas Brutschy
>Priority: Major
>
> I found that cache-enabled state stores in PAPI with standby tasks sometimes 
> return stale data when a partition moves from one app instance to another 
> and back. [Here's|https://github.com/balajirrao/kafka-streams-multi-runner] a 
> small project that I used to reproduce the issue.
> I dug around a bit and it seems like it's a bug in standby task state 
> restoration when caching is enabled. If a partition moves from instance 1 to 
> 2 and then back to instance 1,  since the `CachingKeyValueStore` doesn't 
> register a restore callback, it can return potentially stale data for 
> non-dirty keys. 
> I could fix the issue by modifying the `CachingKeyValueStore` to register a 
> restore callback in which the cache restored keys are added to the cache. Is 
> this fix in the right direction?
> {code:java}
> // register the store
> context.register(
>     root,
>     (RecordBatchingStateRestoreCallback) records -> {
>         for (final ConsumerRecord<byte[], byte[]> record : records) {
>             put(Bytes.wrap(record.key()), record.value());
>         }
>     }
> );
> {code}
>  
> I would like to contribute a fix, if I can get some help!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15913) Migrate AsyncKafkaConsumerTest away from ConsumerTestBuilder

2024-02-19 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy reassigned KAFKA-15913:
--

Assignee: Lucas Brutschy  (was: Lucas Brutschy)

> Migrate AsyncKafkaConsumerTest away from ConsumerTestBuilder
> 
>
> Key: KAFKA-15913
> URL: https://issues.apache.org/jira/browse/KAFKA-15913
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Lucas Brutschy
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.7.0
>
>
> ConsumerTestBuilder is meant to be a unit testing utility; however, we seem 
> to use Mockito#spy quite liberally.  This is not the right testing strategy 
> because we basically turn unit testing into integration testing.
>  
> While the current unit tests run fine, we should probably use Mockito#mock 
> by default and test each dependency independently.
>  
> The ask here is
>  # Make mock(class) by default
>  # Provide more flexible interface for the testBuilder to allow user to 
> configure spy or mock.  Or, let user pass in their own mock.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16198) Reconciliation may lose partitions when topic metadata is delayed

2024-02-19 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16198?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy reassigned KAFKA-16198:
--

Assignee: Lucas Brutschy  (was: Lucas Brutschy)

> Reconciliation may lose partitions when topic metadata is delayed
> -
>
> Key: KAFKA-16198
> URL: https://issues.apache.org/jira/browse/KAFKA-16198
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Lucas Brutschy
>Assignee: Lucas Brutschy
>Priority: Critical
>  Labels: client-transitions-issues, kip-848-client-support
> Fix For: 3.8.0
>
>
> The current reconciliation code in `AsyncKafkaConsumer`s `MembershipManager` 
> may lose part of the server-provided assignment when metadata is delayed. The 
> reason is incorrect handling of partially resolved topic names, as in this 
> example:
>  * We get assigned {{T1-1}} and {{T2-1}}
>  * We reconcile {{{}T1-1{}}}, {{T2-1}} remains in {{assignmentUnresolved}} 
> since the topic id {{T2}} is not known yet
>  * We get new cluster metadata, which includes {{{}T2{}}}, so {{T2-1}} is 
> moved to {{assignmentReadyToReconcile}}
>  * We call {{reconcile}} -- {{T2-1}} is now treated as the full assignment, 
> so {{T1-1}} is being revoked
> We end up with assignment {{T2-1}}, which is inconsistent with the 
> broker-side target assignment.
>  
> Generally, this seems to be a problem around semantics of the internal 
> collections `assignmentUnresolved` and `assignmentReadyToReconcile`. Absence 
> of a topic in `assignmentReadyToReconcile` may mean either revocation of the 
> topic partition(s), or unavailability of a topic name for the topic.
> Internal state with simpler and correct invariants could be achieved by using 
> a single collection `currentTargetAssignment` which is based on topic IDs and 
> always corresponds to the latest assignment received from the broker. During 
> every attempted reconciliation, all topic IDs will be resolved from the local 
> cache, which should not introduce a lot of overhead. `assignmentUnresolved` 
> and `assignmentReadyToReconcile` are removed. 
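A minimal sketch of that invariant, with simplified stand-in types rather than the actual MembershipManager internals:

{code:java}
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Sketch of the proposed invariant: keep one topic-id based target
// assignment and resolve names from local metadata on every attempt.
final class ReconcilerSketch {
    private final Map<String, Set<Integer>> currentTargetAssignment = new HashMap<>(); // topicId -> partitions
    private final Map<String, String> metadataCache = new HashMap<>();                  // topicId -> topicName

    void onNewTargetAssignment(Map<String, Set<Integer>> byTopicId) {
        currentTargetAssignment.clear();
        currentTargetAssignment.putAll(byTopicId);   // always the latest broker view
    }

    // Resolve whatever names are known; unresolved ids simply wait for
    // fresher metadata instead of being dropped from the target.
    Map<String, Set<Integer>> resolvableAssignment() {
        Map<String, Set<Integer>> byName = new HashMap<>();
        for (Map.Entry<String, Set<Integer>> e : currentTargetAssignment.entrySet()) {
            String name = metadataCache.get(e.getKey());
            if (name != null)
                byName.put(name, new HashSet<>(e.getValue()));
        }
        return byName;
    }
}
{code}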



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15833) Restrict Consumer API to be used from one thread

2024-02-19 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15833?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy reassigned KAFKA-15833:
--

Assignee: Lucas Brutschy  (was: Lucas Brutschy)

> Restrict Consumer API to be used from one thread
> 
>
> Key: KAFKA-15833
> URL: https://issues.apache.org/jira/browse/KAFKA-15833
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lucas Brutschy
>Assignee: Lucas Brutschy
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-client-support, 
> kip-848-preview
> Fix For: 3.7.0
>
>
> The legacy consumer restricts the API to be used from one thread only. This 
> is not enforced in the new consumer. To avoid inconsistencies in the 
> behavior, we should enforce the same restriction in the new consumer.
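For illustration, a minimal sketch in the spirit of the legacy consumer's acquire()/release() guard; this is a simplified stand-in, not the actual client code:

{code:java}
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicLong;

// Sketch of a single-threaded access guard: every public API call wraps
// its body in acquire()/release().
final class SingleThreadAccessGuard {
    private static final long NO_CURRENT_THREAD = -1L;
    private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
    private int refCount = 0; // only mutated by the owning thread

    void acquire() {
        long threadId = Thread.currentThread().getId();
        if (threadId != currentThread.get()
                && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
            throw new ConcurrentModificationException(
                "The consumer is not safe for multi-threaded access");
        refCount++;
    }

    void release() {
        if (--refCount == 0)
            currentThread.set(NO_CURRENT_THREAD);
    }
}
{code}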



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16253) kafka_server_alterPartition_metrics_network_io_total is not supported in kafka 3.5.1

2024-02-19 Thread Janardhana Gopalachar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17818402#comment-17818402
 ] 

Janardhana Gopalachar commented on KAFKA-16253:
---

The metric is available with type=alter-partition-metrics, hence closing 
this bug:
bash-4.4$ curl localhost:9404 | grep 
kafka_server_alter_partition_metrics_network_io_total
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0# 
HELP kafka_server_alter_partition_metrics_network_io_total The total number of 
network operations (reads or writes) on all connections 
kafka.server:name=null,type=alter-partition-metrics,attribute=network-io-total
# TYPE kafka_server_alter_partition_metrics_network_io_total untyped
kafka_server_alter_partition_metrics_network_io_total{BrokerId="0",} 18.0
100 1682k  100 1682k    0     0  3136k      0 --:--:-- --:--:-- --:--:-- 3132k
bash-4.4$ kafka-topics.sh --version
ERROR StatusLogger Reconfiguration failed: No configuration found for 
'5cb0d902' at 'null' in 'null'3.5.1 (Commit:2c6fb6c54472e90a)

> kafka_server_alterPartition_metrics_network_io_total is not supported in 
> kafka 3.5.1
> 
>
> Key: KAFKA-16253
> URL: https://issues.apache.org/jira/browse/KAFKA-16253
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.5.1
>Reporter: Janardhana Gopalachar
>Priority: Major
>
> Hi,
> The metric below was supported in Kafka 3.2.3, but in 3.5.1 it is not sent. 
> I tried to search the source code to see if this is present, but couldn't 
> find a reference. 
> Is the below metric deprecated or not supported?
> kafka_server_alterPartition_metrics_network_io_total
>  
> In Kafka 3.2.3 we could get the metric, but in 3.5.1 it is not available:
> HELP kafka_server_alterPartition_metrics_network_io_total The total number of 
> network operations (reads or writes) on all connections 
> kafka.server:name=null,type=alterPartition-metrics,attribute=network-io-total
> :--:# TYPE kafka_server_alterPartition_metrics_network_io_total untyped
> -kafka_server_alterPartition_metrics_network_io_total{BrokerId="0",} 10.0



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16253) kafka_server_alterPartition_metrics_network_io_total is not supported in kafka 3.5.1

2024-02-19 Thread Janardhana Gopalachar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Janardhana Gopalachar resolved KAFKA-16253.
---
Resolution: Invalid

> kafka_server_alterPartition_metrics_network_io_total is not supported in 
> kafka 3.5.1
> 
>
> Key: KAFKA-16253
> URL: https://issues.apache.org/jira/browse/KAFKA-16253
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.5.1
>Reporter: Janardhana Gopalachar
>Priority: Major
>
> Hi,
> The metric below was supported in Kafka 3.2.3, but in 3.5.1 it is not sent. 
> I tried to search the source code to see if this is present, but couldn't 
> find a reference. 
> Is the below metric deprecated or not supported?
> kafka_server_alterPartition_metrics_network_io_total
>  
> In Kafka 3.2.3 we could get the metric, but in 3.5.1 it is not available:
> HELP kafka_server_alterPartition_metrics_network_io_total The total number of 
> network operations (reads or writes) on all connections 
> kafka.server:name=null,type=alterPartition-metrics,attribute=network-io-total
> :--:# TYPE kafka_server_alterPartition_metrics_network_io_total untyped
> -kafka_server_alterPartition_metrics_network_io_total{BrokerId="0",} 10.0



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-13567) adminClient exponential backoff implementation

2024-02-19 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison resolved KAFKA-13567.

Resolution: Duplicate

> adminClient exponential backoff implementation
> --
>
> Key: KAFKA-13567
> URL: https://issues.apache.org/jira/browse/KAFKA-13567
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Luke Chen
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-13565) consumer exponential backoff implementation

2024-02-19 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison resolved KAFKA-13565.

Fix Version/s: 3.7.0
   Resolution: Duplicate

> consumer exponential backoff implementation
> ---
>
> Key: KAFKA-13565
> URL: https://issues.apache.org/jira/browse/KAFKA-13565
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Luke Chen
>Priority: Major
> Fix For: 3.7.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-13566) producer exponential backoff implementation

2024-02-19 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison resolved KAFKA-13566.

Resolution: Duplicate

> producer exponential backoff implementation
> ---
>
> Key: KAFKA-13566
> URL: https://issues.apache.org/jira/browse/KAFKA-13566
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Luke Chen
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16230) Update verifiable_consumer.py to support KIP-848’s group protocol config

2024-02-19 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16230?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy updated KAFKA-16230:
---
Reviewer: Lucas Brutschy  (was: Lucas Brutschy)

> Update verifiable_consumer.py to support KIP-848’s group protocol config
> 
>
> Key: KAFKA-16230
> URL: https://issues.apache.org/jira/browse/KAFKA-16230
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> This task is to update {{verifiable_consumer.py}} to support the 
> {{group.protocol}} configuration introduced in 
> [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol]
>  by adding an optional {{group_protocol}} argument. It will default to 
> classic and we will take a separate task (Jira) to update the callers.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16104) Enable additional PlaintextConsumerTest tests for new consumer

2024-02-19 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16104?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy updated KAFKA-16104:
---
Reviewer: Lucas Brutschy  (was: Lucas Brutschy)

> Enable additional PlaintextConsumerTest tests for new consumer
> --
>
> Key: KAFKA-16104
> URL: https://issues.apache.org/jira/browse/KAFKA-16104
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Andrew Schofield
>Assignee: Kirk True
>Priority: Critical
>  Labels: consumer-threading-refactor, integration-tests
> Fix For: 3.8.0
>
>
> It should be possible to enable:
>  * testAutoCommitOnClose
>  * testAutoCommitOnCloseAfterWakeup
>  * testExpandingTopicSubscriptions
>  * testShrinkingTopicSubscriptions
>  * testMultiConsumerSessionTimeoutOnClose (KAFKA-16011 has been fixed)
>  * testMultiConsumerSessionTimeoutOnStopPolling
>  * testAutoCommitOnRebalance
>  * testPerPartitionLeadMetricsCleanUpWithSubscribe
>  * testPerPartitionLagMetricsCleanUpWithSubscribe
>  * testStaticConsumerDetectsNewPartitionCreatedAfterRestart



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16167) Fix PlaintextConsumerTest.testAutoCommitOnCloseAfterWakeup

2024-02-19 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy updated KAFKA-16167:
---
Reviewer: Lucas Brutschy  (was: Lucas Brutschy)

> Fix PlaintextConsumerTest.testAutoCommitOnCloseAfterWakeup
> --
>
> Key: KAFKA-16167
> URL: https://issues.apache.org/jira/browse/KAFKA-16167
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Critical
>  Labels: consumer-threading-refactor, integration-tests, 
> kip-848-client-support
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16206 Fix unnecessary topic config deletion during ZK migration [kafka]

2024-02-19 Thread via GitHub


mimaison commented on PR #14206:
URL: https://github.com/apache/kafka/pull/14206#issuecomment-1952110096

   There's quite a few failures related to ZkMigration in the last CI run: 
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14206/22/testReport/


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15140: improve TopicCommandIntegrationTest to be less flaky [kafka]

2024-02-19 Thread via GitHub


showuon merged PR #14891:
URL: https://github.com/apache/kafka/pull/14891


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-15140) Improve TopicCommandIntegrationTest to be less flaky

2024-02-19 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15140?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-15140.
---
Fix Version/s: (was: 3.5.1)
   Resolution: Fixed

> Improve TopicCommandIntegrationTest to be less flaky
> 
>
> Key: KAFKA-15140
> URL: https://issues.apache.org/jira/browse/KAFKA-15140
> Project: Kafka
>  Issue Type: Test
>  Components: unit tests
>Reporter: Divij Vaidya
>Assignee: Lan Ding
>Priority: Minor
>  Labels: flaky-test, newbie
> Fix For: 3.8.0
>
>
> *This is a good Jira for folks who are new to contributing to Kafka.*
> Tests in TopicCommandIntegrationTest get flaky from time to time. The 
> objective of the task is to make them more robust by doing the following:
> 1. Replace the usage of createAndWaitTopic(), the adminClient.createTopics() 
> method, and other places where we are creating a topic (without waiting) with 
> TestUtils.createTopicWithAdmin(). The latter method already contains the 
> functionality to create a topic and wait for metadata to sync up.
> 2. Replace the number 6 at places such as 
> "adminClient.createTopics(
> Collections.singletonList(new NewTopic("foo_bar", 1, 6.toShort)))" with a 
> meaningful constant.
> 3. Add logs if an assertion fails; for example, lines such as 
> "assertTrue(rows(0).startsWith(s"\tTopic: $testTopicName"), output)" should 
> have a third argument which prints the actual output, so that we can 
> observe in the test logs what the output was when the assertion failed.
> 4. Replace occurrences of "\n" with System.lineSeparator() which is platform 
> independent
> 5. We should wait for reassignment to complete whenever we are re-assigning 
> partitions using alterconfig before we call describe to validate it. We could 
> use 
> TestUtils.waitForAllReassignmentsToComplete()
> *Motivation of this task*
> Try to fix the flaky test behaviour such as observed in 
> [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13924/5/testReport/junit/kafka.admin/TopicCommandIntegrationTest/Build___JDK_11_and_Scala_2_13___testDescribeUnderMinIsrPartitionsMixed_String__quorum_zk/]
>  
> {noformat}
> org.opentest4j.AssertionFailedError: expected:  but was: 
>   at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>   at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
>   at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
>   at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
>   at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:31)
>   at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:180)
>   at 
> app//kafka.admin.TopicCommandIntegrationTest.testDescribeUnderMinIsrPartitionsMixed(TopicCommandIntegrationTest.scala:794){noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16259 Immutable MetadataCache to improve client performance [kafka]

2024-02-19 Thread via GitHub


msn-tldr commented on PR #15376:
URL: https://github.com/apache/kafka/pull/15376#issuecomment-1952339142

   @ericzhifengchen It seems you had a similar idea on creating an immutable 
metadata cache on the client to improve latency :)
   
   I have created a follow-up https://github.com/apache/kafka/pull/15385 to add 
a test similar to `testConcurrentUpdateAndGetCluster` in this PR. I can add you 
as a co-author to PR 15385; can you share the email associated with your GitHub 
account? See the steps for getting this information 
[here](https://docs.github.com/en/pull-requests/committing-changes-to-your-project/creating-and-editing-commits/creating-a-commit-with-multiple-authors#required-co-author-information)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] Kafka 16084 Simplify and deduplicate standalone herder test mocking [kafka]

2024-02-19 Thread via GitHub


ahmedsobeh opened a new pull request, #15389:
URL: https://github.com/apache/kafka/pull/15389

   - Removed the connector field.
   - The class had a mix of Mock annotations and mock(Class) invocations; 
standardized on one of them.
   - The test doesn't stop the thread pool created inside the herder and might 
leak threads; added herder stopping in tearDown.
   - Mocking for Worker#startConnector is 6 lines duplicated 8 times 
throughout the test; extracted it to a method.
   - Some waits were 1000 ms and others 1000 s; unified them and set to a 
constant.
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-16084) Simplify and deduplicate StandaloneHerderTest mocking

2024-02-19 Thread Ahmed Sobeh (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17818185#comment-17818185
 ] 

Ahmed Sobeh edited comment on KAFKA-16084 at 2/19/24 12:53 PM:
---

-Hi [~gharris1727]! I'm almost done with this but I have one question, any 
specific recommendations for expectConfigValidation?-

had an idea, so submitted a PR


was (Author: JIRAUSER295920):
Hi [~gharris1727]! I'm almost done with this but I have one question, any 
specific recommendations for expectConfigValidation?

> Simplify and deduplicate StandaloneHerderTest mocking
> -
>
> Key: KAFKA-16084
> URL: https://issues.apache.org/jira/browse/KAFKA-16084
> Project: Kafka
>  Issue Type: Test
>  Components: connect
>Reporter: Greg Harris
>Assignee: Ahmed Sobeh
>Priority: Minor
>  Labels: newbie++
>
> The StandaloneHerderTest has some cruft that can be cleaned up. What i've 
> found:
> * The `connector` field is written in nearly every test, but only read by one 
> test, and looks to be nearly irrelevant.
> * `expectConfigValidation` has two ways of specifying consecutive 
> validations. 1. The boolean shouldCreateConnector which is true in the first 
> invocation and false in subsequent invocations. 2. by passing multiple 
> configurations via varargs.
> * The class uses a mix of Mock annotations and mock(Class) invocations
> * The test doesn't stop the thread pool created inside the herder and might 
> leak threads
> * Mocking for Worker#startConnector is 6 lines which are duplicated 8 times 
> throughout the test
> * Some waits are 1000 ms and others are 1000 s, and could be pulled out to 
> constants or a util method



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15561: Client support for new SubscriptionPattern based subscription [kafka]

2024-02-19 Thread via GitHub


Phuc-Hong-Tran commented on PR #15188:
URL: https://github.com/apache/kafka/pull/15188#issuecomment-1952492560

   > > > @cadonna @lianetm, since we're supporting for both subscribe method 
using java.util.regex.Pattern and SubscriptionPattern, I think we should throw 
a illegal heartbeat exeption when user try to use both method at the same time 
and inform the user to use once at a time, since the field SubscribedRegex is 
used for java.util.regex.Pattern as well as SubscriptionPattern. What do you 
guys think?
   > > 
   > > 
   > > IMO, we must support the deprecated pattern subscriptions with 
`java.util.regex.Pattern` to ensure backwards compatibility, but we do not need 
to support mixed usage of `java.util.regex.Pattern` and Google regex patterns. 
I think this is a blind spot in the KIP. I propose to throw an 
`IllegalStateException` if `subscribe(java.util.regex.Pattern)` is called after 
`subscribe(SubscriptionPattern)` (and vice versa) without calling 
`unsubscribe()` in between. That is similar to the restrictions between 
pattern, topic, and partition subscriptions @lianetm linked above. I do not 
think it is worth to consider the edge case of mixed usage of the two pattern 
types. Does this make sense to you? \cc @dajac What do you as the original 
author of the KIP think? Should we update the KIP to make this clear?
   > 
   > @cadonna I would rather follow what we already do with `subscribe` today. 
The last one called takes precedence.
   
   I have a question. The subscribe method that uses Pattern overrides the 
subscription with the topic(s) that match the Pattern. When the user chooses 
to use SubscriptionPattern but has already used Pattern beforehand, should we 
clear out the old subscription?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16167) Fix PlaintextConsumerTest.testAutoCommitOnCloseAfterWakeup

2024-02-19 Thread Lucas Brutschy (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17818487#comment-17818487
 ] 

Lucas Brutschy commented on KAFKA-16167:


Looks like the test is still flaky.

 

In an unrelated PR I got this:

 

https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15219/10/tests

> Fix PlaintextConsumerTest.testAutoCommitOnCloseAfterWakeup
> --
>
> Key: KAFKA-16167
> URL: https://issues.apache.org/jira/browse/KAFKA-16167
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Critical
>  Labels: consumer-threading-refactor, integration-tests, 
> kip-848-client-support
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-16167) Fix PlaintextConsumerTest.testAutoCommitOnCloseAfterWakeup

2024-02-19 Thread Lucas Brutschy (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17818487#comment-17818487
 ] 

Lucas Brutschy edited comment on KAFKA-16167 at 2/19/24 2:01 PM:
-

Looks like the test is still flaky.

 

In an unrelated PR I got this:

 

[https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15219/10/tests]

 

 

```
java.lang.NullPointerException: Cannot invoke 
"org.apache.kafka.clients.consumer.OffsetAndMetadata.offset()" because the 
return value of "java.util.Map.get(Object)" is null
 at 
kafka.api.PlaintextConsumerTest.testAutoCommitOnCloseAfterWakeup(PlaintextConsumerTest.scala:316)
```


was (Author: JIRAUSER302322):
Looks like the test is still flaky.

 

IN an unrelated PR I got this:

 

https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15219/10/tests

> Fix PlaintextConsumerTest.testAutoCommitOnCloseAfterWakeup
> --
>
> Key: KAFKA-16167
> URL: https://issues.apache.org/jira/browse/KAFKA-16167
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Critical
>  Labels: consumer-threading-refactor, integration-tests, 
> kip-848-client-support
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Reopened] (KAFKA-16167) Fix PlaintextConsumerTest.testAutoCommitOnCloseAfterWakeup

2024-02-19 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy reopened KAFKA-16167:


> Fix PlaintextConsumerTest.testAutoCommitOnCloseAfterWakeup
> --
>
> Key: KAFKA-16167
> URL: https://issues.apache.org/jira/browse/KAFKA-16167
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Critical
>  Labels: consumer-threading-refactor, integration-tests, 
> kip-848-client-support
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16258) Stale member should trigger onPartitionsLost when leaving group

2024-02-19 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans reassigned KAFKA-16258:
--

Assignee: Lianet Magrans

> Stale member should trigger onPartitionsLost when leaving group
> ---
>
> Key: KAFKA-16258
> URL: https://issues.apache.org/jira/browse/KAFKA-16258
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> When the poll timer expires, the new consumer proactively leaves the group 
> and clears its assignments, but it should also invoke the onPartitionsLost 
> callback. The legacy coordinator does the following sequence on poll timer 
> expiration: send leave group request 
> ([here|https://github.com/apache/kafka/blob/e8c70fce26626ed2ab90f2728a45f6e55e907ec1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1517]),
>  invoke onPartitionsLost, and when it completes it clears the assignment 
> (onJoinPrepare 
> [here|https://github.com/apache/kafka/blob/e8c70fce26626ed2ab90f2728a45f6e55e907ec1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L779]).
> This issue is most probably what is causing the failures in the integration 
> tests that fail expecting callbacks when the poll interval expires (like 
> https://issues.apache.org/jira/browse/KAFKA-16008)
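A minimal sketch of the intended order on poll-timer expiry; the listener and membership types are simplified stand-ins, not the client internals:

{code:java}
import java.util.Set;

interface LostListener { void onPartitionsLost(Set<String> lostPartitions); }

// Sketch: send the leave, invoke onPartitionsLost, and only then clear
// the assignment, mirroring the legacy coordinator's sequence.
final class PollTimerExpirySketch {
    private final LostListener listener;
    private Set<String> assignment;

    PollTimerExpirySketch(LostListener listener, Set<String> assignment) {
        this.listener = listener;
        this.assignment = assignment;
    }

    void onPollTimerExpired() {
        sendLeaveGroupRequest();               // proactively leave the group
        listener.onPartitionsLost(assignment); // invoke the callback first...
        assignment = Set.of();                 // ...and clear only afterwards
    }

    private void sendLeaveGroupRequest() { /* network call elided */ }
}
{code}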



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16258) Stale member should trigger onPartitionsLost when leaving group

2024-02-19 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16258:
---
Labels: client-transitions-issues kip-848-client-support  (was: 
kip-848-client-support)

> Stale member should trigger onPartitionsLost when leaving group
> ---
>
> Key: KAFKA-16258
> URL: https://issues.apache.org/jira/browse/KAFKA-16258
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: client-transitions-issues, kip-848-client-support
> Fix For: 3.8.0
>
>
> When the poll timer expires, the new consumer proactively leaves the group 
> and clears its assignments, but it should also invoke the onPartitionsLost 
> callback. The legacy coordinator does the following sequence on poll timer 
> expiration: send leave group request 
> ([here|https://github.com/apache/kafka/blob/e8c70fce26626ed2ab90f2728a45f6e55e907ec1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1517]),
>  invoke onPartitionsLost, and when it completes it clears the assignment 
> (onJoinPrepare 
> [here|https://github.com/apache/kafka/blob/e8c70fce26626ed2ab90f2728a45f6e55e907ec1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L779]).
> This issue is most probably what is causing the failures in the integration 
> tests that fail expecting callbacks when the poll interval expires (like 
> https://issues.apache.org/jira/browse/KAFKA-16008)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16261) MembershipManagerImpl.updateSubscription fails if already empty subscription

2024-02-19 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16261?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16261:
---
Labels: consumer-threading-refactor kip-848-client-support  (was: 
consumer-threading-refactor)

> MembershipManagerImpl.updateSubscription fails if already empty subscription
> 
>
> Key: KAFKA-16261
> URL: https://issues.apache.org/jira/browse/KAFKA-16261
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Andrew Schofield
>Assignee: Lianet Magrans
>Priority: Minor
>  Labels: consumer-threading-refactor, kip-848-client-support
>
> The internal SubscriptionState object keeps track of whether the assignment 
> is user-assigned, or auto-assigned. If there are no assigned partitions, the 
> assignment resets to NONE. If you call SubscriptionState.assignFromSubscribed 
> in this state it fails.
> The easiest thing is perhaps to check 
> SubscriptionState.hasAutoAssignedPartitions() to make sure that 
> assignFromSubscribed is going to be permitted.
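A minimal sketch of the suggested guard; this mimics the NONE vs. auto-assigned distinction and is not the real SubscriptionState class:

{code:java}
import java.util.Collection;
import java.util.Set;

final class SubscriptionStateSketch {
    private enum Type { NONE, AUTO_ASSIGNED, USER_ASSIGNED }

    private Type type = Type.NONE;
    private Set<String> assignment = Set.of();

    void subscribe(Set<String> topics) { type = Type.AUTO_ASSIGNED; }

    boolean hasAutoAssignedPartitions() { return type == Type.AUTO_ASSIGNED; }

    void assignFromSubscribed(Collection<String> partitions) {
        if (!hasAutoAssignedPartitions())
            throw new IllegalStateException("Subscription is not auto-assigned");
        assignment = Set.copyOf(partitions);
    }
}

// Caller side: check before assigning, so an already-empty subscription
// (type NONE) does not trigger the IllegalStateException.
final class UpdateSubscriptionSketch {
    static void updateSubscription(SubscriptionStateSketch s, Collection<String> newAssignment) {
        if (s.hasAutoAssignedPartitions())
            s.assignFromSubscribed(newAssignment);
    }
}
{code}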



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15349) ducker-ak should fail fast when gradlew systemTestLibs fails

2024-02-19 Thread Ahmed Sobeh (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15349?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ahmed Sobeh reassigned KAFKA-15349:
---

Assignee: Ahmed Sobeh

> ducker-ak should fail fast when gradlew systemTestLibs fails
> 
>
> Key: KAFKA-15349
> URL: https://issues.apache.org/jira/browse/KAFKA-15349
> Project: Kafka
>  Issue Type: Improvement
>  Components: system tests
>Reporter: Greg Harris
>Assignee: Ahmed Sobeh
>Priority: Minor
>  Labels: newbie++
>
> If you introduce a flaw into the gradle build which causes the systemTestLibs 
> to fail, such as a circular dependency, then the ducker_test function 
> continues to run tests which are invalid.
> Rather than proceeding to run the tests, the script should fail fast and make 
> the user address the error before continuing.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16009: Fix PlaintextConsumerTest. testMaxPollIntervalMsDelayInRevocation [kafka]

2024-02-19 Thread via GitHub


lucasbru merged PR #15383:
URL: https://github.com/apache/kafka/pull/15383


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16243: Make sure that we do not exceed max poll interval inside poll [kafka]

2024-02-19 Thread via GitHub


cadonna commented on code in PR #15372:
URL: https://github.com/apache/kafka/pull/15372#discussion_r1494650399


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -265,6 +265,32 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     ensureNoRebalance(consumer, listener)
   }
 
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testMaxPollIntervalMsShorterThanPollTimeout(quorum: String, groupProtocol: String): Unit = {
+    this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 1000.toString)
+    this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 500.toString)
+
+    val consumer = createConsumer()
+    val listener = new TestConsumerReassignmentListener
+    consumer.subscribe(List(topic).asJava, listener)
+
+    // rebalance to get the initial assignment
+    awaitRebalance(consumer, listener)
+
+    val initialAssignedCalls = listener.callsToAssigned
+
+    consumer.poll(Duration.ofMillis(2000))
+
+    // Give enough time to rejoin
+    consumer.poll(Duration.ofMillis(500))
+    consumer.poll(Duration.ofMillis(500))
+
+    // Check that we did not rejoin

Review Comment:
   Do we need this comment? I think it would be better to delete it and to 
rename `initialAssignedCalls` to something more meaningful like 
`callsToAssignedAfterFirstRebalance` or `callsToAssignedBeforePolls`.



##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -265,6 +265,32 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     ensureNoRebalance(consumer, listener)
   }
 
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testMaxPollIntervalMsShorterThanPollTimeout(quorum: String, groupProtocol: String): Unit = {
+    this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 1000.toString)
+    this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 500.toString)
+
+    val consumer = createConsumer()
+    val listener = new TestConsumerReassignmentListener
+    consumer.subscribe(List(topic).asJava, listener)
+
+    // rebalance to get the initial assignment
+    awaitRebalance(consumer, listener)
+
+    val initialAssignedCalls = listener.callsToAssigned
+
+    consumer.poll(Duration.ofMillis(2000))
+
+    // Give enough time to rejoin

Review Comment:
   nit:
   This comment is a bit confusing. What is it supposed to clarify?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-15349 ducker-ak should fail fast when gradlew systemTestLibs fails [kafka]

2024-02-19 Thread via GitHub


ahmedsobeh opened a new pull request, #15390:
URL: https://github.com/apache/kafka/pull/15390

   In this modification, if ./gradlew systemTestLibs fails, the script will 
output an error message and terminate execution using the die function. This 
ensures that the script fails fast and prompts the user to address the error 
before continuing.
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-15349 ducker-ak should fail fast when gradlew systemTestLibs fails [kafka]

2024-02-19 Thread via GitHub


ahmedsobeh closed pull request #15390: KAFKA-15349 ducker-ak should fail fast 
when gradlew systemTestLibs fails
URL: https://github.com/apache/kafka/pull/15390





[PR] KAFKA-15349 ducker-ak should fail fast when gradlew systemTestLibs fails [kafka]

2024-02-19 Thread via GitHub


ahmedsobeh opened a new pull request, #15391:
URL: https://github.com/apache/kafka/pull/15391

   In this modification, if `./gradlew systemTestLibs` fails, the script will 
output an error message and terminate execution using the `die` function. This 
ensures that the script fails fast and prompts the user to address the error 
before continuing.
   
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-15349 ducker-ak should fail fast when gradlew systemTestLibs fails [kafka]

2024-02-19 Thread via GitHub


ahmedsobeh commented on PR #15390:
URL: https://github.com/apache/kafka/pull/15390#issuecomment-1952625573

   Closing as the branch was incorrectly created.





Re: [PR] KAFKA-16251: Fix for not sending heartbeat while fenced [kafka]

2024-02-19 Thread via GitHub


lianetm commented on PR #15392:
URL: https://github.com/apache/kafka/pull/15392#issuecomment-1952680928

   Hey @lucasbru, could you take a look at this small fix? Thanks!





Re: [PR] KAFKA-16251: Fix for not sending heartbeat while fenced [kafka]

2024-02-19 Thread via GitHub


lianetm commented on PR #15392:
URL: https://github.com/apache/kafka/pull/15392#issuecomment-1952684051

   Also @mjsax, this is a very small one, closely related to the protocol; it 
might be interesting to you.





Re: [PR] MINOR: Clean up core metrics, migration, network, raft, security, serializer, tools, utils, and zookeeper modules [kafka]

2024-02-19 Thread via GitHub


jlprat commented on PR #15279:
URL: https://github.com/apache/kafka/pull/15279#issuecomment-1952754579

   Checked locally that all tests pass; the remaining CI failures are known 
flaky tests.





Re: [PR] MINOR: Clean up core metrics, migration, network, raft, security, serializer, tools, utils, and zookeeper modules [kafka]

2024-02-19 Thread via GitHub


jlprat merged PR #15279:
URL: https://github.com/apache/kafka/pull/15279





[jira] [Assigned] (KAFKA-16266) Introduce TransactionLastUpdateTimeMs tagged field to DescribeTransactionsResponse

2024-02-19 Thread Ahmed Sobeh (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ahmed Sobeh reassigned KAFKA-16266:
---

Assignee: Ahmed Sobeh

>  Introduce TransactionLastUpdateTimeMs tagged field to 
> DescribeTransactionsResponse
> ---
>
> Key: KAFKA-16266
> URL: https://issues.apache.org/jira/browse/KAFKA-16266
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Yang Yu
>Assignee: Ahmed Sobeh
>Priority: Major
>
> Introduce a TransactionLastUpdateTimeMs tagged field to 
> DescribeTransactionsResponse. Make broker-side changes to send this bit of 
> information. Also, make changes to the `kafka-transactions.sh --describe` tooling 
> to display this new piece of information in the output.
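
For reference, a tagged field of this kind is declared in the response's RPC JSON 
schema (clients/src/main/resources/common/message/DescribeTransactionsResponse.json), 
roughly as in the sketch below. The field name follows the ticket; the tag number, 
versions, and "about" text are illustrative assumptions, not the final schema:

{code}
{ "name": "TransactionLastUpdateTimeMs", "type": "int64", "versions": "0+",
  "taggedVersions": "0+", "tag": 0, "ignorable": true,
  "about": "The time in milliseconds when the transaction state was last updated." }
{code}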





Re: [PR] KAFKA-14683 Migrate WorkerSinkTaskTest to Mockito (3/3) [kafka]

2024-02-19 Thread via GitHub


hgeraldino commented on code in PR #15316:
URL: https://github.com/apache/kafka/pull/15316#discussion_r1494806542


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskMockitoTest.java:
##
@@ -601,6 +690,567 @@ public void testPartialRevocationAndAssignment() {
 verify(sinkTask, times(4)).put(Collections.emptyList());
 }
 
+@Test
+@SuppressWarnings("unchecked")
+public void testPreCommitFailureAfterPartialRevocationAndAssignment() {
+createTask(initialState);
+expectTaskGetTopic();
+
+workerTask.initialize(TASK_CONFIG);
+workerTask.initializeAndStart();
+verifyInitializeTask();
+
+when(consumer.assignment())
+.thenReturn(INITIAL_ASSIGNMENT, INITIAL_ASSIGNMENT)
+.thenReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2)))
+.thenReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2)))
+.thenReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2)))
+.thenReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2, 
TOPIC_PARTITION3)))
+.thenReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2, 
TOPIC_PARTITION3)))
+.thenReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2, 
TOPIC_PARTITION3)));
+
+INITIAL_ASSIGNMENT.forEach(tp -> 
when(consumer.position(tp)).thenReturn(FIRST_OFFSET));
+when(consumer.position(TOPIC_PARTITION3)).thenReturn(FIRST_OFFSET);
+
+// First poll; assignment is [TP1, TP2]
+when(consumer.poll(any(Duration.class)))
+.thenAnswer((Answer<ConsumerRecords<byte[], byte[]>>) invocation -> {
+
rebalanceListener.getValue().onPartitionsAssigned(INITIAL_ASSIGNMENT);
+return ConsumerRecords.empty();
+})
+// Second poll; a single record is delivered from TP1
+.thenAnswer(expectConsumerPoll(1))
+// Third poll; assignment changes to [TP2]
+.thenAnswer(invocation -> {
+
rebalanceListener.getValue().onPartitionsRevoked(Collections.singleton(TOPIC_PARTITION));
+
rebalanceListener.getValue().onPartitionsAssigned(Collections.emptySet());
+return ConsumerRecords.empty();
+})
+// Fourth poll; assignment changes to [TP2, TP3]
+.thenAnswer(invocation -> {
+
rebalanceListener.getValue().onPartitionsRevoked(Collections.emptySet());
+
rebalanceListener.getValue().onPartitionsAssigned(Collections.singleton(TOPIC_PARTITION3));
+return ConsumerRecords.empty();
+})
+// Fifth poll; an offset commit takes place
+.thenAnswer(expectConsumerPoll(0));
+
+expectConversionAndTransformation(null, new RecordHeaders());
+
+// First iteration--first call to poll, first consumer assignment
+workerTask.iteration();
+// Second iteration--second call to poll, delivery of one record
+workerTask.iteration();
+// Third iteration--third call to poll, partial consumer revocation
+final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+offsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 1));
+when(sinkTask.preCommit(offsets)).thenReturn(offsets);
+doAnswer(invocation -> null).when(consumer).commitSync(offsets);

Review Comment:
   good catch... updated






Re: [PR] KAFKA-14683 Migrate WorkerSinkTaskTest to Mockito (3/3) [kafka]

2024-02-19 Thread via GitHub


hgeraldino commented on code in PR #15316:
URL: https://github.com/apache/kafka/pull/15316#discussion_r1494816048


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskMockitoTest.java:
##
@@ -601,6 +690,567 @@ public void testPartialRevocationAndAssignment() {
 verify(sinkTask, times(4)).put(Collections.emptyList());
 }
 
+@Test
+@SuppressWarnings("unchecked")
+public void testPreCommitFailureAfterPartialRevocationAndAssignment() {
+createTask(initialState);
+expectTaskGetTopic();
+
+workerTask.initialize(TASK_CONFIG);
+workerTask.initializeAndStart();
+verifyInitializeTask();
+
+when(consumer.assignment())
+.thenReturn(INITIAL_ASSIGNMENT, INITIAL_ASSIGNMENT)
+.thenReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2)))
+.thenReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2)))
+.thenReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2)))
+.thenReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2, 
TOPIC_PARTITION3)))
+.thenReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2, 
TOPIC_PARTITION3)))
+.thenReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2, 
TOPIC_PARTITION3)));
+
+INITIAL_ASSIGNMENT.forEach(tp -> 
when(consumer.position(tp)).thenReturn(FIRST_OFFSET));
+when(consumer.position(TOPIC_PARTITION3)).thenReturn(FIRST_OFFSET);
+
+// First poll; assignment is [TP1, TP2]
+when(consumer.poll(any(Duration.class)))
+.thenAnswer((Answer<ConsumerRecords<byte[], byte[]>>) invocation -> {
+
rebalanceListener.getValue().onPartitionsAssigned(INITIAL_ASSIGNMENT);
+return ConsumerRecords.empty();
+})
+// Second poll; a single record is delivered from TP1
+.thenAnswer(expectConsumerPoll(1))
+// Third poll; assignment changes to [TP2]
+.thenAnswer(invocation -> {
+
rebalanceListener.getValue().onPartitionsRevoked(Collections.singleton(TOPIC_PARTITION));
+
rebalanceListener.getValue().onPartitionsAssigned(Collections.emptySet());
+return ConsumerRecords.empty();
+})
+// Fourth poll; assignment changes to [TP2, TP3]
+.thenAnswer(invocation -> {
+
rebalanceListener.getValue().onPartitionsRevoked(Collections.emptySet());
+
rebalanceListener.getValue().onPartitionsAssigned(Collections.singleton(TOPIC_PARTITION3));
+return ConsumerRecords.empty();
+})
+// Fifth poll; an offset commit takes place
+.thenAnswer(expectConsumerPoll(0));
+
+expectConversionAndTransformation(null, new RecordHeaders());
+
+// First iteration--first call to poll, first consumer assignment
+workerTask.iteration();
+// Second iteration--second call to poll, delivery of one record
+workerTask.iteration();
+// Third iteration--third call to poll, partial consumer revocation
+final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+offsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 1));
+when(sinkTask.preCommit(offsets)).thenReturn(offsets);
+doAnswer(invocation -> null).when(consumer).commitSync(offsets);
+
+workerTask.iteration();
+verify(sinkTask).close(Collections.singleton(TOPIC_PARTITION));
+verify(sinkTask, times(2)).put(Collections.emptyList());
+
+// Fourth iteration--fourth call to poll, partial consumer assignment
+workerTask.iteration();
+
+verify(sinkTask).open(Collections.singleton(TOPIC_PARTITION3));
+
+final Map<TopicPartition, OffsetAndMetadata> workerCurrentOffsets = new HashMap<>();
+workerCurrentOffsets.put(TOPIC_PARTITION2, new 
OffsetAndMetadata(FIRST_OFFSET));
+workerCurrentOffsets.put(TOPIC_PARTITION3, new 
OffsetAndMetadata(FIRST_OFFSET));
+when(sinkTask.preCommit(workerCurrentOffsets)).thenThrow(new 
ConnectException("Failed to flush"));
+
+// Fifth iteration--task-requested offset commit with failure in 
SinkTask::preCommit
+sinkTaskContext.getValue().requestCommit();
+workerTask.iteration();
+
+verify(consumer).seek(TOPIC_PARTITION2, FIRST_OFFSET);
+verify(consumer).seek(TOPIC_PARTITION3, FIRST_OFFSET);
+}
+
+@Test
+public void testWakeupInCommitSyncCausesRetry() {
+createTask(initialState);
+
+workerTask.initialize(TASK_CONFIG);
+time.sleep(3L);
+workerTask.initializeAndStart();
+time.sleep(3L);
+verifyInitializeTask();
+
+expectTaskGetTopic();
+expectPollInitialAssignment()
+.thenAnswer(expectConsumerPoll(1))
+.thenAnswer(invocation -> {
+
rebalanceListener

[jira] [Commented] (KAFKA-15841) Add Support for Topic-Level Partitioning in Kafka Connect

2024-02-19 Thread Henrique Mota (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17818530#comment-17818530
 ] 

Henrique Mota commented on KAFKA-15841:
---

Hello [~gharris1727]!

My use case is as follows:

1: I have multiple clients in each environment, with the largest having 90 
clients (databases). 2: Each client has a database in one application, and we 
replicate approximately 100 tables from this database to another application's 
database, with this other database being multi-tenant. 3: Previously, we had 
one topic per table, with some partitions for each topic. So, we needed to 
ensure that if any client had inconsistent data, we would pause the consumption 
for that client and continue processing data for other clients. Thus, we 
separated a topic with a partition for each table and client. We then created 
an extension of the JDBC Sink that can pause a problematic topic, and after 
some time attempt to resume consumption of the paused topic (we decided to use 
one topic per client instead of one partition per client to facilitate 
identification). 4: We have a JDBC Sink for each table. 5: We noticed that if 
we add more than one worker, in this scenario, all topics were assigned to 
worker 0, and the others were left waiting. 6: We tried to change the 'topics' 
property in the configurations using the 'taskConfigs(int maxTasks)' method, 
but Kafka Connect ignores this property when it is returned by 'taskConfigs(int 
maxTasks)'.
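
Below is a self-contained sketch of the kind of taskConfigs() override described in 
point 6 (hypothetical connector and task class names, not code from our fork). It 
splits the connector-level 'topics' list into one group per task, which is exactly 
the per-task value Kafka Connect currently overwrites for sink connectors:

{code:java}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.util.ConnectorUtils;

public class PerTopicSinkConnector extends SinkConnector {
    private Map<String, String> config;

    @Override public void start(Map<String, String> props) { this.config = props; }
    @Override public void stop() { }
    @Override public Class<? extends Task> taskClass() { return PerTopicSinkTask.class; }
    @Override public ConfigDef config() { return new ConfigDef(); }
    @Override public String version() { return "0.0.1"; }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        // Split the configured topic list into at most maxTasks groups, one per task
        List<String> topics = Arrays.asList(config.get(SinkTask.TOPICS_CONFIG).split(","));
        List<List<String>> groups = ConnectorUtils.groupPartitions(topics, Math.min(maxTasks, topics.size()));
        List<Map<String, String>> taskConfigs = new ArrayList<>(groups.size());
        for (List<String> group : groups) {
            Map<String, String> taskConfig = new HashMap<>(config);
            // This per-task 'topics' value is what Connect overwrites with the connector-level one
            taskConfig.put(SinkTask.TOPICS_CONFIG, String.join(",", group));
            taskConfigs.add(taskConfig);
        }
        return taskConfigs;
    }

    // Minimal placeholder task so the sketch compiles; a real task would write to JDBC
    public static class PerTopicSinkTask extends SinkTask {
        @Override public String version() { return "0.0.1"; }
        @Override public void start(Map<String, String> props) { }
        @Override public void put(Collection<SinkRecord> records) { }
        @Override public void stop() { }
    }
}
{code}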

> Add Support for Topic-Level Partitioning in Kafka Connect
> -
>
> Key: KAFKA-15841
> URL: https://issues.apache.org/jira/browse/KAFKA-15841
> Project: Kafka
>  Issue Type: Improvement
>  Components: connect
>Reporter: Henrique Mota
>Priority: Trivial
>
> In our organization, we utilize JDBC sink connectors to consume data from 
> various topics, where each topic is dedicated to a specific tenant with a 
> single partition. Recently, we developed a custom sink based on the standard 
> JDBC sink, enabling us to pause consumption of a topic when encountering 
> problematic records.
> However, we face limitations within Kafka Connect, as it doesn't allow for 
> appropriate partitioning of topics among workers. We attempted a workaround 
> by breaking down the topics list within the 'topics' parameter. 
> Unfortunately, Kafka Connect overrides this parameter after invoking the 
> {{taskConfigs(int maxTasks)}} method from the 
> {{org.apache.kafka.connect.connector.Connector}} class.
> We request the addition of support in Kafka Connect to enable the 
> partitioning of topics among workers without requiring a fork. This 
> enhancement would facilitate better load distribution and allow for more 
> flexible configurations, particularly in scenarios where topics are dedicated 
> to different tenants.





[jira] [Comment Edited] (KAFKA-15841) Add Support for Topic-Level Partitioning in Kafka Connect

2024-02-19 Thread Henrique Mota (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17818530#comment-17818530
 ] 

Henrique Mota edited comment on KAFKA-15841 at 2/19/24 4:41 PM:


Hello [~gharris1727]!

My use case is as follows:

1: I have multiple clients in each environment, with the largest having 90 
clients (databases). 

2: Each client has a database in one application, and we replicate 
approximately 100 tables from this database to another application's database, 
with this other database being multi-tenant. 

3: Previously, we had one topic per table, with some partitions for each topic. 
So, we needed to ensure that if any client had inconsistent data, we would 
pause the consumption for that client and continue processing data for other 
clients. Thus, we separated a topic with a partition for each table and client. 
We then created an extension of the JDBC Sink that can pause a problematic 
topic, and after some time attempt to resume consumption of the paused topic 
(we decided to use one topic per client instead of one partition per client to 
facilitate identification). 

4: We have a JDBC Sink for each table. 

5: We noticed that if we add more than one worker, in this scenario, all topics 
were assigned to worker 0, and the others were left waiting. 

6: We tried to change the 'topics' property in the configurations using the 
'taskConfigs(int maxTasks)' method, but Kafka Connect ignores this property 
when it is returned by 'taskConfigs(int maxTasks)'.


was (Author: henriquemota):
Hello [~gharris1727]!

My use case is as follows:

1: I have multiple clients in each environment, with the largest having 90 
clients (databases). 2: Each client has a database in one application, and we 
replicate approximately 100 tables from this database to another application's 
database, with this other database being multi-tenant. 3: Previously, we had 
one topic per table, with some partitions for each topic. So, we needed to 
ensure that if any client had inconsistent data, we would pause the consumption 
for that client and continue processing data for other clients. Thus, we 
separated a topic with a partition for each table and client. We then created 
an extension of the JDBC Sink that can pause a problematic topic, and after 
some time attempt to resume consumption of the paused topic (we decided to use 
one topic per client instead of one partition per client to facilitate 
identification). 4: We have a JDBC Sink for each table. 5: We noticed that if 
we add more than one worker, in this scenario, all topics were assigned to 
worker 0, and the others were left waiting. 6: We tried to change the 'topics' 
property in the configurations using the 'taskConfigs(int maxTasks)' method, 
but Kafka Connect ignores this property when it is returned by 'taskConfigs(int 
maxTasks)'.

> Add Support for Topic-Level Partitioning in Kafka Connect
> -
>
> Key: KAFKA-15841
> URL: https://issues.apache.org/jira/browse/KAFKA-15841
> Project: Kafka
>  Issue Type: Improvement
>  Components: connect
>Reporter: Henrique Mota
>Priority: Trivial
>
> In our organization, we utilize JDBC sink connectors to consume data from 
> various topics, where each topic is dedicated to a specific tenant with a 
> single partition. Recently, we developed a custom sink based on the standard 
> JDBC sink, enabling us to pause consumption of a topic when encountering 
> problematic records.
> However, we face limitations within Kafka Connect, as it doesn't allow for 
> appropriate partitioning of topics among workers. We attempted a workaround 
> by breaking down the topics list within the 'topics' parameter. 
> Unfortunately, Kafka Connect overrides this parameter after invoking the 
> {{taskConfigs(int maxTasks)}} method from the 
> {{org.apache.kafka.connect.connector.Connector}} class.
> We request the addition of support in Kafka Connect to enable the 
> partitioning of topics among workers without requiring a fork. This 
> enhancement would facilitate better load distribution and allow for more 
> flexible configurations, particularly in scenarios where topics are dedicated 
> to different tenants.





Re: [PR] KAFKA-14683 Migrate WorkerSinkTaskTest to Mockito (3/3) [kafka]

2024-02-19 Thread via GitHub


hgeraldino commented on code in PR #15316:
URL: https://github.com/apache/kafka/pull/15316#discussion_r1494825277


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskMockitoTest.java:
##
@@ -601,6 +690,567 @@ public void testPartialRevocationAndAssignment() {
 verify(sinkTask, times(4)).put(Collections.emptyList());
 }
 
+@Test
+@SuppressWarnings("unchecked")
+public void testPreCommitFailureAfterPartialRevocationAndAssignment() {
+createTask(initialState);
+expectTaskGetTopic();
+
+workerTask.initialize(TASK_CONFIG);
+workerTask.initializeAndStart();
+verifyInitializeTask();
+
+when(consumer.assignment())
+.thenReturn(INITIAL_ASSIGNMENT, INITIAL_ASSIGNMENT)
+.thenReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2)))
+.thenReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2)))
+.thenReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2)))
+.thenReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2, 
TOPIC_PARTITION3)))
+.thenReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2, 
TOPIC_PARTITION3)))
+.thenReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2, 
TOPIC_PARTITION3)));
+
+INITIAL_ASSIGNMENT.forEach(tp -> 
when(consumer.position(tp)).thenReturn(FIRST_OFFSET));
+when(consumer.position(TOPIC_PARTITION3)).thenReturn(FIRST_OFFSET);
+
+// First poll; assignment is [TP1, TP2]
+when(consumer.poll(any(Duration.class)))
+.thenAnswer((Answer<ConsumerRecords<byte[], byte[]>>) invocation -> {
+
rebalanceListener.getValue().onPartitionsAssigned(INITIAL_ASSIGNMENT);
+return ConsumerRecords.empty();
+})
+// Second poll; a single record is delivered from TP1
+.thenAnswer(expectConsumerPoll(1))
+// Third poll; assignment changes to [TP2]
+.thenAnswer(invocation -> {
+
rebalanceListener.getValue().onPartitionsRevoked(Collections.singleton(TOPIC_PARTITION));
+
rebalanceListener.getValue().onPartitionsAssigned(Collections.emptySet());
+return ConsumerRecords.empty();
+})
+// Fourth poll; assignment changes to [TP2, TP3]
+.thenAnswer(invocation -> {
+
rebalanceListener.getValue().onPartitionsRevoked(Collections.emptySet());
+
rebalanceListener.getValue().onPartitionsAssigned(Collections.singleton(TOPIC_PARTITION3));
+return ConsumerRecords.empty();
+})
+// Fifth poll; an offset commit takes place
+.thenAnswer(expectConsumerPoll(0));
+
+expectConversionAndTransformation(null, new RecordHeaders());
+
+// First iteration--first call to poll, first consumer assignment
+workerTask.iteration();
+// Second iteration--second call to poll, delivery of one record
+workerTask.iteration();
+// Third iteration--third call to poll, partial consumer revocation
+final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+offsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 1));
+when(sinkTask.preCommit(offsets)).thenReturn(offsets);
+doAnswer(invocation -> null).when(consumer).commitSync(offsets);
+
+workerTask.iteration();
+verify(sinkTask).close(Collections.singleton(TOPIC_PARTITION));
+verify(sinkTask, times(2)).put(Collections.emptyList());
+
+// Fourth iteration--fourth call to poll, partial consumer assignment
+workerTask.iteration();
+
+verify(sinkTask).open(Collections.singleton(TOPIC_PARTITION3));
+
+final Map<TopicPartition, OffsetAndMetadata> workerCurrentOffsets = new HashMap<>();
+workerCurrentOffsets.put(TOPIC_PARTITION2, new 
OffsetAndMetadata(FIRST_OFFSET));
+workerCurrentOffsets.put(TOPIC_PARTITION3, new 
OffsetAndMetadata(FIRST_OFFSET));
+when(sinkTask.preCommit(workerCurrentOffsets)).thenThrow(new 
ConnectException("Failed to flush"));
+
+// Fifth iteration--task-requested offset commit with failure in 
SinkTask::preCommit
+sinkTaskContext.getValue().requestCommit();
+workerTask.iteration();
+
+verify(consumer).seek(TOPIC_PARTITION2, FIRST_OFFSET);
+verify(consumer).seek(TOPIC_PARTITION3, FIRST_OFFSET);
+}
+
+@Test
+public void testWakeupInCommitSyncCausesRetry() {
+createTask(initialState);
+
+workerTask.initialize(TASK_CONFIG);
+time.sleep(3L);
+workerTask.initializeAndStart();
+time.sleep(3L);
+verifyInitializeTask();
+
+expectTaskGetTopic();
+expectPollInitialAssignment()
+.thenAnswer(expectConsumerPoll(1))
+.thenAnswer(invocation -> {
+
rebalanceListener

[jira] [Updated] (KAFKA-15841) Add Support for Topic-Level Partitioning in Kafka Connect

2024-02-19 Thread Henrique Mota (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15841?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Henrique Mota updated KAFKA-15841:
--
Attachment: image-2024-02-19-13-48-55-875.png

> Add Support for Topic-Level Partitioning in Kafka Connect
> -
>
> Key: KAFKA-15841
> URL: https://issues.apache.org/jira/browse/KAFKA-15841
> Project: Kafka
>  Issue Type: Improvement
>  Components: connect
>Reporter: Henrique Mota
>Priority: Trivial
> Attachments: image-2024-02-19-13-48-55-875.png
>
>
> In our organization, we utilize JDBC sink connectors to consume data from 
> various topics, where each topic is dedicated to a specific tenant with a 
> single partition. Recently, we developed a custom sink based on the standard 
> JDBC sink, enabling us to pause consumption of a topic when encountering 
> problematic records.
> However, we face limitations within Kafka Connect, as it doesn't allow for 
> appropriate partitioning of topics among workers. We attempted a workaround 
> by breaking down the topics list within the 'topics' parameter. 
> Unfortunately, Kafka Connect overrides this parameter after invoking the 
> {{taskConfigs(int maxTasks)}} method from the 
> {{org.apache.kafka.connect.connector.Connector}} class.
> We request the addition of support in Kafka Connect to enable the 
> partitioning of topics among workers without requiring a fork. This 
> enhancement would facilitate better load distribution and allow for more 
> flexible configurations, particularly in scenarios where topics are dedicated 
> to different tenants.





[jira] [Commented] (KAFKA-15841) Add Support for Topic-Level Partitioning in Kafka Connect

2024-02-19 Thread Henrique Mota (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17818535#comment-17818535
 ] 

Henrique Mota commented on KAFKA-15841:
---

That conditional basically prevented us from achieving what we wanted:

!image-2024-02-19-13-48-55-875.png!


The modification below would help us achieve our goal:

 

 
for (Map<String, String> taskProps : taskConfigs) {
  // Ensure we don't modify the connector's copy of the config
  Map<String, String> taskConfig = new HashMap<>(taskProps);
  taskConfig.put(TaskConfig.TASK_CLASS_CONFIG, taskClassName);
  // proposed addition: only inherit the connector-level 'topics' when the task config does not set its own
  if (connOriginals.containsKey(SinkTask.TOPICS_CONFIG) && !taskConfig.containsKey(SinkTask.TOPICS_CONFIG)) {
    taskConfig.put(SinkTask.TOPICS_CONFIG, connOriginals.get(SinkTask.TOPICS_CONFIG));
  }
  // proposed addition: the same guard for 'topics.regex'
  if (connOriginals.containsKey(SinkTask.TOPICS_REGEX_CONFIG) && !taskConfig.containsKey(SinkTask.TOPICS_REGEX_CONFIG)) {
    taskConfig.put(SinkTask.TOPICS_REGEX_CONFIG, connOriginals.get(SinkTask.TOPICS_REGEX_CONFIG));
  }
  result.add(taskConfig);
}
 

 

 

 

> Add Support for Topic-Level Partitioning in Kafka Connect
> -
>
> Key: KAFKA-15841
> URL: https://issues.apache.org/jira/browse/KAFKA-15841
> Project: Kafka
>  Issue Type: Improvement
>  Components: connect
>Reporter: Henrique Mota
>Priority: Trivial
> Attachments: image-2024-02-19-13-48-55-875.png
>
>
> In our organization, we utilize JDBC sink connectors to consume data from 
> various topics, where each topic is dedicated to a specific tenant with a 
> single partition. Recently, we developed a custom sink based on the standard 
> JDBC sink, enabling us to pause consumption of a topic when encountering 
> problematic records.
> However, we face limitations within Kafka Connect, as it doesn't allow for 
> appropriate partitioning of topics among workers. We attempted a workaround 
> by breaking down the topics list within the 'topics' parameter. 
> Unfortunately, Kafka Connect overrides this parameter after invoking the 
> {{taskConfigs(int maxTasks)}} method from the 
> {{org.apache.kafka.connect.connector.Connector}} class.
> We request the addition of support in Kafka Connect to enable the 
> partitioning of topics among workers without requiring a fork. This 
> enhancement would facilitate better load distribution and allow for more 
> flexible configurations, particularly in scenarios where topics are dedicated 
> to different tenants.





Re: [PR] KAFKA-16243: Make sure that we do not exceed max poll interval inside poll [kafka]

2024-02-19 Thread via GitHub


lucasbru commented on code in PR #15372:
URL: https://github.com/apache/kafka/pull/15372#discussion_r1494847054


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -265,6 +265,32 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 ensureNoRebalance(consumer, listener)
   }
 
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testMaxPollIntervalMsShorterThanPollTimeout(quorum: String, 
groupProtocol: String): Unit = {
+
this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 
1000.toString)
+
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 
500.toString)
+
+val consumer = createConsumer()
+val listener = new TestConsumerReassignmentListener
+consumer.subscribe(List(topic).asJava, listener)
+
+// rebalance to get the initial assignment
+awaitRebalance(consumer, listener)
+
+val initialAssignedCalls = listener.callsToAssigned
+
+consumer.poll(Duration.ofMillis(2000))
+
+// Give enough time to rejoin
+consumer.poll(Duration.ofMillis(500))
+consumer.poll(Duration.ofMillis(500))
+
+// Check that we did not rejoin

Review Comment:
   Done



##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -265,6 +265,32 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 ensureNoRebalance(consumer, listener)
   }
 
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testMaxPollIntervalMsShorterThanPollTimeout(quorum: String, 
groupProtocol: String): Unit = {
+
this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 
1000.toString)
+
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 
500.toString)
+
+val consumer = createConsumer()
+val listener = new TestConsumerReassignmentListener
+consumer.subscribe(List(topic).asJava, listener)
+
+// rebalance to get the initial assignment
+awaitRebalance(consumer, listener)
+
+val initialAssignedCalls = listener.callsToAssigned
+
+consumer.poll(Duration.ofMillis(2000))
+
+// Give enough time to rejoin

Review Comment:
   Done






Re: [PR] KAFKA-14683 Migrate WorkerSinkTaskTest to Mockito (3/3) [kafka]

2024-02-19 Thread via GitHub


hgeraldino commented on code in PR #15316:
URL: https://github.com/apache/kafka/pull/15316#discussion_r1494861815


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskMockitoTest.java:
##
@@ -447,6 +457,85 @@ public void testPollRedelivery() {
 assertSinkMetricValue("offset-commit-completion-total", 1.0);
 }
 
+@Test
+@SuppressWarnings("unchecked")
+public void testPollRedeliveryWithConsumerRebalance() {
+createTask(initialState);
+expectTaskGetTopic();
+
+workerTask.initialize(TASK_CONFIG);
+workerTask.initializeAndStart();
+verifyInitializeTask();
+
+Set<TopicPartition> newAssignment = new HashSet<>(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2, TOPIC_PARTITION3));
+
+when(consumer.assignment())
+.thenReturn(INITIAL_ASSIGNMENT, INITIAL_ASSIGNMENT, 
INITIAL_ASSIGNMENT)
+.thenReturn(newAssignment, newAssignment, newAssignment)
+.thenReturn(Collections.singleton(TOPIC_PARTITION3),
+Collections.singleton(TOPIC_PARTITION3),
+Collections.singleton(TOPIC_PARTITION3));
+
+INITIAL_ASSIGNMENT.forEach(tp -> 
when(consumer.position(tp)).thenReturn(FIRST_OFFSET));
+when(consumer.position(TOPIC_PARTITION3)).thenReturn(FIRST_OFFSET);
+
+when(consumer.poll(any(Duration.class)))
+.thenAnswer((Answer<ConsumerRecords<byte[], byte[]>>) invocation -> {
+
rebalanceListener.getValue().onPartitionsAssigned(INITIAL_ASSIGNMENT);
+return ConsumerRecords.empty();
+})
+.thenAnswer(expectConsumerPoll(1))
+// Empty consumer poll (all partitions are paused) with 
rebalance; one new partition is assigned
+.thenAnswer(invocation -> {
+
rebalanceListener.getValue().onPartitionsRevoked(Collections.emptySet());
+
rebalanceListener.getValue().onPartitionsAssigned(Collections.singleton(TOPIC_PARTITION3));
+return ConsumerRecords.empty();
+})
+.thenAnswer(expectConsumerPoll(0))
+// Non-empty consumer poll; all initially-assigned partitions 
are revoked in rebalance, and new partitions are allowed to resume
+.thenAnswer(invocation -> {
+ConsumerRecord<byte[], byte[]> newRecord = new ConsumerRecord<>(TOPIC, PARTITION3, FIRST_OFFSET, RAW_KEY, RAW_VALUE);
+
+
rebalanceListener.getValue().onPartitionsRevoked(INITIAL_ASSIGNMENT);
+
rebalanceListener.getValue().onPartitionsAssigned(Collections.emptyList());
+return new 
ConsumerRecords<>(Collections.singletonMap(TOPIC_PARTITION3, 
Collections.singletonList(newRecord)));

Review Comment:
   You're right, the test was a bit off mainly because the first call to 
`task.put(..)` shouldn't have thrown an exception.
   
   Here's the sequence now (which matches the original WorkerSinkTaskTest):
   
   1. Iteration#1: partitions on INITIAL_ASSIGNMENT are assigned
   2. Iteration#2: `task.put(...)` throws, partitions are paused
   3. Iteration#3: P3 is assigned, `task.put(...)` throws (task is already 
paused so it's a noop)
   4. Iteration#4: `task.put(...)` throws, noop
   5. Iteration#5: initial assignment is revoked (pending messages for P1 are 
removed); `consumer.poll(...)` returns a record for P3, which is successfully 
processed.
   
   I've added the corresponding verifications after each step. Hope it makes 
sense now
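
For readers following the thread, here is a condensed, self-contained sketch of the 
chained-Answer pattern these tests are built on (simplified, hypothetical names; not 
code from the PR): each thenAnswer scripts exactly one consumer.poll(), and a stub 
can fire rebalance callbacks before returning records.

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

public class ChainedPollAnswersSketch {
    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        Consumer<byte[], byte[]> consumer = mock(Consumer.class);
        TopicPartition tp = new TopicPartition("topic", 0);
        // Stand-in for the listener the worker registers; the real tests capture it
        // from consumer.subscribe(...) with an ArgumentCaptor.
        ConsumerRebalanceListener listener = new ConsumerRebalanceListener() {
            @Override public void onPartitionsRevoked(Collection<TopicPartition> parts) {
                System.out.println("revoked: " + parts);
            }
            @Override public void onPartitionsAssigned(Collection<TopicPartition> parts) {
                System.out.println("assigned: " + parts);
            }
        };

        // Each thenAnswer scripts one poll, in order
        when(consumer.poll(any(Duration.class)))
            .thenAnswer(invocation -> {   // poll 1: initial assignment, no records
                listener.onPartitionsAssigned(Collections.singleton(tp));
                return ConsumerRecords.empty();
            })
            .thenAnswer(invocation -> {   // poll 2: partition revoked, no records
                listener.onPartitionsRevoked(Collections.singleton(tp));
                listener.onPartitionsAssigned(Collections.emptySet());
                return ConsumerRecords.empty();
            });

        consumer.poll(Duration.ofMillis(1)); // prints "assigned: [topic-0]"
        consumer.poll(Duration.ofMillis(1)); // prints "revoked: [topic-0]" then "assigned: []"
    }
}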






Re: [PR] KAFKA-14683 Migrate WorkerSinkTaskTest to Mockito (3/3) [kafka]

2024-02-19 Thread via GitHub


hgeraldino commented on code in PR #15316:
URL: https://github.com/apache/kafka/pull/15316#discussion_r1494896646


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskMockitoTest.java:
##
@@ -601,6 +690,567 @@ public void testPartialRevocationAndAssignment() {
 verify(sinkTask, times(4)).put(Collections.emptyList());
 }
 
+@Test
+@SuppressWarnings("unchecked")
+public void testPreCommitFailureAfterPartialRevocationAndAssignment() {
+createTask(initialState);
+expectTaskGetTopic();
+
+workerTask.initialize(TASK_CONFIG);
+workerTask.initializeAndStart();
+verifyInitializeTask();
+
+when(consumer.assignment())
+.thenReturn(INITIAL_ASSIGNMENT, INITIAL_ASSIGNMENT)
+.thenReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2)))
+.thenReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2)))
+.thenReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2)))
+.thenReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2, 
TOPIC_PARTITION3)))
+.thenReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2, 
TOPIC_PARTITION3)))
+.thenReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2, 
TOPIC_PARTITION3)));
+
+INITIAL_ASSIGNMENT.forEach(tp -> 
when(consumer.position(tp)).thenReturn(FIRST_OFFSET));
+when(consumer.position(TOPIC_PARTITION3)).thenReturn(FIRST_OFFSET);
+
+// First poll; assignment is [TP1, TP2]
+when(consumer.poll(any(Duration.class)))
+.thenAnswer((Answer<ConsumerRecords<byte[], byte[]>>) invocation -> {
+
rebalanceListener.getValue().onPartitionsAssigned(INITIAL_ASSIGNMENT);
+return ConsumerRecords.empty();
+})
+// Second poll; a single record is delivered from TP1
+.thenAnswer(expectConsumerPoll(1))
+// Third poll; assignment changes to [TP2]
+.thenAnswer(invocation -> {
+
rebalanceListener.getValue().onPartitionsRevoked(Collections.singleton(TOPIC_PARTITION));
+
rebalanceListener.getValue().onPartitionsAssigned(Collections.emptySet());
+return ConsumerRecords.empty();
+})
+// Fourth poll; assignment changes to [TP2, TP3]
+.thenAnswer(invocation -> {
+
rebalanceListener.getValue().onPartitionsRevoked(Collections.emptySet());
+
rebalanceListener.getValue().onPartitionsAssigned(Collections.singleton(TOPIC_PARTITION3));
+return ConsumerRecords.empty();
+})
+// Fifth poll; an offset commit takes place
+.thenAnswer(expectConsumerPoll(0));
+
+expectConversionAndTransformation(null, new RecordHeaders());
+
+// First iteration--first call to poll, first consumer assignment
+workerTask.iteration();
+// Second iteration--second call to poll, delivery of one record
+workerTask.iteration();
+// Third iteration--third call to poll, partial consumer revocation
+final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+offsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 1));
+when(sinkTask.preCommit(offsets)).thenReturn(offsets);
+doAnswer(invocation -> null).when(consumer).commitSync(offsets);
+
+workerTask.iteration();
+verify(sinkTask).close(Collections.singleton(TOPIC_PARTITION));
+verify(sinkTask, times(2)).put(Collections.emptyList());
+
+// Fourth iteration--fourth call to poll, partial consumer assignment
+workerTask.iteration();
+
+verify(sinkTask).open(Collections.singleton(TOPIC_PARTITION3));
+
+final Map<TopicPartition, OffsetAndMetadata> workerCurrentOffsets = new HashMap<>();
+workerCurrentOffsets.put(TOPIC_PARTITION2, new 
OffsetAndMetadata(FIRST_OFFSET));
+workerCurrentOffsets.put(TOPIC_PARTITION3, new 
OffsetAndMetadata(FIRST_OFFSET));
+when(sinkTask.preCommit(workerCurrentOffsets)).thenThrow(new 
ConnectException("Failed to flush"));
+
+// Fifth iteration--task-requested offset commit with failure in 
SinkTask::preCommit
+sinkTaskContext.getValue().requestCommit();
+workerTask.iteration();
+
+verify(consumer).seek(TOPIC_PARTITION2, FIRST_OFFSET);
+verify(consumer).seek(TOPIC_PARTITION3, FIRST_OFFSET);
+}
+
+@Test
+public void testWakeupInCommitSyncCausesRetry() {
+createTask(initialState);
+
+workerTask.initialize(TASK_CONFIG);
+time.sleep(3L);
+workerTask.initializeAndStart();
+time.sleep(3L);
+verifyInitializeTask();
+
+expectTaskGetTopic();
+expectPollInitialAssignment()
+.thenAnswer(expectConsumerPoll(1))
+.thenAnswer(invocation -> {
+
rebalanceListener

Re: [PR] KAFKA-14683 Migrate WorkerSinkTaskTest to Mockito (3/3) [kafka]

2024-02-19 Thread via GitHub


hgeraldino commented on PR #15316:
URL: https://github.com/apache/kafka/pull/15316#issuecomment-1952966858

   Thanks @gharris1727 for your review. I think I addressed most of your 
comments.
   
If anyone has any other suggestions, please let me know. Ideally we should 
rename this file back to `WorkerSinkTaskTest` (not sure whether as part of this 
PR or a separate one) and clean up checkstyle, build.gradle, etc.





[jira] [Created] (KAFKA-16278) Missing license for scala related dependencies

2024-02-19 Thread Divij Vaidya (Jira)
Divij Vaidya created KAFKA-16278:


 Summary: Missing license for  scala related dependencies
 Key: KAFKA-16278
 URL: https://issues.apache.org/jira/browse/KAFKA-16278
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.6.1, 3.7.0
Reporter: Divij Vaidya


We are missing the license for following dependency in 
[https://github.com/apache/kafka/blob/b71999be95325f6ea54e925cbe5b426425781014/LICENSE-binary#L261]
 

scala-collection-compat_2.12-2.10.0 is missing in license file
scala-java8-compat_2.12-1.0.2 is missing in license file
scala-library-2.12.18 is missing in license file
scala-logging_2.12-3.9.4 is missing in license file
scala-reflect-2.12.18 is missing in license file

The objective of this task is to add these dependencies in the LICENSE-binary 
file.





[jira] [Commented] (KAFKA-12622) Automate LICENSE file validation

2024-02-19 Thread Divij Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17818559#comment-17818559
 ] 

Divij Vaidya commented on KAFKA-12622:
--

*Update for release managers*

Please check for correct licenses in both binaries (kafka_2.13 and kafka_2.12).

> Automate LICENSE file validation
> 
>
> Key: KAFKA-12622
> URL: https://issues.apache.org/jira/browse/KAFKA-12622
> Project: Kafka
>  Issue Type: Task
>Reporter: John Roesler
>Priority: Major
> Fix For: 3.8.0
>
>
> In https://issues.apache.org/jira/browse/KAFKA-12602, we manually constructed 
> a correct license file for 2.8.0. This file will certainly become wrong again 
> in later releases, so we need to write some kind of script to automate a 
> check.
> It crossed my mind to automate the generation of the file, but it seems to be 
> an intractable problem, considering that each dependency may change licenses, 
> may package license files, link to them from their poms, link to them from 
> their repos, etc. I've also found multiple URLs listed with various 
> delimiters, broken links that I have to chase down, etc.
> Therefore, it seems like the solution to aim for is simply: list all the jars 
> that we package, and print out a report of each jar that's extra or missing 
> vs. the ones in our `LICENSE-binary` file.
> The check should be part of the release script at least, if not part of the 
> regular build (so we keep it up to date as dependencies change).
>  
> Here's how I do this manually right now:
> {code:java}
> // build the binary artifacts
> $ ./gradlewAll releaseTarGz
> // unpack the binary artifact 
> $ tar xf core/build/distributions/kafka_2.13-X.Y.Z.tgz
> $ cd kafka_2.13-X.Y.Z
> // list the packaged jars 
> // (you can ignore the jars for our own modules, like kafka, kafka-clients, 
> etc.)
> $ ls libs/
> // cross check the jars with the packaged LICENSE
> // make sure all dependencies are listed with the right versions
> $ cat LICENSE
> // also double check all the mentioned license files are present
> $ ls licenses {code}
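
A minimal sketch of automating that cross-check (an assumption, not existing 
project tooling; matching on jar base names, and skipping our own modules like 
kafka-clients, is left out for brevity):

{code:java}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;

public class LicenseCrossCheck {
    public static void main(String[] args) throws IOException {
        // args[0] is the unpacked distribution directory, e.g. kafka_2.13-X.Y.Z
        Path dist = Paths.get(args[0]);
        String license = new String(Files.readAllBytes(dist.resolve("LICENSE")));
        try (Stream<Path> jars = Files.list(dist.resolve("libs"))) {
            jars.map(p -> p.getFileName().toString())
                .filter(name -> name.endsWith(".jar"))
                .map(name -> name.substring(0, name.length() - 4)) // e.g. scala-library-2.12.18
                .filter(dep -> !license.contains(dep))
                .forEach(dep -> System.out.println("missing from LICENSE: " + dep));
        }
    }
}
{code}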





[jira] [Updated] (KAFKA-16278) Missing license for scala related dependencies

2024-02-19 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya updated KAFKA-16278:
-
Description: 
We are missing the license for following dependency in 
[https://github.com/apache/kafka/blob/b71999be95325f6ea54e925cbe5b426425781014/LICENSE-binary#L261]
 

scala-collection-compat_2.12-2.10.0 is missing in license file
scala-java8-compat_2.12-1.0.2 is missing in license file
scala-library-2.12.18 is missing in license file
scala-logging_2.12-3.9.4 is missing in license file
scala-reflect-2.12.18 is missing in license file

The objective of this task is to add these dependencies in the LICENSE-binary 
file.


(please backport to 3.6 and 3.7 branches)

  was:
We are missing the license for following dependency in 
[https://github.com/apache/kafka/blob/b71999be95325f6ea54e925cbe5b426425781014/LICENSE-binary#L261]
 

scala-collection-compat_2.12-2.10.0 is missing in license file
scala-java8-compat_2.12-1.0.2 is missing in license file
scala-library-2.12.18 is missing in license file
scala-logging_2.12-3.9.4 is missing in license file
scala-reflect-2.12.18 is missing in license file

The objective of this task is to add these dependencies in the LICENSE-binary 
file.


> Missing license for  scala related dependencies
> ---
>
> Key: KAFKA-16278
> URL: https://issues.apache.org/jira/browse/KAFKA-16278
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0, 3.6.1
>Reporter: Divij Vaidya
>Priority: Major
>  Labels: newbie
>
> We are missing the license for following dependency in 
> [https://github.com/apache/kafka/blob/b71999be95325f6ea54e925cbe5b426425781014/LICENSE-binary#L261]
>  
> scala-collection-compat_2.12-2.10.0 is missing in license file
> scala-java8-compat_2.12-1.0.2 is missing in license file
> scala-library-2.12.18 is missing in license file
> scala-logging_2.12-3.9.4 is missing in license file
> scala-reflect-2.12.18 is missing in license file
> The objective of this task is to add these dependencies in the LICENSE-binary 
> file.
> (please backport to 3.6 and 3.7 branches)





[jira] [Updated] (KAFKA-16278) Missing license for scala related dependencies

2024-02-19 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya updated KAFKA-16278:
-
Priority: Blocker  (was: Major)

> Missing license for  scala related dependencies
> ---
>
> Key: KAFKA-16278
> URL: https://issues.apache.org/jira/browse/KAFKA-16278
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0, 3.6.1
>Reporter: Divij Vaidya
>Priority: Blocker
>  Labels: newbie
>
> We are missing the license for following dependency in 
> [https://github.com/apache/kafka/blob/b71999be95325f6ea54e925cbe5b426425781014/LICENSE-binary#L261]
>  
> scala-collection-compat_2.12-2.10.0 is missing in license file
> scala-java8-compat_2.12-1.0.2 is missing in license file
> scala-library-2.12.18 is missing in license file
> scala-logging_2.12-3.9.4 is missing in license file
> scala-reflect-2.12.18 is missing in license file
> The objective of this task is to add these dependencies in the LICENSE-binary 
> file.
> (please backport to 3.6 and 3.7 branches)





[jira] [Updated] (KAFKA-16278) Missing license for scala related dependencies

2024-02-19 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya updated KAFKA-16278:
-
Fix Version/s: 3.6.2
   3.7.1

> Missing license for  scala related dependencies
> ---
>
> Key: KAFKA-16278
> URL: https://issues.apache.org/jira/browse/KAFKA-16278
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0, 3.6.1
>Reporter: Divij Vaidya
>Priority: Blocker
>  Labels: newbie
> Fix For: 3.6.2, 3.7.1
>
>
> We are missing the license for following dependency in 
> [https://github.com/apache/kafka/blob/b71999be95325f6ea54e925cbe5b426425781014/LICENSE-binary#L261]
>  
> scala-collection-compat_2.12-2.10.0 is missing in license file
> scala-java8-compat_2.12-1.0.2 is missing in license file
> scala-library-2.12.18 is missing in license file
> scala-logging_2.12-3.9.4 is missing in license file
> scala-reflect-2.12.18 is missing in license file
> The objective of this task is to add these dependencies in the LICENSE-binary 
> file.
> (please backport to 3.6 and 3.7 branches)





[jira] [Updated] (KAFKA-16278) Missing license for scala related dependencies

2024-02-19 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya updated KAFKA-16278:
-
Fix Version/s: 3.8.0

> Missing license for  scala related dependencies
> ---
>
> Key: KAFKA-16278
> URL: https://issues.apache.org/jira/browse/KAFKA-16278
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0, 3.6.1
>Reporter: Divij Vaidya
>Priority: Blocker
>  Labels: newbie
> Fix For: 3.6.2, 3.8.0, 3.7.1
>
>
> We are missing the license for following dependency in 
> [https://github.com/apache/kafka/blob/b71999be95325f6ea54e925cbe5b426425781014/LICENSE-binary#L261]
>  
> scala-collection-compat_2.12-2.10.0 is missing in license file
> scala-java8-compat_2.12-1.0.2 is missing in license file
> scala-library-2.12.18 is missing in license file
> scala-logging_2.12-3.9.4 is missing in license file
> scala-reflect-2.12.18 is missing in license file
> The objective of this task is to add these dependencies in the LICENSE-binary 
> file.
> (please backport to 3.6 and 3.7 branches)





[jira] [Commented] (KAFKA-16278) Missing license for scala related dependencies

2024-02-19 Thread Anton Liauchuk (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17818560#comment-17818560
 ] 

Anton Liauchuk commented on KAFKA-16278:


Can I pick this up?

> Missing license for  scala related dependencies
> ---
>
> Key: KAFKA-16278
> URL: https://issues.apache.org/jira/browse/KAFKA-16278
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0, 3.6.1
>Reporter: Divij Vaidya
>Priority: Blocker
>  Labels: newbie
> Fix For: 3.6.2, 3.8.0, 3.7.1
>
>
> We are missing the license for following dependency in 
> [https://github.com/apache/kafka/blob/b71999be95325f6ea54e925cbe5b426425781014/LICENSE-binary#L261]
>  
> scala-collection-compat_2.12-2.10.0 is missing in license file
> scala-java8-compat_2.12-1.0.2 is missing in license file
> scala-library-2.12.18 is missing in license file
> scala-logging_2.12-3.9.4 is missing in license file
> scala-reflect-2.12.18 is missing in license file
> The objective of this task is to add these dependencies in the LICENSE-binary 
> file.
> (please backport to 3.6 and 3.7 branches)





[jira] [Commented] (KAFKA-16278) Missing license for scala related dependencies

2024-02-19 Thread Divij Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17818562#comment-17818562
 ] 

Divij Vaidya commented on KAFKA-16278:
--

Sure [~anton.liauchuk]. You should be able to assign this ticket to yourself 
now.

> Missing license for  scala related dependencies
> ---
>
> Key: KAFKA-16278
> URL: https://issues.apache.org/jira/browse/KAFKA-16278
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0, 3.6.1
>Reporter: Divij Vaidya
>Priority: Blocker
>  Labels: newbie
> Fix For: 3.6.2, 3.8.0, 3.7.1
>
>
> We are missing the license for following dependency in 
> [https://github.com/apache/kafka/blob/b71999be95325f6ea54e925cbe5b426425781014/LICENSE-binary#L261]
>  
> scala-collection-compat_2.12-2.10.0 is missing in license file
> scala-java8-compat_2.12-1.0.2 is missing in license file
> scala-library-2.12.18 is missing in license file
> scala-logging_2.12-3.9.4 is missing in license file
> scala-reflect-2.12.18 is missing in license file
> The objective of this task is to add these dependencies in the LICENSE-binary 
> file.
> (please backport to 3.6 and 3.7 branches)





[jira] [Assigned] (KAFKA-16278) Missing license for scala related dependencies

2024-02-19 Thread Anton Liauchuk (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anton Liauchuk reassigned KAFKA-16278:
--

Assignee: Anton Liauchuk

> Missing license for  scala related dependencies
> ---
>
> Key: KAFKA-16278
> URL: https://issues.apache.org/jira/browse/KAFKA-16278
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0, 3.6.1
>Reporter: Divij Vaidya
>Assignee: Anton Liauchuk
>Priority: Blocker
>  Labels: newbie
> Fix For: 3.6.2, 3.8.0, 3.7.1
>
>
> We are missing the license for following dependency in 
> [https://github.com/apache/kafka/blob/b71999be95325f6ea54e925cbe5b426425781014/LICENSE-binary#L261]
>  
> scala-collection-compat_2.12-2.10.0 is missing in license file
> scala-java8-compat_2.12-1.0.2 is missing in license file
> scala-library-2.12.18 is missing in license file
> scala-logging_2.12-3.9.4 is missing in license file
> scala-reflect-2.12.18 is missing in license file
> The objective of this task is to add these dependencies in the LICENSE-binary 
> file.
> (please backport to 3.6 and 3.7 branches)





[jira] [Comment Edited] (KAFKA-16278) Missing license for scala related dependencies

2024-02-19 Thread Divij Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17818562#comment-17818562
 ] 

Divij Vaidya edited comment on KAFKA-16278 at 2/19/24 7:48 PM:
---

Sure [~anton.liauchuk]. You should be able to assign this ticket to yourself 
now.

P.S. - In the future, feel free to assign any "unassigned" Jira ticket to yourself 
(by changing the Assignee) and start working on it. You don't have to ask for 
permission.


was (Author: divijvaidya):
Sure [~anton.liauchuk]. You should be able to assign this ticket to yourself 
now.

> Missing license for  scala related dependencies
> ---
>
> Key: KAFKA-16278
> URL: https://issues.apache.org/jira/browse/KAFKA-16278
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0, 3.6.1
>Reporter: Divij Vaidya
>Assignee: Anton Liauchuk
>Priority: Blocker
>  Labels: newbie
> Fix For: 3.6.2, 3.8.0, 3.7.1
>
>
> We are missing the license for following dependency in 
> [https://github.com/apache/kafka/blob/b71999be95325f6ea54e925cbe5b426425781014/LICENSE-binary#L261]
>  
> scala-collection-compat_2.12-2.10.0 is missing in license file
> scala-java8-compat_2.12-1.0.2 is missing in license file
> scala-library-2.12.18 is missing in license file
> scala-logging_2.12-3.9.4 is missing in license file
> scala-reflect-2.12.18 is missing in license file
> The objective of this task is to add these dependencies in the LICENSE-binary 
> file.
> (please backport to 3.6 and 3.7 branches)





[jira] [Commented] (KAFKA-15467) Kafka broker returns offset out of range for topic/partitions on restart from unclean shutdown

2024-02-19 Thread Steve Jacobs (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17818572#comment-17818572
 ] 

Steve Jacobs commented on KAFKA-15467:
--

This isn't a Kafka or MM2 bug per se; it's all down to timing. The unclean 
shutdown meant the latest writes never made it to disk on the single-broker 
setup, so when the power came back, MM2 had consumed 'past the end of the 
offsets on disk'. And since we were flushing to disk every minute, it only took 
one minute for the offsets to 'catch up', making it appear that the offsets 
were on the broker when in fact they were not.

[https://github.com/apache/kafka/pull/14567]

That PR should let me work around this issue when it merges, hopefully in Kafka 
3.7.0.

 

> Kafka broker returns offset out of range for topic/partitions on restart from 
> unclean shutdown
> --
>
> Key: KAFKA-15467
> URL: https://issues.apache.org/jira/browse/KAFKA-15467
> Project: Kafka
>  Issue Type: Bug
>  Components: core, log
>Affects Versions: 3.5.1
> Environment: Apache Kafka 3.5.1 with Strimzi on kubernetes.
>Reporter: Steve Jacobs
>Priority: Major
>
> So this started with me thinking this was a mirrormaker2 issue because here 
> are the symptoms I am seeing:
> I'm encountering an odd issue with mirrormaker2 with our remote replication 
> setup to high latency remote sites (satellite).
> Every few days we get several topics completely re-replicated; this appears 
> to happen after a network connectivity outage. It doesn't matter if it's a 
> long outage (hours) or a short one (minutes), and it only seems to affect a 
> few topics.
> I was finally able to track down some logs showing the issue. This was after 
> an hour-ish long outage where connectivity went down. There were lots of logs 
> about connection timeouts, etc. Here is the relevant part when the connection 
> came back up:
> {code:java}
> 2023-09-08 16:52:45,380 INFO [scbi->gcp.MirrorSourceConnector|worker] 
> [AdminClient 
> clientId=mm2-admin-scbi|scbi->gcp|scbi->gcp.MirrorSourceConnector|replication-source-admin]
>  Disconnecting from node 0 due to socket connection setup timeout. The 
> timeout value is 63245 ms. (org.apache.kafka.clients.NetworkClient) 
> [kafka-admin-client-thread | 
> mm2-admin-scbi|scbi->gcp|scbi->gcp.MirrorSourceConnector|replication-source-admin]
> 2023-09-08 16:52:45,380 INFO [scbi->gcp.MirrorSourceConnector|worker] 
> [AdminClient 
> clientId=mm2-admin-scbi|scbi->gcp|scbi->gcp.MirrorSourceConnector|replication-source-admin]
>  Metadata update failed 
> (org.apache.kafka.clients.admin.internals.AdminMetadataManager) 
> [kafka-admin-client-thread | 
> mm2-admin-scbi|scbi->gcp|scbi->gcp.MirrorSourceConnector|replication-source-admin]
> 2023-09-08 16:52:47,029 INFO [scbi->gcp.MirrorSourceConnector|task-1] 
> [Consumer 
> clientId=mm2-consumer-scbi|scbi->gcp|scbi->gcp.MirrorSourceConnector-1|replication-consumer,
>  groupId=null] Disconnecting from node 0 due to socket connection setup 
> timeout. The timeout value is 52624 ms. 
> (org.apache.kafka.clients.NetworkClient) 
> [task-thread-scbi->gcp.MirrorSourceConnector-1]
> 2023-09-08 16:52:47,029 INFO [scbi->gcp.MirrorSourceConnector|task-1] 
> [Consumer 
> clientId=mm2-consumer-scbi|scbi->gcp|scbi->gcp.MirrorSourceConnector-1|replication-consumer,
>  groupId=null] Error sending fetch request (sessionId=460667411, 
> epoch=INITIAL) to node 0: (org.apache.kafka.clients.FetchSessionHandler) 
> [task-thread-scbi->gcp.MirrorSourceConnector-1]
> 2023-09-08 16:52:47,336 INFO [scbi->gcp.MirrorSourceConnector|worker] 
> refreshing topics took 67359 ms (org.apache.kafka.connect.mirror.Scheduler) 
> [Scheduler for MirrorSourceConnector: 
> scbi->gcp|scbi->gcp.MirrorSourceConnector-refreshing topics]
> 2023-09-08 16:52:48,413 INFO [scbi->gcp.MirrorSourceConnector|task-1] 
> [Consumer 
> clientId=mm2-consumer-scbi|scbi->gcp|scbi->gcp.MirrorSourceConnector-1|replication-consumer,
>  groupId=null] Fetch position FetchPosition{offset=4918131, 
> offsetEpoch=Optional[0], 
> currentLeader=LeaderAndEpoch{leader=Optional[kafka.scbi.eng.neoninternal.org:9094
>  (id: 0 rack: null)], epoch=0}} is out of range for partition 
> reading.sensor.hfp01sc-0, resetting offset 
> (org.apache.kafka.clients.consumer.internals.AbstractFetch) 
> [task-thread-scbi->gcp.MirrorSourceConnector-1]
> (Repeats for 11 more topics)
> 2023-09-08 16:52:48,479 INFO [scbi->gcp.MirrorSourceConnector|task-1] 
> [Consumer 
> clientId=mm2-consumer-scbi|scbi->gcp|scbi->gcp.MirrorSourceConnector-1|replication-consumer,
>  groupId=null] Resetting offset for partition reading.sensor.hfp01sc-0 to 
> position FetchPosition{offset=3444977, offsetEpoch=Optional.empty, 
> currentLeader=LeaderAndEpoch{leader=Optional[kafka.scbi.eng.neoninternal.org:9094
> 

Re: [PR] KAFKA-16265: KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]

2024-02-19 Thread via GitHub


RamanVerma commented on code in PR #15384:
URL: https://github.com/apache/kafka/pull/15384#discussion_r1494984871


##
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##
@@ -278,12 +278,13 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
 
   def handleListTransactions(
 filteredProducerIds: Set[Long],
-filteredStates: Set[String]
+filteredStates: Set[String],
+durationFilter: Long = -1

Review Comment:
   @yyu1993 this default value needs to be changed to -1L as well



##
clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsOptions.java:
##
@@ -61,6 +62,11 @@ public ListTransactionsOptions filterProducerIds(Collection<Long> producerIdFilters) {
 return this;
 }
 
+public ListTransactionsOptions durationFilter(long durationMs) {

Review Comment:
   Please add a comment to this method like we have for the other methods 
above. Also, we should probably change the method name to `filterOnDuration`, 
to match the rest of the filter-setting methods.
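
   For illustration, a minimal sketch of the suggested shape (the Javadoc 
wording is mine, and the final name is of course up to the PR):

   ```java
   /**
    * Filter only the transactions that have been running for longer than the
    * given duration.
    *
    * @param durationMs the minimum running time, in milliseconds
    * @return this object so that methods can be chained together
    */
   public ListTransactionsOptions filterOnDuration(long durationMs) {
       this.durationFilter = durationMs;
       return this;
   }
   ```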



##
clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsOptions.java:
##
@@ -81,11 +87,16 @@ public Set<Long> filteredProducerIds() {
 return filteredProducerIds;
 }
 
+public long getDurationFilter() {

Review Comment:
   Please add a Javadoc comment to the method. Also, change the method name to 
`filteredDuration`.



##
clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsOptions.java:
##
@@ -81,11 +87,16 @@ public Set<Long> filteredProducerIds() {
 return filteredProducerIds;
 }
 
+public long getDurationFilter() {
+return durationFilter;
+}
+
 @Override
 public String toString() {
 return "ListTransactionsOptions(" +
 "filteredStates=" + filteredStates +
 ", filteredProducerIds=" + filteredProducerIds +
+", durationFilter=" + durationFilter +

Review Comment:
   nit: durationFilter -> filteredDuration



##
tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java:
##
@@ -187,14 +187,25 @@ private void testDescribeProducers(
 assertEquals(expectedRows, new HashSet<>(table.subList(1, table.size())));
 }
 
-@Test
-public void testListTransactions() throws Exception {
+@ParameterizedTest
+@ValueSource(booleans = {true, false})
+public void testListTransactions(boolean hasDurationFilter) throws Exception {
 String[] args = new String[] {
 "--bootstrap-server",
 "localhost:9092",
 "list"
 };
 
+if (hasDurationFilter) {
+args = new String[] {
+"--bootstrap-server",
+"localhost:9092",
+"list",
+"--duration-filter",
+Long.toString(Long.MAX_VALUE)

Review Comment:
   Hmm, this will not return anything, right? No transaction can have been 
running for longer than Long.MAX_VALUE ms, so the filtered list should be 
empty.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16033: Commit retry logic fixes [kafka]

2024-02-19 Thread via GitHub


lucasbru commented on code in PR #15357:
URL: https://github.com/apache/kafka/pull/15357#discussion_r1494961008


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1344,8 +1343,8 @@ public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration timeout) {
 long commitStart = time.nanoseconds();
 try {
 Timer requestTimer = time.timer(timeout.toMillis());
-// Commit with a timer to control how long the request should be retried until it
-// gets a successful response or non-retriable error.
+// Commit with a retry timeout (the commit request will be retried until it gets a
+// successful response, non-retriable error, or the timeout expires)
 CompletableFuture<Void> commitFuture = commit(offsets, true, Optional.of(timeout.toMillis()));

Review Comment:
   While you are at it, I think we can remove the "isWakeupable" parameter and 
just set the  `wakeUpTrigger` in the calling context.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##
@@ -149,8 +148,14 @@ private void process(final CommitApplicationEvent event) {
 }
 
 CommitRequestManager manager = requestManagers.commitRequestManager.get();
-Optional<Long> expirationTimeMs = event.retryTimeoutMs().map(this::getExpirationTimeForTimeout);
-event.chain(manager.addOffsetCommitRequest(event.offsets(), expirationTimeMs, false));
+CompletableFuture<Void> commitResult;
+if (event.retryTimeoutMs().isPresent()) {

Review Comment:
   It's a bit weird that we use `retryTimeoutMs` to carry the information that 
this is a sync commit vs. an async commit.
   
   How about going all-in here and just having an `AsyncCommitApplicationEvent` 
and a `SyncCommitApplicationEvent`?
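
   A minimal, self-contained sketch of that split (class and field shapes here 
are illustrative assumptions, not code from the PR):

   ```java
   import java.util.Map;

   // Encode the sync/async distinction in the event type itself, so it no
   // longer rides on whether retryTimeoutMs happens to be present.
   abstract class CommitEvent {
       private final Map<String, Long> offsets; // stand-in for the real offset map
       protected CommitEvent(Map<String, Long> offsets) { this.offsets = offsets; }
       Map<String, Long> offsets() { return offsets; }
   }

   final class AsyncCommitApplicationEvent extends CommitEvent {
       AsyncCommitApplicationEvent(Map<String, Long> offsets) { super(offsets); }
   }

   final class SyncCommitApplicationEvent extends CommitEvent {
       private final long retryTimeoutMs; // sync commits always carry a retry deadline
       SyncCommitApplicationEvent(Map<String, Long> offsets, long retryTimeoutMs) {
           super(offsets);
           this.retryTimeoutMs = retryTimeoutMs;
       }
       long retryTimeoutMs() { return retryTimeoutMs; }
   }
   ```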



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -204,126 +205,315 @@ private static long findMinTime(final Collection<? extends RequestState> requests,
 }
 
 /**
- * Generate a request to commit offsets if auto-commit is enabled. The request will be
- * returned to be sent out on the next call to {@link #poll(long)}. This will only generate a
- * request if there is no other commit request already in-flight, and if the commit interval
- * has elapsed.
+ * Generate a request to commit consumed offsets. Add the request to the queue of pending
+ * requests to be sent out on the next call to {@link #poll(long)}. If there are empty
+ * offsets to commit, no request will be generated and a completed future will be returned.
  *
- * @param offsets   Offsets to commit
- * @param expirationTimeMs  Time until which the request will continue to be retried if it
- *  fails with a retriable error. If not present, the request will be
- *  sent but not retried.
- * @param checkInterval True if the auto-commit interval expiration should be checked for
- *  sending a request. If true, the request will be sent only if the
- *  auto-commit interval has expired. Pass false to
- *  send the auto-commit request regardless of the interval (ex.
- *  auto-commit before rebalance).
- * @param retryOnStaleEpoch True if the request should be retried in case it fails with
- *  {@link Errors#STALE_MEMBER_EPOCH}.
- * @return Future that will complete when a response is received for the request, or a
- * completed future if no request is generated.
+ * @param requestState Commit request
+ * @return Future containing the offsets that were committed, or an error if the request
+ * failed.
  */
-private CompletableFuture<Void> maybeAutoCommit(final Map<TopicPartition, OffsetAndMetadata> offsets,
-final Optional<Long> expirationTimeMs,
-boolean checkInterval,
-boolean retryOnStaleEpoch) {
-if (!autoCommitEnabled()) {
-log.debug("Skipping auto-commit because auto-commit config is not enabled.");
-return CompletableFuture.completedFuture(null);
-}
-
+private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> requestAutoCommit(final OffsetCommitRequestState requestState) {
 AutoCommitState autocommit = autoCommitState.get();
-if (checkInterval && !autocommit.shouldAutoCommit()) {
-return CompletableFuture.completedFuture(null);
+CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result;
+if (requestState.offsets.isEmpty()) {
+result = CompletableFuture.completedFuture(Collections.emptyMap());
+} else {
+autocommit.setInflightCommitStatus(true);
+OffsetCommitRequestState request = pen

[jira] [Created] (KAFKA-16279) Avoid leaking abstractions of `StateStore`

2024-02-19 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-16279:
---

 Summary: Avoid leaking abstractions of `StateStore`
 Key: KAFKA-16279
 URL: https://issues.apache.org/jira/browse/KAFKA-16279
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Matthias J. Sax


The `StateStore` interface is user-facing and contains a few life-cycle 
management methods (like `init()` and `close()`); those methods are exposed so 
that users can develop custom state stores.

However, we also use `StateStore` as the base interface for store handles in 
the PAPI, and thus the life-cycle management methods leak into the PAPI (maybe 
other methods too; that would need a dedicated discussion about which ones we 
consider useful for PAPI users and which not).

We should consider changing what we expose in the PAPI. At the moment, we only 
document via JavaDocs that e.g. `close()` should never be called; that is of 
course not ideal, and it would be better if `close()` et al. were not exposed 
to PAPI users to begin with.
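
To make the leak concrete, a minimal sketch (the processor and store name 
below are made up for illustration): the handle returned by `getStateStore()` 
is typed as a `StateStore` subtype, so `close()` et al. are callable from user 
code even though they must never be called.

{code:java}
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;

public class CountingProcessor implements Processor<String, String, String, Long> {
    private KeyValueStore<String, Long> store;

    @Override
    public void init(final ProcessorContext<String, Long> context) {
        store = context.getStateStore("counts-store"); // hypothetical store name
        // store.close(); // compiles fine, but corrupts the task if ever invoked
    }

    @Override
    public void process(final Record<String, String> record) {
        final Long count = store.get(record.key());
        store.put(record.key(), count == null ? 1L : count + 1L);
    }
}
{code}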



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16280) Expose method to determine Metric Measurability

2024-02-19 Thread Apoorv Mittal (Jira)
Apoorv Mittal created KAFKA-16280:
-

 Summary: Expose method to determine Metric Measurability
 Key: KAFKA-16280
 URL: https://issues.apache.org/jira/browse/KAFKA-16280
 Project: Kafka
  Issue Type: Bug
  Components: metrics
Affects Versions: 3.8.0
Reporter: Apoorv Mittal
Assignee: Apoorv Mittal
 Fix For: 3.8.0


This Jira tracks the development of KIP-1019: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-1019%3A+Expose+method+to+determine+Metric+Measurability]
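
For context, a sketch of the detour callers need today to answer that question 
(this reflects the existing `KafkaMetric#measurable()` behavior; the new 
method itself is specified in the KIP):

{code:java}
import org.apache.kafka.common.metrics.KafkaMetric;

public final class MeasurabilitySketch {
    // measurable() throws IllegalStateException when the metric is not backed
    // by a Measurable, so the only way to ask the question today is to catch it.
    public static boolean isMeasurable(final KafkaMetric metric) {
        try {
            metric.measurable();
            return true;
        } catch (final IllegalStateException e) {
            return false;
        }
    }
}
{code}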

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16277) CooperativeStickyAssignor does not spread topics evenly among consumer group

2024-02-19 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17818594#comment-17818594
 ] 

A. Sophie Blee-Goldman commented on KAFKA-16277:


Hey [~credpath-seek], first up: while it's been a while since I've looked at 
the sticky assignor code, I'm not too surprised that this might be the case. 
The obvious emphasis (per the name) was put on "stickiness" and 
partition-number balance, with good data parallelism (i.e. topic-level 
balance) being best-effort at most. 

That said, I suspect the assignor could be making a better effort. Presumably 
what is happening is that during the phase where it attempts to re-assign 
previously-owned partitions back to their former owner, we make a pass over a 
sorted list of previously-owned partitions that is grouped by topic. The 
assignor then assigns partitions from this list one by one to their previous 
owner until it hits the expected total number of partitions. So in the 
scenario you describe, it's basically looping over (t1p0, t1p1, t1p2, 
t1p3...t1pN, t2p0, t2p1, t2p2...t2pN) and assigning the first N partitions to 
the first consumer, which would be everything from topic 1, then just dumping 
the remaining partitions (all of which belong to topic 2) onto the new 
consumer.

The fix should be fairly simple: we just need to order this sorted list by 
partition number rather than by topic (i.e. t1p0, t2p0, t1p1, t2p1...t1pN, 
t2pN). Would you be interested in submitting a patch for this? 
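
A minimal sketch of that re-ordering as a standalone helper (an illustration 
of the idea, not the assignor's actual code):

{code:java}
import org.apache.kafka.common.TopicPartition;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public final class InterleaveSketch {
    // Interleave previously-owned partitions across topics so that consecutive
    // entries come from different topics: t1p0, t2p0, t1p1, t2p1, ...
    public static List<TopicPartition> interleaveByPartition(final List<TopicPartition> owned) {
        final Map<String, Deque<TopicPartition>> byTopic = new TreeMap<>();
        for (final TopicPartition tp : owned)
            byTopic.computeIfAbsent(tp.topic(), t -> new ArrayDeque<>()).add(tp);
        final List<TopicPartition> interleaved = new ArrayList<>(owned.size());
        while (interleaved.size() < owned.size())
            for (final Deque<TopicPartition> queue : byTopic.values())
                if (!queue.isEmpty()) interleaved.add(queue.poll());
        return interleaved;
    }
}
{code}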

As for what you can do right now: technically, even if a fix for this were 
merged, you'd have to wait for the next release. However, the assignment logic 
is completely customizable, so in theory you could just copy/paste all the 
code from the patched assignor into a custom ConsumerPartitionAssignor 
implementation and then plug that in instead of the "cooperative-sticky" 
assignment strategy.
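
Plugging the custom assignor in would look roughly like this (the 
implementation class name is a placeholder):

{code:java}
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.util.Properties;

public class AssignorConfigExample {
    public static Properties consumerProps() {
        final Properties props = new Properties();
        // Replace the built-in "cooperative-sticky" strategy with the patched
        // copy (placeholder class name for your own implementation).
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                "com.example.PatchedCooperativeStickyAssignor");
        return props;
    }
}
{code}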

Otherwise, the workaround you suggest is a reasonable backup: the obvious 
downside is that the two threads will have an unbalanced load between them, 
but at least the overall node-level workload will be more even.

> CooperativeStickyAssignor does not spread topics evenly among consumer group
> 
>
> Key: KAFKA-16277
> URL: https://issues.apache.org/jira/browse/KAFKA-16277
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Cameron Redpath
>Priority: Major
> Attachments: image-2024-02-19-13-00-28-306.png
>
>
> Consider the following scenario:
> `topic-1`: 12 partitions
> `topic-2`: 12 partitions
>  
> Of note, `topic-1` gets approximately 10 times more messages through it than 
> `topic-2`. 
>  
> Both of these topics are consumed by a single application, single consumer 
> group, which scales under load. Each member of the consumer group subscribes 
> to both topics. The `partition.assignment.strategy` being used is 
> `org.apache.kafka.clients.consumer.CooperativeStickyAssignor`. The 
> application may start with one consumer. It consumes all partitions from both 
> topics.
>  
> The problem begins when the application scales up to two consumers. What is 
> seen is that all partitions from `topic-1` go to one consumer, and all 
> partitions from `topic-2` go to the other consumer. In the case with one 
> topic receiving more messages than the other, this results in a very 
> imbalanced group where one consumer is receiving 10x the traffic of the other 
> due to partition assignment.
>  
> This is the issue being seen in our cluster at the moment. See this graph of 
> the number of messages being processed by each consumer as the group scales 
> from one to four consumers:
> !image-2024-02-19-13-00-28-306.png|width=537,height=612!
> Things to note from this graphic:
>  * With two consumers, the partitions for a topic all go to a single consumer 
> each
>  * With three consumers, the partitions for a topic are split between two 
> consumers each
>  * With four consumers, the partitions for a topic are split between three 
> consumers each
>  * The total number of messages being processed by each consumer in the group 
> is very imbalanced throughout the entire period
>  
> With regard to the number of _partitions_ being assigned to each consumer, 
> the group is balanced. However, the assignment appears to be biased so that 
> partitions from the same topic go to the same consumer. In our scenario, this 
> leads to very undesirable partition assignment.
>  
> I question if the behaviour of the assignor should be revised, so that each 
> topic has its partitions maximally spread across all available members of the 
> consumer group. In the above scenario, this would result in much more even 
> distribution of load. The behaviour would then be:
>  * With two 

[jira] [Commented] (KAFKA-14597) [Streams] record-e2e-latency-max is not reporting correct metrics

2024-02-19 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17818599#comment-17818599
 ] 

A. Sophie Blee-Goldman commented on KAFKA-14597:


Hey [~atuljainiitk] I just came across this ticket for the first time, but for 
the most part I think your analysis makes sense. Are you still interested in 
picking this up? Specifically, reverting the linked change and fetching the 
true system time at terminal nodes? It's been a while since that change was 
made and the KIP was implemented, so I don't remember all the context, but 
fetching the current time at terminal nodes sounds reasonable to me (a rough 
sketch follows below). Clearly the metric is otherwise useless, so we should 
either update it to be correct (and monitor for any potential performance 
impact) or just remove it entirely. Fixing it is obviously preferable, at 
least unless we know for sure that it hurts performance. I'm optimistic, 
though.

cc also [~talestonini] [~cadonna]
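
A rough sketch of the terminal-node idea (my reading of the proposal, not 
actual Streams code):

{code:java}
import org.apache.kafka.common.utils.Time;

public final class E2eLatencySketch {
    // At a terminal node, measure e2e latency against freshly-fetched
    // wall-clock time instead of a timestamp cached when the record entered
    // the task, so delays inside the topology are included.
    public static long e2eLatencyAtTerminalNode(final Time time, final long recordTimestampMs) {
        final long nowMs = time.milliseconds(); // fetch the true system time per record
        return nowMs - recordTimestampMs;
    }
}
{code}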

> [Streams] record-e2e-latency-max is not reporting correct metrics 
> --
>
> Key: KAFKA-14597
> URL: https://issues.apache.org/jira/browse/KAFKA-14597
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics, streams
>Reporter: Atul Jain
>Assignee: Tales Tonini
>Priority: Major
> Attachments: image-2023-03-21-15-07-24-352.png, 
> image-2023-03-21-19-01-54-713.png, image-2023-03-21-19-03-07-525.png, 
> image-2023-03-21-19-03-28-625.png, process-latency-max.jpg, 
> record-e2e-latency-max.jpg
>
>
> I was following this KIP documentation 
> ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-613%3A+Add+end-to-end+latency+metrics+to+Streams])
>  and kafka streams documentation 
> ([https://kafka.apache.org/documentation/#kafka_streams_monitoring:~:text=node%2Did%3D(%5B%2D.%5Cw%5D%2B)-,record%2De2e%2Dlatency%2Dmax,-The%20maximum%20end])
>  . Based on this documentation, the *record-e2e-latency-max* metric should 
> monitor the full end-to-end latency, which includes both *consumption 
> latencies* and *processing delays*.
> However, based on my observations, record-e2e-latency-max seems to be only 
> measuring the consumption latencies; processing delays can be measured using 
> *process-latency-max*. I am checking all this using a simple topology 
> consisting of a source, processors, and a sink (code added). I have added 
> some sleep time (of 3 seconds) in one of the processors to ensure some delay 
> in the processing logic. These delays are not accounted for in 
> record-e2e-latency-max but are accounted for in process-latency-max. 
> process-latency-max was observed to be 3002 ms, which accounts for the sleep 
> time of 3 seconds. However, the record-e2e-latency-max observed in jconsole 
> was 422 ms, which does not account for the 3 seconds of sleep time.
>  
> Code describing my topology:
> {code:java}
>static Topology buildTopology(String inputTopic, String outputTopic) {
> log.info("Input topic: " + inputTopic + " and output topic: " + 
> outputTopic);
> Serde<String> stringSerde = Serdes.String();
> StreamsBuilder builder = new StreamsBuilder();
> builder.stream(inputTopic, Consumed.with(stringSerde, stringSerde))
> .peek((k,v) -> log.info("Observed event: key" + k + " value: 
> " + v))
> .mapValues(s -> {
> try {
> System.out.println("sleeping for 3 seconds");
> Thread.sleep(3000);
> }
> catch (InterruptedException e) {
> e.printStackTrace();
> }
> return  s.toUpperCase();
> })
> .peek((k,v) -> log.info("Transformed event: key" + k + " 
> value: " + v))
> .to(outputTopic, Produced.with(stringSerde, stringSerde));
> return builder.build();
> } {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

