[jira] [Created] (KAFKA-14847) Separate the callers of commitAllTasks v.s. commitTasks for EOS(-v2) and ALOS
Guozhang Wang created KAFKA-14847:
-------------------------------------
Summary: Separate the callers of commitAllTasks v.s. commitTasks for EOS(-v2) and ALOS
Key: KAFKA-14847
URL: https://issues.apache.org/jira/browse/KAFKA-14847
Project: Kafka
Issue Type: Improvement
Components: streams
Reporter: Guozhang Wang

Today, EOS-v1/v2 and ALOS share the same internal call path inside TaskManager/TaskExecutor for committing tasks in various scenarios. The call path {{commitTasksAndMaybeUpdateCommitableOffsets}} -> {{commitOffsetsOrTransaction}} takes a list of tasks as its input, which can be a subset of the tasks that the thread / task manager owns. For EOS-v1 / ALOS it is fine to commit just a subset of the tasks; however, for EOS-v2, since all tasks participate in the same txn, this could lead to dangerous violations, and today we rely on all the callers of the commit function to make sure that the list of tasks they pass in would, under EOS-v2, not violate the semantics.

As summarized today (thanks to Matthias), that callee can be triggered in the following cases:

1) Inside handleRevocation() -- this is a clean path, and we add all non-revoked tasks with the commitNeeded() flag set to the commit -- so this seems to be fine.

2) tryCloseCleanAllActiveTasks() -- here we only call it if tasksToCloseDirty.isEmpty() -- so it seems fine, too.

3) commit() with a list of tasks handed in -- we call commit() inside the TM three times:

3.a) inside commitAll() as commit(tasks.values()) (passing in all tasks)

3.b) inside maybeCommitActiveTasksPerUserRequested as commit(activeTaskIterable()); (passing in all tasks)

3.c) inside handleCorruption() -- here, we only consider RUNNING and RESTORING tasks, which are not corrupted -- note we only throw a TaskCorruptedException during restore state initialization; thus, corrupted tasks did not process anything yet, and all other tasks should be clean to be committed.
3.d) commitSuccessfullyProcessedTasks() -- under EOS-v2, we commit just a subset of tasks' source offsets while at the same time we still commit the unsuccessful tasks' outgoing records if there are any.

Just going through this list of callers, as demonstrated above, is already pretty complex and very vulnerable to bugs. It's better not to rely on the callers, but on the callee, to make sure that's the case. More concretely, I think we can introduce a new function called {{commitAllTasks}} such that under EOS-v2 the callers always call {{commitAllTasks}} instead, and if there are some tasks that should not be committed because we know they have not processed any data, the {{commitAllTasks}} callee itself would do some clever filtering internally.

Given its scope, I think it's better to do this refactoring after EOS-v1 is removed.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
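The callee-side filtering proposed above could look roughly like the following sketch. The `Task` shape and the `tasksWithOffsetsToCommit` name are hypothetical, not the actual TaskManager code; only the `commitNeeded` flag comes from the ticket:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

public class CommitAllTasksSketch {
    // Hypothetical minimal task shape, for illustration only.
    static class Task {
        final String id;
        final boolean commitNeeded;
        Task(String id, boolean commitNeeded) {
            this.id = id;
            this.commitNeeded = commitNeeded;
        }
    }

    // Under EOS-v2 all tasks share one transaction, so the callee (not the
    // many callers) decides which tasks actually carry offsets to commit:
    // tasks that processed nothing are filtered out here, while the commit
    // itself would still span the full task set.
    static List<Task> tasksWithOffsetsToCommit(Collection<Task> allOwnedTasks) {
        List<Task> committable = new ArrayList<>();
        for (Task task : allOwnedTasks) {
            if (task.commitNeeded) {
                committable.add(task);
            }
        }
        return committable;
    }
}
```

The point of the refactoring is that the filtering lives in exactly one place, so a new caller cannot accidentally commit a wrong subset.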
[jira] [Resolved] (KAFKA-12639) AbstractCoordinator ignores backoff timeout when joining the consumer group
[ https://issues.apache.org/jira/browse/KAFKA-12639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang resolved KAFKA-12639.
-----------------------------------
Fix Version/s: 3.5.0
Resolution: Fixed

> AbstractCoordinator ignores backoff timeout when joining the consumer group
> ---
>
> Key: KAFKA-12639
> URL: https://issues.apache.org/jira/browse/KAFKA-12639
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer
> Affects Versions: 2.7.0
> Reporter: Matiss Gutmanis
> Assignee: Philip Nee
> Priority: Major
> Fix For: 3.5.0
>
> We observed heavy logging while trying to join a consumer group during partial
> unavailability of the Kafka cluster (it's part of our testing process). It seems
> that {{rebalanceConfig.retryBackoffMs}} used in
> {{org.apache.kafka.clients.consumer.internals.AbstractCoordinator#joinGroupIfNeeded}}
> is not respected. Debugging revealed that the {{Timer}} instance is technically
> expired, thus using a sleep of 0 milliseconds, which defeats the purpose of the
> backoff timeout. The minimal backoff timeout should be respected.
>
> {code:java}
> 2021-03-30 08:30:24,488 INFO [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer clientId=app_clientid, groupId=consumer-group] JoinGroup failed: Coordinator 127.0.0.1:9092 (id: 2147483634 rack: null) is loading the group.
> 2021-03-30 08:30:24,488 INFO [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer clientId=app_clientid, groupId=consumer-group] Rebalance failed.
> org.apache.kafka.common.errors.CoordinatorLoadInProgressException: The coordinator is loading and hence can't process requests.
> 2021-03-30 08:30:24,488 INFO [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer clientId=app_clientid, groupId=consumer-group] (Re-)joining group
> 2021-03-30 08:30:24,489 INFO [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer clientId=app_clientid, groupId=consumer-group] JoinGroup failed: Coordinator 127.0.0.1:9092 (id: 2147483634 rack: null) is loading the group.
> 2021-03-30 08:30:24,489 INFO [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer clientId=app_clientid, groupId=consumer-group] Rebalance failed.
> org.apache.kafka.common.errors.CoordinatorLoadInProgressException: The coordinator is loading and hence can't process requests.
> 2021-03-30 08:30:24,489 INFO [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer clientId=app_clientid, groupId=consumer-group] (Re-)joining group
> 2021-03-30 08:30:24,490 INFO [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer clientId=app_clientid, groupId=consumer-group] JoinGroup failed: Coordinator 127.0.0.1:9092 (id: 2147483634 rack: null) is loading the group.
> 2021-03-30 08:30:24,490 INFO [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer clientId=app_clientid, groupId=consumer-group] Rebalance failed.
> org.apache.kafka.common.errors.CoordinatorLoadInProgressException: The coordinator is loading and hence can't process requests.
> 2021-03-30 08:30:24,490 INFO [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer clientId=app_clientid, groupId=consumer-group] (Re-)joining group
> 2021-03-30 08:30:24,491 INFO [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer clientId=app_clientid, groupId=consumer-group] JoinGroup failed: Coordinator 127.0.0.1:9092 (id: 2147483634 rack: null) is loading the group.
> 2021-03-30 08:30:24,491 INFO [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer clientId=app_clientid, groupId=consumer-group] Rebalance failed.
> org.apache.kafka.common.errors.CoordinatorLoadInProgressException: The coordinator is loading and hence can't process requests.
> 2021-03-30 08:30:24,491 INFO [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer clientId=app_clientid, groupId=consumer-group] (Re-)joining group
> 2021-03-30 08:30:24,492 INFO [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer clientId=app_clientid, groupId=consumer-group] JoinGroup failed: Coordinator 127.0.0.1:9092 (id: 2147483634 rack: null) is loading the group.
> 2021-03-30 08:30:24,492 INFO [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer clientId=app_clientid, groupId=consumer-group] Rebalance failed.
> org.apache.kafka.common.errors.CoordinatorLoadInProgressException: The coordinator is loading and hence can't process requests.
> 2021-03-30 08:30:24,492 INFO [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer clientId=app_clientid, groupId=consumer-group] (Re-)joining group
> {code}
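The fix for a bug like this boils down to never sleeping zero milliseconds before a retry, even once the caller's timer has expired. A minimal sketch of the before/after delay computation; the method and parameter names are illustrative, not the actual AbstractCoordinator fields:

```java
public class JoinGroupBackoffSketch {
    // Buggy behavior: min(remaining, backoff) collapses to sleep(0) once the
    // caller's timer is expired, producing the tight retry loop seen in the logs.
    static long buggyDelayMs(long timerRemainingMs, long retryBackoffMs) {
        return Math.min(timerRemainingMs, retryBackoffMs);
    }

    // Fixed behavior: a retriable failure always waits at least the configured
    // retry backoff, whether or not the timer has already expired.
    static long fixedDelayMs(long timerRemainingMs, long retryBackoffMs) {
        return Math.max(Math.min(timerRemainingMs, retryBackoffMs), retryBackoffMs);
    }
}
```

With `retry.backoff.ms` = 100 and an expired timer (remaining = 0), the buggy path sleeps 0 ms while the fixed path sleeps 100 ms.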
[jira] [Resolved] (KAFKA-14253) StreamsPartitionAssignor should print the member count in assignment logs
[ https://issues.apache.org/jira/browse/KAFKA-14253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang resolved KAFKA-14253.
-----------------------------------
Fix Version/s: 3.5.0
Resolution: Fixed

> StreamsPartitionAssignor should print the member count in assignment logs
> ---
>
> Key: KAFKA-14253
> URL: https://issues.apache.org/jira/browse/KAFKA-14253
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: John Roesler
> Assignee: Christopher Pooya Razavian
> Priority: Minor
> Labels: newbie, newbie++
> Fix For: 3.5.0
>
> Debugging rebalance and assignment issues is harder than it needs to be. One simple thing that can help is to print out information in the logs that users have to compute today.
> For example, the StreamsPartitionAssignor prints two messages that contain the newline-delimited group membership:
> {code:java}
> [StreamsPartitionAssignor] [...-StreamThread-1] stream-thread [...-StreamThread-1-consumer] All members participating in this rebalance:
> : []
> : []
> : []{code}
> and
> {code:java}
> [StreamsPartitionAssignor] [...-StreamThread-1] stream-thread [...-StreamThread-1-consumer] Assigned tasks [...] including stateful [...] to clients as:
> =[activeTasks: ([...]) standbyTasks: ([...])]
> =[activeTasks: ([...]) standbyTasks: ([...])]
> =[activeTasks: ([...]) standbyTasks: ([...])]
> {code}
>
> In both of these cases, it would be nice to:
> # Include the number of members in the group (i.e., "15 members participating" and "to 15 clients as")
> # Sort the member ids (to help compare the membership and assignment across rebalances)
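Both suggested improvements fit in a few lines. The helper below is illustrative only, not the actual StreamsPartitionAssignor logging code; it shows a count-first, sorted-by-member-id rendering that makes assignments easy to diff across rebalances:

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class AssignmentLogSketch {
    // Renders the "All members participating" style message with the member
    // count up front and the member ids sorted. TreeMap gives a stable,
    // sorted iteration order over member ids.
    static String describeMembers(Map<String, List<String>> memberToTasks) {
        TreeMap<String, List<String>> sorted = new TreeMap<>(memberToTasks);
        StringBuilder sb = new StringBuilder();
        sb.append(sorted.size()).append(" members participating in this rebalance:\n");
        for (Map.Entry<String, List<String>> e : sorted.entrySet()) {
            sb.append(e.getKey()).append(": ").append(e.getValue()).append('\n');
        }
        return sb.toString();
    }
}
```

Because the ids are sorted, two consecutive rebalance logs can be compared line by line with a plain text diff.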
[jira] [Created] (KAFKA-14641) Cleanup CommitNeeded after EOS-V1 is removed
Guozhang Wang created KAFKA-14641:
-------------------------------------
Summary: Cleanup CommitNeeded after EOS-V1 is removed
Key: KAFKA-14641
URL: https://issues.apache.org/jira/browse/KAFKA-14641
Project: Kafka
Issue Type: Improvement
Reporter: Guozhang Wang

This is a follow-up of KAFKA-14294. Today we have several flags to determine if Kafka Streams needs to execute a commit: 1) the task-level "commitNeeded" flag, which is set whenever process() or punctuator() is called; 2) whether there are input topic offsets to commit, retrieved from "task.prepareCommit()"; 3) the "transactionInFlight" flag from the producer, added as a fix for KAFKA-14294 (this subsumes the first "commitNeeded" functionality).

While we still have EOS-v1, cleaning this up would be a bit complex. But after the deprecated EOS-v1 is removed, we can clean up those controls, since in any commit case we would need to commit all tasks anyway, whereas in EOS-v1 we would probably commit a subset of tasks, since they are handled by different producers and hence different txns. A quick thought is the following:

1) We would not need the per-task "commitNeeded" anymore.
2) We would maintain a single "commitNeeded" flag on the task-executor, hence on the thread level. It is set whenever `process()` or `punctuator()` is called.
3) Whenever we need to commit, either a) periodically, b) upon revocation, or c) upon user request, we simply check that flag, and if necessary commit all tasks and reset the flag.
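The quick thought above, as a minimal sketch; the class and method names here are hypothetical, only the single thread-level "commitNeeded" flag comes from the ticket:

```java
public class ThreadCommitFlagSketch {
    // One flag per thread / task-executor, replacing the per-task
    // "commitNeeded" flags described above.
    private boolean commitNeeded = false;

    // Set whenever process() or a punctuator runs on any task.
    void onProcessOrPunctuate() {
        commitNeeded = true;
    }

    // Called periodically, upon revocation, or upon user request: commit all
    // tasks only if anything was processed since the last commit, then reset.
    // Returns whether a commit was performed.
    boolean maybeCommitAllTasks() {
        if (!commitNeeded) {
            return false;
        }
        // ... commit the offsets of all tasks in one EOS-v2 transaction ...
        commitNeeded = false;
        return true;
    }
}
```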
[jira] [Created] (KAFKA-14239) Merge StateRestorationIntegrationTest into RestoreIntegrationTest
Guozhang Wang created KAFKA-14239:
-------------------------------------
Summary: Merge StateRestorationIntegrationTest into RestoreIntegrationTest
Key: KAFKA-14239
URL: https://issues.apache.org/jira/browse/KAFKA-14239
Project: Kafka
Issue Type: Improvement
Components: unit tests
Reporter: Guozhang Wang

We have two integration test classes for store restoration, and StateRestorationIntegrationTest only has a single test method. We can merge it into the other to save integration testing time.
[jira] [Resolved] (KAFKA-14130) Reduce RackAwarenessIntegrationTest to a unit test
[ https://issues.apache.org/jira/browse/KAFKA-14130?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang resolved KAFKA-14130.
-----------------------------------
Fix Version/s: 3.4.0
Assignee: Guozhang Wang (was: Eslam)
Resolution: Fixed

> Reduce RackAwarenessIntegrationTest to a unit test
> ---
>
> Key: KAFKA-14130
> URL: https://issues.apache.org/jira/browse/KAFKA-14130
> Project: Kafka
> Issue Type: Improvement
> Components: streams, unit tests
> Reporter: Guozhang Wang
> Assignee: Guozhang Wang
> Priority: Major
> Labels: newbie
> Fix For: 3.4.0
>
> While working on KAFKA-13877, I feel it is overkill to have the whole test class as an integration test, since all we need is to test the assignor itself, which could be done in a unit test. Running this suite with 9+ instances takes a long time and is still vulnerable to all kinds of timing-based flakiness. A better choice is to reduce it to a unit test, similar to {{HighAvailabilityStreamsPartitionAssignorTest}}, which just tests the behavior of the assignor itself rather than creating many instances and hence depending on various timing bombs not to explode.
> The scope of this ticket is to refactor {{RackAwarenessIntegrationTest}} into a {{RackAwarenessStreamsPartitionAssignorTest}}.
[jira] [Created] (KAFKA-14138) The Exception Throwing Behavior of Transactional Producer is Inconsistent
Guozhang Wang created KAFKA-14138:
-------------------------------------
Summary: The Exception Throwing Behavior of Transactional Producer is Inconsistent
Key: KAFKA-14138
URL: https://issues.apache.org/jira/browse/KAFKA-14138
Project: Kafka
Issue Type: Improvement
Components: producer
Reporter: Guozhang Wang

There is an inconsistency in how errors are thrown inside the Kafka producer when transactions are enabled. In short, there are two places where an error code received from the brokers would eventually be thrown to the caller:

* Recorded on the batch's metadata, via "Sender#failBatch"
* Recorded on the txn manager, via "txnManager#handleFailedBatch"

The former would be thrown from 1) the `Future` returned from `send`; or 2) the `callback` inside `send(record, callback)`. The latter would be thrown from `producer.send()` directly, in which we call `txnManager.maybeAddPartition -> maybeFailWithError`. However, when thrown from the former, it is not wrapped, hence the direct exception (e.g. ClusterAuthorizationException), whereas in the latter it is wrapped as, e.g., KafkaException(ClusterAuthorizationException). Which one gets thrown depends on a race condition, since we cannot control whether, by the time the caller thread calls `txnManager.maybeAddPartition`, the previous produce request's error has been sent back or not. For example, consider the following sequence:

1. caller thread: within future = producer.send(), call recordAccumulator.append
2. sender thread: drain the accumulator, send the produce request and get the error back
3. caller thread: within future = producer.send(), call txnManager.maybeAddPartition
4. sender thread: get the addPartition token, send the txn request and get the error back (NOTE: the sender thread could send these two requests in any order)
5. caller thread: future.get()

In a sequence where 3) happens before 2), we would only get the raw exception at step 5; in a sequence where 2) happens before 3), we would throw the wrapped exception immediately at 3).

This inconsistent error throwing is pretty annoying for users, since they need to handle both cases, but many of them do not actually know this trickiness. We should make the error throwing consistent; e.g., we should consider: 1) which errors would be thrown from the callback / future.get(), and which would be thrown from the `send` call directly (these sets of errors should ideally be non-overlapping); and 2) whether we should wrap the raw error or not (either way, we should do so consistently).
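Until the behavior is made consistent, callers effectively have to unwrap defensively before matching on the error type, because the same broker error can surface either raw (from the send future/callback) or wrapped (from the next producer call). A dependency-free sketch of that defensive unwrapping; `RuntimeException` stands in for `KafkaException` here to keep the example self-contained:

```java
public class UnwrapSketch {
    // Walks the cause chain to the root error, so that a raw
    // ClusterAuthorizationException and a wrapped
    // KafkaException(ClusterAuthorizationException) are handled the same way.
    static Throwable rootError(Throwable thrown) {
        Throwable cause = thrown;
        while (cause.getCause() != null) {
            cause = cause.getCause();
        }
        return cause;
    }
}
```

Application code can then branch on `rootError(e)` regardless of which of the two code paths delivered the exception.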
[jira] [Resolved] (KAFKA-13877) Flaky RackAwarenessIntegrationTest.shouldDistributeStandbyReplicasOverMultipleClientTags
[ https://issues.apache.org/jira/browse/KAFKA-13877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang resolved KAFKA-13877.
-----------------------------------
Resolution: Fixed

> Flaky RackAwarenessIntegrationTest.shouldDistributeStandbyReplicasOverMultipleClientTags
> ---
>
> Key: KAFKA-13877
> URL: https://issues.apache.org/jira/browse/KAFKA-13877
> Project: Kafka
> Issue Type: Bug
> Components: streams, unit tests
> Reporter: Guozhang Wang
> Assignee: Guozhang Wang
> Priority: Major
> Labels: newbie
>
> The following test fails on local testbeds about once per 10-15 runs:
> {code}
> java.lang.AssertionError
> 	at org.junit.Assert.fail(Assert.java:87)
> 	at org.junit.Assert.assertTrue(Assert.java:42)
> 	at org.junit.Assert.assertTrue(Assert.java:53)
> 	at org.apache.kafka.streams.integration.RackAwarenessIntegrationTest.shouldDistributeStandbyReplicasOverMultipleClientTags(RackAwarenessIntegrationTest.java:192)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> 	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> 	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> 	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> 	at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
> 	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> 	at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
> 	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
> 	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
> 	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
> 	at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> 	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
> 	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
> 	at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
> 	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
> 	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> 	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> 	at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
> 	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
> 	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
> 	at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:53)
> 	at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
> 	at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
> {code}
[jira] [Created] (KAFKA-14130) Reduce RackAwarenessIntegrationTest to a unit test
Guozhang Wang created KAFKA-14130:
-------------------------------------
Summary: Reduce RackAwarenessIntegrationTest to a unit test
Key: KAFKA-14130
URL: https://issues.apache.org/jira/browse/KAFKA-14130
Project: Kafka
Issue Type: Improvement
Components: streams, unit tests
Reporter: Guozhang Wang

While working on KAFKA-13877, I feel it is overkill to have the whole test class as an integration test, since all we need is to test the assignor itself, which could be done in a unit test. Running this suite with 9+ instances takes a long time and is still vulnerable to all kinds of timing-based flakiness. A better choice is to reduce it to a unit test, similar to {{HighAvailabilityStreamsPartitionAssignorTest}}, which just tests the behavior of the assignor itself rather than creating many instances and hence depending on various timing bombs not to explode.

The scope of this ticket is to refactor {{RackAwarenessIntegrationTest}} into a {{RackAwarenessStreamsPartitionAssignorTest}}.
[jira] [Resolved] (KAFKA-13846) Add an overloaded metricOrElseCreate function in Metrics
[ https://issues.apache.org/jira/browse/KAFKA-13846?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang resolved KAFKA-13846.
-----------------------------------
Fix Version/s: 3.3.0
Resolution: Fixed

> Add an overloaded metricOrElseCreate function in Metrics
> ---
>
> Key: KAFKA-13846
> URL: https://issues.apache.org/jira/browse/KAFKA-13846
> Project: Kafka
> Issue Type: Improvement
> Components: metrics
> Reporter: Guozhang Wang
> Assignee: Sagar Rao
> Priority: Major
> Labels: newbie
> Fix For: 3.3.0
>
> The `Metrics` registry is often used by concurrent threads; however, its get/create APIs are not well suited for that. A common pattern from users today is:
> {code}
> metric = metrics.metric(metricName);
> if (metric == null) {
>     try {
>         metrics.createMetric(..)
>     } catch (IllegalArgumentException e) {
>         // another thread may create the metric in the meantime
>     }
> }
> {code}
> Otherwise the caller would need to synchronize the whole block trying to get the metric. However, the `createMetric` function call itself does synchronize internally on updating the metric map.
> So we could consider adding a metricOrElseCreate function which is similar to createMetric, but instead of throwing an IllegalArgumentException within the internal synchronization block, it would just return the already existing metric.
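The proposal amounts to a get-or-create on the internally synchronized metric map. A minimal dependency-free sketch; the plain map below stands in for the real `Metrics` class, and the signature is only an illustration of the idea, not the actual API:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class MetricOrElseCreateSketch {
    // Stand-in for the Metrics registry map; the real registry synchronizes
    // internally on its metric map, which ConcurrentHashMap emulates here.
    private final Map<String, Object> metrics = new ConcurrentHashMap<>();

    // Like createMetric, but instead of throwing IllegalArgumentException
    // when the metric already exists, return the existing one. A single
    // atomic putIfAbsent replaces the check-then-create race in the caller.
    Object metricOrElseCreate(String name, Object newMetric) {
        Object existing = metrics.putIfAbsent(name, newMetric);
        return existing != null ? existing : newMetric;
    }
}
```

With this shape, concurrent callers racing on the same metric name all observe the same instance and no exception handling is needed.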
[jira] [Resolved] (KAFKA-13880) DefaultStreamPartitioner may get "stuck" to one partition for unkeyed messages
[ https://issues.apache.org/jira/browse/KAFKA-13880?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang resolved KAFKA-13880.
-----------------------------------
Fix Version/s: 3.3.0
Assignee: Guozhang Wang
Resolution: Fixed

> DefaultStreamPartitioner may get "stuck" to one partition for unkeyed messages
> ---
>
> Key: KAFKA-13880
> URL: https://issues.apache.org/jira/browse/KAFKA-13880
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.4.0
> Reporter: Artem Livshits
> Assignee: Guozhang Wang
> Priority: Major
> Fix For: 3.3.0
>
> While working on KIP-794, I noticed that DefaultStreamPartitioner does not call .onNewBatch. The "sticky" DefaultStreamPartitioner introduced as a result of https://issues.apache.org/jira/browse/KAFKA-8601 requires the .onNewBatch call in order to switch to a new partition for unkeyed messages; just calling .partition would return the same "sticky" partition chosen during the first call to .partition. The partition doesn't change even if the partition leader is unavailable.
> Ideally, for unkeyed messages the DefaultStreamPartitioner should take advantage of the new built-in partitioning logic introduced in [https://github.com/apache/kafka/pull/12049]. Perhaps it could return a null partition for unkeyed messages, so that KafkaProducer could run the built-in partitioning logic.
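The "return null partition" idea from the ticket can be sketched as follows. This is a hypothetical partitioner for illustration, not the actual DefaultStreamPartitioner signature:

```java
import java.util.Arrays;

public class NullForUnkeyedPartitionerSketch {
    // For unkeyed records, return null so the producer's built-in (KIP-794
    // style) partitioning logic picks the partition, avoiding the "stuck
    // sticky partition" problem; keyed records keep a deterministic hash.
    static Integer partition(byte[] keyBytes, int numPartitions) {
        if (keyBytes == null) {
            return null; // defer to KafkaProducer's built-in partitioner
        }
        // Mask the sign bit so the modulo result is always a valid index.
        return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
    }
}
```

Returning `null` is the conventional way for a partitioner to signal "no opinion" so the producer falls back to its own logic.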
[jira] [Created] (KAFKA-13934) Consider consolidating TimeWindow / SessionWindow / SlidingWindow
Guozhang Wang created KAFKA-13934:
-------------------------------------
Summary: Consider consolidating TimeWindow / SessionWindow / SlidingWindow
Key: KAFKA-13934
URL: https://issues.apache.org/jira/browse/KAFKA-13934
Project: Kafka
Issue Type: Improvement
Components: streams
Reporter: Guozhang Wang

In Streams windowing operations we have several classes inherited from `Window`, as listed in the title of the ticket. They represent differences in:

1) Serialization of the window as part of the windowed key.
2) Window operations, which are based on the inclusiveness/exclusiveness of the window start/end.

As a result, we have ended up with lots of duplicated code to handle those different windows in windowed aggregations. We can consider whether it's worth serializing those window types differently (especially if we can get rid of the sequence id for time windows used for joins), and whether we can just have a single class with booleans indicating the inclusiveness/exclusiveness of the start/end, and hence largely reduce our code duplication around the serde and common window operations inside the stateful operators.
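The single-class idea with inclusive/exclusive booleans might look like this sketch; it is purely illustrative and not the actual Streams `Window` API:

```java
public class UnifiedWindowSketch {
    // One window class with explicit inclusive/exclusive bounds, instead of
    // separate TimeWindow / SessionWindow / SlidingWindow subclasses.
    final long startMs;
    final long endMs;
    final boolean startInclusive;
    final boolean endInclusive;

    UnifiedWindowSketch(long startMs, long endMs,
                        boolean startInclusive, boolean endInclusive) {
        this.startMs = startMs;
        this.endMs = endMs;
        this.startInclusive = startInclusive;
        this.endInclusive = endInclusive;
    }

    // The containment check is the main place the inclusiveness difference
    // matters, so the per-window-type subclasses collapse into two booleans.
    boolean contains(long timestampMs) {
        boolean afterStart = startInclusive ? timestampMs >= startMs : timestampMs > startMs;
        boolean beforeEnd = endInclusive ? timestampMs <= endMs : timestampMs < endMs;
        return afterStart && beforeEnd;
    }
}
```

For instance, a hopping/tumbling time window would be the [start, end) configuration while a session window would be [start, end], with all other window operations shared.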
[jira] [Resolved] (KAFKA-13745) Flaky kafka.network.SocketServerTest.testNoOpActionResponseWithThrottledChannelWhereThrottlingAlreadyDone
[ https://issues.apache.org/jira/browse/KAFKA-13745?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang resolved KAFKA-13745.
-----------------------------------
Resolution: Fixed

> Flaky kafka.network.SocketServerTest.testNoOpActionResponseWithThrottledChannelWhereThrottlingAlreadyDone
> ---
>
> Key: KAFKA-13745
> URL: https://issues.apache.org/jira/browse/KAFKA-13745
> Project: Kafka
> Issue Type: Bug
> Reporter: Guozhang Wang
> Assignee: Sagar Rao
> Priority: Blocker
> Fix For: 3.3.0
>
> Example: https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-11796/7/tests/
> {code}
> org.opentest4j.AssertionFailedError: expected: but was:
> 	at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55)
> 	at org.junit.jupiter.api.AssertFalse.assertFalse(AssertFalse.java:40)
> 	at org.junit.jupiter.api.AssertFalse.assertFalse(AssertFalse.java:35)
> 	at org.junit.jupiter.api.Assertions.assertFalse(Assertions.java:227)
> 	at kafka.network.SocketServerTest.testNoOpActionResponseWithThrottledChannelWhereThrottlingAlreadyDone(SocketServerTest.scala:751)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
> {code}
[jira] [Resolved] (KAFKA-13800) Remove force cast of TimeWindowKStreamImpl in tests of https://github.com/apache/kafka/pull/11896
[ https://issues.apache.org/jira/browse/KAFKA-13800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang resolved KAFKA-13800.
-----------------------------------
Resolution: Fixed

> Remove force cast of TimeWindowKStreamImpl in tests of https://github.com/apache/kafka/pull/11896
> ---
>
> Key: KAFKA-13800
> URL: https://issues.apache.org/jira/browse/KAFKA-13800
> Project: Kafka
> Issue Type: Improvement
> Reporter: Hao Li
> Assignee: Hao Li
> Priority: Major
>
> We can remove the cast after `emitStrategy` is added to the public API
[jira] [Resolved] (KAFKA-13746) Flaky kafka.admin.TopicCommandIntegrationTest.testDescribeUnderMinIsrPartitionsMixed
[ https://issues.apache.org/jira/browse/KAFKA-13746?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang resolved KAFKA-13746.
-----------------------------------
Resolution: Fixed

> Flaky kafka.admin.TopicCommandIntegrationTest.testDescribeUnderMinIsrPartitionsMixed
> ---
>
> Key: KAFKA-13746
> URL: https://issues.apache.org/jira/browse/KAFKA-13746
> Project: Kafka
> Issue Type: Bug
> Reporter: Guozhang Wang
> Assignee: Sagar Rao
> Priority: Blocker
> Fix For: 3.3.0
>
> Example: https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-11796/7/tests/
> {code}
> java.lang.ArrayIndexOutOfBoundsException: Index 1 out of bounds for length 1
> 	at kafka.admin.TopicCommandIntegrationTest.testDescribeUnderMinIsrPartitionsMixed(TopicCommandIntegrationTest.scala:686)
> 	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
> 	at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
> {code}
[jira] [Created] (KAFKA-13877) Flaky RackAwarenessIntegrationTest.shouldDistributeStandbyReplicasOverMultipleClientTags
Guozhang Wang created KAFKA-13877:
-------------------------------------
Summary: Flaky RackAwarenessIntegrationTest.shouldDistributeStandbyReplicasOverMultipleClientTags
Key: KAFKA-13877
URL: https://issues.apache.org/jira/browse/KAFKA-13877
Project: Kafka
Issue Type: Bug
Components: streams, unit tests
Reporter: Guozhang Wang

The following test fails on local testbeds about once per 10-15 runs:

{code}
java.lang.AssertionError
	at org.junit.Assert.fail(Assert.java:87)
	at org.junit.Assert.assertTrue(Assert.java:42)
	at org.junit.Assert.assertTrue(Assert.java:53)
	at org.apache.kafka.streams.integration.RackAwarenessIntegrationTest.shouldDistributeStandbyReplicasOverMultipleClientTags(RackAwarenessIntegrationTest.java:192)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
	at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
	at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
	at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
	at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
	at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:53)
	at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
	at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
{code}
[jira] [Resolved] (KAFKA-13647) RocksDb metrics 'number-open-files' is not correct
[ https://issues.apache.org/jira/browse/KAFKA-13647?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang resolved KAFKA-13647.
-----------------------------------
Resolution: Incomplete

I resolved the ticket as Incomplete for now, since the streams code alone cannot fix the issue; it needs to be fixed on the RocksDB side.

> RocksDb metrics 'number-open-files' is not correct
> ---
>
> Key: KAFKA-13647
> URL: https://issues.apache.org/jira/browse/KAFKA-13647
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.0.0
> Reporter: Sylvain Le Gouellec
> Priority: Major
> Attachments: image-2022-02-07-16-06-25-304.png, image-2022-02-07-16-06-39-821.png, image-2022-02-07-16-06-53-164.png
>
> We were looking at RocksDB metrics and noticed that the {{number-open-files}} metric behaves like a counter, rather than a gauge.
> Looking at the code, we think there is a small error in the type of metric for that specific mbean (it should be a value metric rather than a sum metric).
> See [https://github.com/apache/kafka/blob/ca5d6f9229c170beb23809159113037f05a1120f/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetrics.java#L482|https://github.com/apache/kafka/blob/99b9b3e84f4e98c3f07714e1de6a139a004cbc5b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetrics.java#L482]
[jira] [Created] (KAFKA-13846) Add an overloaded metricOrElseCreate function in Metrics
Guozhang Wang created KAFKA-13846: - Summary: Add an overloaded metricOrElseCreate function in Metrics Key: KAFKA-13846 URL: https://issues.apache.org/jira/browse/KAFKA-13846 Project: Kafka Issue Type: Improvement Components: metrics Reporter: Guozhang Wang The `Metrics` registry is often used by concurrent threads; however, its get/create APIs are not well suited for that. A common pattern users follow today is:
{code}
KafkaMetric metric = metrics.metric(metricName);
if (metric == null) {
    try {
        metrics.createMetric(..);
    } catch (IllegalArgumentException e) {
        // another thread may have created the metric in the meantime
    }
}
{code}
Otherwise the caller would need to synchronize the whole get-then-create block. However, the `createMetric` call itself already synchronizes internally on updating the metric map. So we could consider adding a `metricOrElseCreate` function, similar to `createMetric`, but instead of throwing an IllegalArgumentException within the internal synchronization block it would simply return the already existing metric. -- This message was sent by Atlassian Jira (v8.20.7#820007)
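A minimal sketch of the proposed get-or-create semantics, using a plain ConcurrentMap in place of Kafka's actual Metrics internals (the MetricsSketch class and its String-keyed, Object-valued map are illustrative placeholders, not the real API, which keys on MetricName and stores KafkaMetric instances):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Illustrative stand-in for the Metrics registry.
class MetricsSketch {
    private final ConcurrentMap<String, Object> metrics = new ConcurrentHashMap<>();

    // Like createMetric, but instead of throwing when another thread won the
    // race to register the metric, return the metric that is already there.
    Object metricOrElseCreate(String name, Object newMetric) {
        Object existing = metrics.putIfAbsent(name, newMetric);
        return existing != null ? existing : newMetric;
    }
}
```

With this shape, concurrent callers all receive the same metric instance and none of them has to catch IllegalArgumentException.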
[jira] [Resolved] (KAFKA-13799) Improve documentation for Kafka zero-copy
[ https://issues.apache.org/jira/browse/KAFKA-13799?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-13799. --- Fix Version/s: 3.3.0 Assignee: RivenSun Resolution: Fixed > Improve documentation for Kafka zero-copy > - > > Key: KAFKA-13799 > URL: https://issues.apache.org/jira/browse/KAFKA-13799 > Project: Kafka > Issue Type: Improvement > Components: documentation >Reporter: RivenSun >Assignee: RivenSun >Priority: Major > Fix For: 3.3.0 > > > Via documentation https://kafka.apache.org/documentation/#maximizingefficiency > and [https://kafka.apache.org/documentation/#networklayer] , > We can know that Kafka combines pagecache and zero-copy when reading messages > in files on disk, which greatly improves the consumption rate of messages. > But after browsing the source code: > Look directly at the *FileRecords.writeTo(...)* method, > 1. Only PlaintextTransportLayer.transferFrom() uses fileChannel.transferTo(), > and the bottom layer calls the sendfile method to implement zero-copy data > transfer. > 2. The logic of the SslTransportLayer.transferFrom() method: > {code:java} > fileChannel.read(fileChannelBuffer, pos) > -> > sslEngine.wrap(src, netWriteBuffer) > -> > flush(ByteBuffer buf) && socketChannel.write(buf){code} > That is, first read the data on the disk or directly from the page cache, > then encrypt the data, and finally send the encrypted data to the network. > {*}FileChannel.transferTo() is not used in the whole process{*}. > > Conclusion: > PlaintextTransportLayer and SslTransportLayer both use pagecache, but > SslTransportLayer does not implement zero-copy. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Resolved] (KAFKA-13692) stream thread blocked-time-ns-total metric does not include producer metadata wait time
[ https://issues.apache.org/jira/browse/KAFKA-13692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-13692. --- Fix Version/s: 3.3.0 (was: 3.2.0) Resolution: Fixed > stream thread blocked-time-ns-total metric does not include producer metadata > wait time > --- > > Key: KAFKA-13692 > URL: https://issues.apache.org/jira/browse/KAFKA-13692 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.1.0 >Reporter: Rohan Desai >Assignee: Rohan Desai >Priority: Major > Fix For: 3.3.0 > > > The stream thread blocked-time-ns-total metric does not include producer > metadata wait time (time spent in `KafkaProducer.waitOnMetadata`). This can > contribute significantly to actual total blocked time in some cases. For > example, if a user deletes the streams sink topic, producers will wait until > the max block timeout. This time does not get included in total blocked time > when it should. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (KAFKA-13766) Use `max.poll.interval.ms` as the timeout during complete-rebalance phase
Guozhang Wang created KAFKA-13766: - Summary: Use `max.poll.interval.ms` as the timeout during complete-rebalance phase Key: KAFKA-13766 URL: https://issues.apache.org/jira/browse/KAFKA-13766 Project: Kafka Issue Type: Bug Components: core Reporter: Guozhang Wang The lifetime of a consumer can be categorized into three phases: 1) During normal processing, the broker expects a periodic heartbeat request from the consumer, bounded by `session.timeout.ms`. 2) During prepare_rebalance, the broker expects a join-group request to be received within the rebalance timeout, which is piggy-backed from the consumer's `max.poll.interval.ms`. 3) During complete_rebalance, the broker expects a sync-group request to be received, again within `session.timeout.ms`. So in different phases of the consumer's life, different timeouts are used to bound the timer. Nowadays, with the cooperative rebalance protocol, {{consumer.poll}} can still return records to be processed in the middle of a rebalance. In that case, for phase 3) we should also use `max.poll.interval.ms` to bound the timer, which in practice is larger than `session.timeout.ms`. -- This message was sent by Atlassian Jira (v8.20.1#820001)
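The two configs involved can be seen side by side in ordinary consumer configuration; the values below are only illustrative defaults, not a recommendation:

```java
import java.util.Properties;

public class ConsumerTimeouts {
    public static Properties timeouts() {
        Properties props = new Properties();
        // Bounds the heartbeat-based liveness check (phase 1, and today
        // also phase 3, the complete-rebalance phase this ticket targets).
        props.put("session.timeout.ms", "45000");
        // Bounds the gap between poll() calls; also serves as the broker-side
        // rebalance timeout during the prepare-rebalance phase (phase 2).
        props.put("max.poll.interval.ms", "300000");
        return props;
    }
}
```

The ticket's point is that phase 3 is currently bounded by the smaller `session.timeout.ms`, even though processing records returned mid-rebalance may legitimately take up to `max.poll.interval.ms`.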
[jira] [Created] (KAFKA-13765) Describe-consumer admin should not return unstable membership information
Guozhang Wang created KAFKA-13765: - Summary: Describe-consumer admin should not return unstable membership information Key: KAFKA-13765 URL: https://issues.apache.org/jira/browse/KAFKA-13765 Project: Kafka Issue Type: Bug Components: admin Reporter: Guozhang Wang When a consumer group is in the “prepare-rebalance” phase, it is unclear whether all of its currently registered members will still re-join in the new generation. In that case, simply returning the current members map in the describe-consumer response may be misleading: users would get spurious results that can include dropped or even zombie consumers. So during the prepare-rebalance phase, we should either return only those members whose join-group requests have already been received, or return a response with no members and indicate via the prepare-rebalance state that the membership info is unstable and hence not returned. -- This message was sent by Atlassian Jira (v8.20.1#820001)
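A minimal sketch of the first option above: filter the member list down to those that have already re-joined whenever the group is preparing a rebalance. The group-state string and the per-member re-join flag are simplified placeholders for the broker's actual group metadata, not its real data model:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class DescribeGroupSketch {
    // Return only members that are safe to report: everyone when the group is
    // not rebalancing, and only already-re-joined members while a rebalance
    // is being prepared.
    public static List<String> describableMembers(String groupState, Map<String, Boolean> rejoined) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Boolean> e : rejoined.entrySet()) {
            if (!"PreparingRebalance".equals(groupState) || e.getValue()) {
                result.add(e.getKey());
            }
        }
        return result;
    }
}
```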
[jira] [Resolved] (KAFKA-13728) PushHttpMetricsReporter no longer pushes metrics when network failure is recovered.
[ https://issues.apache.org/jira/browse/KAFKA-13728?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-13728. --- Fix Version/s: 3.2.0 Resolution: Fixed > PushHttpMetricsReporter no longer pushes metrics when network failure is > recovered. > --- > > Key: KAFKA-13728 > URL: https://issues.apache.org/jira/browse/KAFKA-13728 > Project: Kafka > Issue Type: Bug > Components: tools >Affects Versions: 3.1.0 >Reporter: XiaoyiPeng >Priority: Minor > Fix For: 3.2.0 > > > The class *PushHttpMetricsReporter* no longer pushes metrics after a network > failure is recovered. > I debugged the code and found the problem here: > [https://github.com/apache/kafka/blob/dc36dedd28ff384218b669de13993646483db966/tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java#L214-L221] > > When we submit a task to the *ScheduledThreadPoolExecutor* that needs to be > executed periodically, if the task throws an exception that is not swallowed, > the task will no longer be scheduled to execute. -- This message was sent by Atlassian Jira (v8.20.1#820001)
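The executor behavior behind this bug is documented on ScheduledExecutorService.scheduleAtFixedRate: if any execution of the task encounters an uncaught exception, subsequent executions are suppressed. A minimal sketch of the fix, with the reporter's HTTP push replaced by a hypothetical always-failing action, is to swallow exceptions inside the task body so the schedule stays alive:

```java
// Periodic task that survives its own failures: because the scheduled
// executor cancels a task after an uncaught exception, the run() body
// catches everything so the next run is still scheduled.
class ResilientReporterTask implements Runnable {
    int attempts = 0;

    private void pushMetrics() {
        // Hypothetical stand-in for the HTTP POST; pretend the network is down.
        throw new RuntimeException("connection refused");
    }

    @Override
    public void run() {
        attempts++;
        try {
            pushMetrics();
        } catch (Exception e) {
            // Log and swallow; rethrowing would end the periodic schedule.
        }
    }
}
```

Rethrowing from run() here would mean the second attempt never happens, which matches the reported symptom of metrics never resuming after a transient network failure.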
[jira] [Created] (KAFKA-13746) Flaky kafka.admin.TopicCommandIntegrationTest.testDescribeUnderMinIsrPartitionsMixed
Guozhang Wang created KAFKA-13746: - Summary: Flaky kafka.admin.TopicCommandIntegrationTest.testDescribeUnderMinIsrPartitionsMixed Key: KAFKA-13746 URL: https://issues.apache.org/jira/browse/KAFKA-13746 Project: Kafka Issue Type: Bug Reporter: Guozhang Wang Fix For: 3.2.0 Example: https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-11796/7/tests/ {code} java.lang.ArrayIndexOutOfBoundsException: Index 1 out of bounds for length 1 at kafka.admin.TopicCommandIntegrationTest.testDescribeUnderMinIsrPartitionsMixed(TopicCommandIntegrationTest.scala:686) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725) {code} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (KAFKA-13745) Flaky kafka.network.SocketServerTest.testNoOpActionResponseWithThrottledChannelWhereThrottlingAlreadyDone
Guozhang Wang created KAFKA-13745: - Summary: Flaky kafka.network.SocketServerTest.testNoOpActionResponseWithThrottledChannelWhereThrottlingAlreadyDone Key: KAFKA-13745 URL: https://issues.apache.org/jira/browse/KAFKA-13745 Project: Kafka Issue Type: Bug Reporter: Guozhang Wang Fix For: 3.2.0 Example: https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-11796/7/tests/ {code} org.opentest4j.AssertionFailedError: expected: but was: at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55) at org.junit.jupiter.api.AssertFalse.assertFalse(AssertFalse.java:40) at org.junit.jupiter.api.AssertFalse.assertFalse(AssertFalse.java:35) at org.junit.jupiter.api.Assertions.assertFalse(Assertions.java:227) at kafka.network.SocketServerTest.testNoOpActionResponseWithThrottledChannelWhereThrottlingAlreadyDone(SocketServerTest.scala:751) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725) {code} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (KAFKA-13737) Flaky kafka.admin.LeaderElectionCommandTest.testPreferredReplicaElection
Guozhang Wang created KAFKA-13737: - Summary: Flaky kafka.admin.LeaderElectionCommandTest.testPreferredReplicaElection Key: KAFKA-13737 URL: https://issues.apache.org/jira/browse/KAFKA-13737 Project: Kafka Issue Type: Bug Reporter: Guozhang Wang Examples: https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-11895/1/tests {code} java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: describeTopics at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396) at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073) at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165) at kafka.utils.TestUtils$.$anonfun$waitForLeaderToBecome$1(TestUtils.scala:1812) at scala.util.Try$.apply(Try.scala:210) at kafka.utils.TestUtils$.currentLeader$1(TestUtils.scala:1811) at kafka.utils.TestUtils$.waitForLeaderToBecome(TestUtils.scala:1819) at kafka.utils.TestUtils$.assertLeader(TestUtils.scala:1789) at kafka.admin.LeaderElectionCommandTest.testPreferredReplicaElection(LeaderElectionCommandTest.scala:172) {code} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Reopened] (KAFKA-13736) Flaky kafka.network.SocketServerTest.closingChannelWithBufferedReceives
[ https://issues.apache.org/jira/browse/KAFKA-13736?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang reopened KAFKA-13736: --- > Flaky kafka.network.SocketServerTest.closingChannelWithBufferedReceives > --- > > Key: KAFKA-13736 > URL: https://issues.apache.org/jira/browse/KAFKA-13736 > Project: Kafka > Issue Type: Bug >Reporter: Guozhang Wang >Priority: Major > > Examples: > https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-11895/1/tests > {code} > java.lang.AssertionError: receiveRequest timed out > at > kafka.network.SocketServerTest.receiveRequest(SocketServerTest.scala:140) > at > kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$6(SocketServerTest.scala:1521) > at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158) > at > kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$1(SocketServerTest.scala:1520) > at > kafka.network.SocketServerTest.verifyRemoteCloseWithBufferedReceives(SocketServerTest.scala:1483) > at > kafka.network.SocketServerTest.closingChannelWithBufferedReceives(SocketServerTest.scala:1431) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > {code} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Resolved] (KAFKA-13736) Flaky kafka.network.SocketServerTest.closingChannelWithBufferedReceives
[ https://issues.apache.org/jira/browse/KAFKA-13736?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-13736. --- Resolution: Duplicate > Flaky kafka.network.SocketServerTest.closingChannelWithBufferedReceives > --- > > Key: KAFKA-13736 > URL: https://issues.apache.org/jira/browse/KAFKA-13736 > Project: Kafka > Issue Type: Bug >Reporter: Guozhang Wang >Priority: Major > > Examples: > https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-11895/1/tests > {code} > java.lang.AssertionError: receiveRequest timed out > at > kafka.network.SocketServerTest.receiveRequest(SocketServerTest.scala:140) > at > kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$6(SocketServerTest.scala:1521) > at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158) > at > kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$1(SocketServerTest.scala:1520) > at > kafka.network.SocketServerTest.verifyRemoteCloseWithBufferedReceives(SocketServerTest.scala:1483) > at > kafka.network.SocketServerTest.closingChannelWithBufferedReceives(SocketServerTest.scala:1431) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > {code} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (KAFKA-13736) Flaky kafka.network.SocketServerTest.closingChannelWithBufferedReceives
Guozhang Wang created KAFKA-13736: - Summary: Flaky kafka.network.SocketServerTest.closingChannelWithBufferedReceives Key: KAFKA-13736 URL: https://issues.apache.org/jira/browse/KAFKA-13736 Project: Kafka Issue Type: Bug Reporter: Guozhang Wang Examples: https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-11895/1/tests {code} java.lang.AssertionError: receiveRequest timed out at kafka.network.SocketServerTest.receiveRequest(SocketServerTest.scala:140) at kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$6(SocketServerTest.scala:1521) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158) at kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$1(SocketServerTest.scala:1520) at kafka.network.SocketServerTest.verifyRemoteCloseWithBufferedReceives(SocketServerTest.scala:1483) at kafka.network.SocketServerTest.closingChannelWithBufferedReceives(SocketServerTest.scala:1431) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) {code} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (KAFKA-13735) Flaky kafka.network.SocketServerTest.remoteCloseWithoutBufferedReceives
Guozhang Wang created KAFKA-13735: - Summary: Flaky kafka.network.SocketServerTest.remoteCloseWithoutBufferedReceives Key: KAFKA-13735 URL: https://issues.apache.org/jira/browse/KAFKA-13735 Project: Kafka Issue Type: Bug Reporter: Guozhang Wang Examples: https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-11705/13/tests {code} Stacktrace java.lang.IllegalStateException: Channel closed too early at kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$5(SocketServerTest.scala:1511) at scala.Option.getOrElse(Option.scala:201) at kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$1(SocketServerTest.scala:1511) at kafka.network.SocketServerTest.verifyRemoteCloseWithBufferedReceives(SocketServerTest.scala:1482) at kafka.network.SocketServerTest.remoteCloseWithoutBufferedReceives(SocketServerTest.scala:1393) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:568) {code} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Reopened] (KAFKA-13421) Fix ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
[ https://issues.apache.org/jira/browse/KAFKA-13421?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang reopened KAFKA-13421: --- Re-opening this ticket since the test is still failing. > Fix > ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup > - > > Key: KAFKA-13421 > URL: https://issues.apache.org/jira/browse/KAFKA-13421 > Project: Kafka > Issue Type: Bug >Reporter: Colin McCabe >Assignee: Jason Gustafson >Priority: Major > > ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup > is failing with this error: > {code} > ConsumerBounceTest > testSubscribeWhenTopicUnavailable() PASSED > kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup() > failed, log available in > /home/cmccabe/src/kafka9/core/build/reports/testOutput/kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup().test.stdout > > > ConsumerBounceTest > > testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup() > FAILED > org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode > = NodeExists > at > org.apache.zookeeper.KeeperException.create(KeeperException.java:126) > > at > kafka.zk.KafkaZkClient$CheckedEphemeral.getAfterNodeExists(KafkaZkClient.scala:1904) > > at > kafka.zk.KafkaZkClient$CheckedEphemeral.create(KafkaZkClient.scala:1842) > at > kafka.zk.KafkaZkClient.checkedEphemeralCreate(KafkaZkClient.scala:1809) > at kafka.zk.KafkaZkClient.registerBroker(KafkaZkClient.scala:96) > at kafka.server.KafkaServer.startup(KafkaServer.scala:320) > at > kafka.integration.KafkaServerTestHarness.$anonfun$restartDeadBrokers$2(KafkaServerTestHarness.scala:212) > at > scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.scala:18) > at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:563) > at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:561) 
> at scala.collection.AbstractIterable.foreach(Iterable.scala:919) > at scala.collection.IterableOps$WithFilter.foreach(Iterable.scala:889) > at > kafka.integration.KafkaServerTestHarness.restartDeadBrokers(KafkaServerTestHarness.scala:203) > at > kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1(ConsumerBounceTest.scala:327) > at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:190) > at > kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(ConsumerBounceTest.scala:319) > {code} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (KAFKA-13722) Update internal interfaces that use ProcessorContext to use StateStoreContext instead
Guozhang Wang created KAFKA-13722: - Summary: Update internal interfaces that use ProcessorContext to use StateStoreContext instead Key: KAFKA-13722 URL: https://issues.apache.org/jira/browse/KAFKA-13722 Project: Kafka Issue Type: Improvement Reporter: Guozhang Wang This is a remainder that when we remove the deprecated public APIs that uses the ProcessorContext, like `StateStore.init`, we should also consider updating the internal interfaces with the ProcessorContext as well. That includes: 1. Segments and related util classes which use ProcessorContext. 2. For state stores that leverage on ProcessorContext.getXXXTime, their logic should be moved out of the state store impl but to the processor node level that calls on these state stores. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Resolved] (KAFKA-12256) auto commit causes delays due to retriable UNKNOWN_TOPIC_OR_PARTITION
[ https://issues.apache.org/jira/browse/KAFKA-12256?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-12256. --- Fix Version/s: 3.2.0 Resolution: Fixed > auto commit causes delays due to retriable UNKNOWN_TOPIC_OR_PARTITION > - > > Key: KAFKA-12256 > URL: https://issues.apache.org/jira/browse/KAFKA-12256 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 2.0.0 >Reporter: Ryan Leslie >Priority: Minor > Labels: new-consumer-threading-should-fix > Fix For: 3.2.0 > > > In KAFKA-6829 a change was made to the consumer to internally retry commits > upon receiving UNKNOWN_TOPIC_OR_PARTITION. > Though this helped mitigate issues around stale broker metadata, there were > some valid concerns around the negative effects for routine topic deletion: > https://github.com/apache/kafka/pull/4948 > In particular, if a commit is issued for a deleted topic, retries can block > the consumer for up to max.poll.interval.ms. This is tunable of course, but > any amount of stalling in a consumer can lead to unnecessary lag. > One of the assumptions while permitting the change was that in practice it > should be rare for commits to occur for deleted topics, since that would > imply messages were being read or published at the time of deletion. It's > fair to expect users to not delete topics that are actively published to. But > this assumption is false in cases where auto commit is enabled. > With the current implementation of auto commit, the consumer will regularly > issue commits for all topics being fetched from, regardless of whether or not > messages were actually received. The fetch positions are simply flushed, even > when they are 0. This is simple and generally efficient, though it does mean > commits are often redundant. Besides the auto commit interval, commits are > also issued at the time of rebalance, which is often precisely at the time > topics are deleted. 
> This means that in practice commits for deleted topics are not really rare. > This is particularly an issue when the consumer is subscribed to a multitude > of topics using a wildcard. For example, a consumer might subscribe to a > particular "flavor" of topic with the aim of auditing all such data, and > these topics might dynamically come and go. The consumer's metadata and > rebalance mechanisms are meant to handle this gracefully, but the end result > is that such groups are often blocked in a commit for several seconds or > minutes (the default is 5 minutes) whenever a delete occurs. This can > sometimes result in significant lag. > Besides having users abandon auto commit in the face of topic deletes, there > are probably multiple ways to deal with this, including reconsidering if > commits still truly need to be retried here, or if this behavior should be > more configurable; e.g. having a separate commit timeout or policy. In some > cases the loss of a commit and subsequent message duplication is still > preferred to processing delays. And having an artificially low > max.poll.interval.ms or rebalance.timeout.ms comes with its own set of > concerns. > In the very least the current behavior and pitfalls around delete with active > consumers should be documented. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Resolved] (KAFKA-13310) KafkaConsumer cannot jump out of the poll method, and the consumer is blocked in the ConsumerCoordinator method maybeAutoCommitOffsetsSync(Timer timer). Cpu and traffic
[ https://issues.apache.org/jira/browse/KAFKA-13310?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-13310. --- Fix Version/s: 3.2.0 Resolution: Fixed > KafkaConsumer cannot jump out of the poll method, and the consumer is blocked > in the ConsumerCoordinator method maybeAutoCommitOffsetsSync(Timer timer). > Cpu and traffic of Broker‘s side increase sharply > --- > > Key: KAFKA-13310 > URL: https://issues.apache.org/jira/browse/KAFKA-13310 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 2.8.1 > Environment: prod >Reporter: RivenSun >Assignee: RivenSun >Priority: Major > Fix For: 3.2.0 > > Attachments: SecondDeleteConsumerLog.png, SecondDeleteDebugLog.png, > ThirdDebugLog1.png, ThirdDebugLog2.png, brokerCpu.png, brokerNetBytes.png, > kafkaConsumerLog.png > > > h2. Foreword > Because our consumers' consumption logic is sometimes heavier, we refer > to the configuration of Kafka stream > [https://kafka.apache.org/documentation/#upgrade_10201_notable] > Set max.poll.interval.ms to Integer.MAX_VALUE > Our consumers have adopted method : > consumer.subscribe(Pattern.compile(".*riven.*")); > > h2. Recurrence of the problem scene > operate steps are > (1) Test environment Kafka cluster: three brokers > (2) Topics conforming to regular expressions include rivenTest1, rivenTest2, > and rivenTest88 > (3) Only one consumer is needed, group.id is "rivenReassign", > consumer.subscribe(Pattern.compile(".*riven.*")); > (4) At the beginning, the group status is stable, and everything is normal > for consumers, then I delete topic: rivenTest88 > > h2. 
Phenomenon > Problem phenomenon > (1) The consumer is blocked in the poll method, no longer consume any > messages, and the consumer log is always printing > [main] WARN > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator-[Consumer > clientId=consumer-rivenReassign-1, groupId=rivenReassign] Offset commit > failed on partition rivenTest88-1 at offset 0: This server does not host this > topic-partition. > (2) The describe consumerGroup interface of Adminclient has always timed > out, and the group status is no longer stable > (3) The cpu and traffic of the broker are *significantly increased* > > > h2. Problem tracking > By analyzing the kafkaConsumer code, the version is 2.8.1. > I found that you introduced the waitForJoinGroup variable in the > updateAssignmentMetadataIfNeeded method. For the reason, I attached the > comment on the method: "try to update assignment metadata BUT do not need to > block on the timer for join group". See as below: > > {code:java} > if (includeMetadataInTimeout) { > // try to update assignment metadata BUT do not need to block on the > timer for join group > updateAssignmentMetadataIfNeeded(timer, false); > } else { > while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE), > true)) { > log.warn("Still waiting for metadata"); > } > }{code} > > > By tracing the code back layer by layer, it is found that the function of > this variable is to construct a time.timer(0L) and pass it back to the method > joinGroupIfNeeded (final Timer timer) in AbstractCoordinator. See as below: > {code:java} > // if not wait for join group, we would just use a timer of 0 > if (!ensureActiveGroup(waitForJoinGroup ? 
timer : time.timer(0L))) { > // since we may use a different timer in the callee, we'd still need > // to update the original timer's current time after the call > timer.update(time.milliseconds()); > return false; > } > {code} > But you will find that there is a submethod onJoinPrepare in the method > stack of joinGroupIfNeeded, and then there is a line of code in the > onJoinPrepare method > maybeAutoCommitOffsetsSync(time.timer(rebalanceConfig.rebalanceTimeoutMs)), > the value of rebalanceConfig.rebalanceTimeoutMs is actually > max.poll.interval.ms. > Finally, I tracked down ConsumerCoordinator's method > commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets, Timer timer) > The input parameter offsets is subscriptions.allConsumed(), when I delete > the topic: rivenTest88, the commitOffsetsSync(Map<TopicPartition, > OffsetAndMetadata> offsets, Timer timer) method will *fall into an infinite > loop! !* > {code:java} > public boolean commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets, Timer timer) { > invokeCompletedOffsetCommitCallbacks(); > if (offsets.isEmpty()) > return true; > do { > if (coordinatorUnknown() && !ensureCoordinatorReady(timer)) { > return false; > } > RequestFuture<Void> future = sendOffsetCommitReques
[jira] [Resolved] (KAFKA-13563) FindCoordinatorFuture never get cleared in non-group mode( consumer#assign)
[ https://issues.apache.org/jira/browse/KAFKA-13563?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-13563. --- Fix Version/s: 3.2.0 3.1.1 Resolution: Fixed > FindCoordinatorFuture never get cleared in non-group mode( consumer#assign) > --- > > Key: KAFKA-13563 > URL: https://issues.apache.org/jira/browse/KAFKA-13563 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 2.7.1, 3.0.0 >Reporter: Luke Chen >Assignee: Luke Chen >Priority: Major > Fix For: 3.2.0, 3.1.1 > > Attachments: kafka.zip > > > In KAFKA-10793, we fix the race condition when lookup coordinator by clearing > the _findCoordinatorFuture_ when handling the result, rather than in the > listener callbacks. It works well under consumer group mode (i.e. > Consumer#subscribe), but we found when user is using non consumer group mode > (i.e. Consumer#assign) with group id provided (for offset commitment, so that > there will be consumerCoordinator created), the _findCoordinatorFuture_ will > never be cleared in some situations, and cause the offset committing keeps > getting NOT_COORDINATOR error. > > After KAFKA-10793, we clear the _findCoordinatorFuture_ in 2 places: > # heartbeat thread > # AbstractCoordinator#ensureCoordinatorReady > But in non consumer group mode with group id provided, there will be no > (1)heartbeat thread , and it only call > (2)AbstractCoordinator#ensureCoordinatorReady when 1st time consumer wants to > fetch committed offset position. That is, after 2nd lookupCoordinator call, > we have no chance to clear the _findCoordinatorFuture_ . > > To avoid the race condition as KAFKA-10793 mentioned, it's not safe to clear > the _findCoordinatorFuture_ in the future listener. So, I think we can fix > this issue by calling AbstractCoordinator#ensureCoordinatorReady when > coordinator unknown in non consumer group case, under each Consumer#poll. > > Reproduce steps: > > 1. 
Start a 3 Broker cluster with a Topic having Replicas=3. > 2. Start a Client with Producer and Consumer (with Consumer#assign(), not > subscribe, and provide a group id) communicating over the Topic. > 3. Stop the Broker that is acting as the Group Coordinator. > 4. Observe successful Rediscovery of new Group Coordinator. > 5. Restart the stopped Broker. > 6. Stop the Broker that became the new Group Coordinator at step 4. > 7. Observe "Rediscovery will be attempted" message but no "Discovered group > coordinator" message. > > > -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Resolved] (KAFKA-13346) Kafka Streams fails due to RocksDB Locks Not Available Exception
[ https://issues.apache.org/jira/browse/KAFKA-13346?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-13346. --- Resolution: Not A Problem > Kafka Streams fails due to RocksDB Locks Not Available Exception > > > Key: KAFKA-13346 > URL: https://issues.apache.org/jira/browse/KAFKA-13346 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.0 >Reporter: Amit Gupta >Priority: Major > > Hello, > We are using Kafka Streams and we observe that some times on some of the > hosts running streams application, Kafka streams instance fails with > unexpected exception. We are running with 40 stream threads per host and 20 > hosts in total. > Can some one please help on what can be the root cause here? > > |org.apache.kafka.streams.errors.ProcessorStateException: Error opening store > state-store at location . > at > org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:214) > ~[kafka-streams-2.6.0.jar:?] > at > org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:188) > ~[kafka-streams-2.6.0.jar:?] > at > org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:224) > ~[kafka-streams-2.6.0.jar:?] > at > org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48) > ~[kafka-streams-2.6.0.jar:?] > at > org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:42) > ~[kafka-streams-2.6.0.jar:?] > at > org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48) > ~[kafka-streams-2.6.0.jar:?] > at > org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$init$0(MeteredKeyValueStore.java:101) > ~[kafka-streams-2.6.0.jar:?] > at > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836) > ~[kafka-streams-2.6.0.jar:?] 
> at > org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:101) > ~[kafka-streams-2.6.0.jar:?] > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:199) > ~[kafka-streams-2.6.0.jar:?] > at > org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:76) > ~[kafka-streams-2.6.0.jar:?] > at > org.apache.kafka.streams.processor.internals.StandbyTask.initializeIfNeeded(StandbyTask.java:95) > ~[kafka-streams-2.6.0.jar:?] > at > org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:426) > ~[kafka-streams-2.6.0.jar:?] > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:660) > ~[kafka-streams-2.6.0.jar:?] > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551) > ~[kafka-streams-2.6.0.jar:?] > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510) > ~[kafka-streams-2.6.0.jar:?] > Caused by: org.rocksdb.RocksDBException: lock : > ./0_468/rocksdb/state-store/LOCK: No locks available > at org.rocksdb.RocksDB.open(Native Method) ~[rocksdbjni-5.18.3.jar:?] > at org.rocksdb.RocksDB.open(RocksDB.java:286) ~[rocksdbjni-5.18.3.jar:?] > at > org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:211) > ~[kafka-streams-2.6.0.jar:?] > ... 15 more > > Some times I also see this exception > | > |org.apache.kafka.streams.errors.ProcessorStateException: Error opening store > state-store at location ./0_433/rocksdb/state-store > at > org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:214) > ~[kafka-streams-2.6.0.jar:?] > at > org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:188) > ~[kafka-streams-2.6.0.jar:?] 
> at > org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:224) > ~[kafka-streams-2.6.0.jar:?] > at > org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48) > ~[kafka-streams-2.6.0.jar:?] > at > org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:42) > ~[kafka-streams-2.6.0.jar:?] > at > org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48) > ~[kafka-streams-2.6.0.jar:?] > at > org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$init$0(MeteredKeyValueStore.java:101) > ~[kafka-streams-2.6.0.jar:?] > at > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836) >
[jira] [Created] (KAFKA-13561) Consider deprecating `StreamsBuilder#build(props)` function
Guozhang Wang created KAFKA-13561: - Summary: Consider deprecating `StreamsBuilder#build(props)` function Key: KAFKA-13561 URL: https://issues.apache.org/jira/browse/KAFKA-13561 Project: Kafka Issue Type: Improvement Reporter: Guozhang Wang With https://cwiki.apache.org/confluence/display/KAFKA/KIP-591%3A+Add+Kafka+Streams+config+to+set+default+state+store accepted, which introduced the new `StreamsBuilder(TopologyConfig)` constructor, we can now consider deprecating the `StreamsBuilder#build(props)` function. There are still a few things we'd need to do: 1. Copy `StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG` to TopologyConfig. 2. Make sure the overloaded `StreamsBuilder()` constructor takes in the default values of TopologyConfig. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Resolved] (KAFKA-13319) Do not send AddOffsetsToTxn/TxnOffsetCommit if offsets map is empty
[ https://issues.apache.org/jira/browse/KAFKA-13319?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-13319. --- Fix Version/s: 3.1.0 Assignee: Guozhang Wang (was: Ryan) Resolution: Fixed > Do not send AddOffsetsToTxn/TxnOffsetCommit if offsets map is empty > --- > > Key: KAFKA-13319 > URL: https://issues.apache.org/jira/browse/KAFKA-13319 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Assignee: Guozhang Wang >Priority: Major > Labels: newbie > Fix For: 3.1.0 > > > If a user calls `Producer.sendOffsetsToTransaction` with an empty map of > offsets, we can shortcut return and skip the logic to add the offsets topic > to the transaction. The main benefit is avoiding the unnecessary accumulation > of markers in __consumer_offsets. -- This message was sent by Atlassian Jira (v8.3.4#803005)
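The short-circuit described above can be sketched as a simple guard clause. This is a hypothetical illustration using stand-in types (the real `Producer.sendOffsetsToTransaction` takes a `Map<TopicPartition, OffsetAndMetadata>`):

```java
import java.util.Collections;
import java.util.Map;

public class OffsetCommitShortcut {
    // Stand-in for the transactional offset-commit path: when the offsets
    // map is empty, return early and skip the AddOffsetsToTxn/TxnOffsetCommit
    // round trips, so no markers accumulate in __consumer_offsets.
    static boolean sendOffsetsToTransaction(Map<String, Long> offsets) {
        if (offsets.isEmpty()) {
            return false; // shortcut: nothing to add to the transaction
        }
        // ... otherwise add the offsets topic to the transaction and commit ...
        return true;
    }
}
```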
[jira] [Created] (KAFKA-13371) Consider consolidating Joined / StreamJoined / TableJoined
Guozhang Wang created KAFKA-13371: - Summary: Consider consolidating Joined / StreamJoined / TableJoined Key: KAFKA-13371 URL: https://issues.apache.org/jira/browse/KAFKA-13371 Project: Kafka Issue Type: Improvement Components: streams Reporter: Guozhang Wang This is an idea from reviewing KAFKA-13261 (adding TableJoined). We now have three control objects: Joined, StreamJoined, TableJoined. All of them extend NamedOperations and hence inherit the `name` field, which is used for the processor node's name and potentially for store names. In addition: * Joined: used in stream-table joins. Contains the key serde and two value serdes used for serializing the bytes for repartitioning (however, since today we only repartition one side if needed, the other value serde is never used). * StreamJoined: used in stream-stream joins. It includes the serdes, AND also the store suppliers and other control variables for the store names. * TableJoined: used in table-table foreign-key joins. It does not include any serdes but includes the partitioner information. The main differences between these constructs are: * KTables have an embedded materialization mechanism via `valueGetterSupplier` whenever they are created, either from a source or from aggregate / join operators, so they do not need extra materialization indicators when participating in a follow-up join --- i.e. they either are already materialized by the operators that generate them, or they "grandfather" back to the upstream KTable on the fly with a logical view when that view is fetched via the `ValueGetterSupplier`. KStreams, on the other hand, have no inherent materialization mechanism, and hence operators that do need to materialize the streams must provide such means. * Table-table foreign-key joins have special needs for partitioners.
[~vvcephei] has a good proposal for https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+DSL+Grammar and as part of that proposal we could consider adding a partitioner for source streams / tables and inheriting it throughout the topology pipeline. Following that idea, we can consider consolidating the above "Joined" objects by isolating the materialization / partitioner variables. More specifically, here's a concrete proposal: 1) `StreamsBuilder.table/stream` would take an optional partitioner. 2) Similarly, all operators that change the key would allow an optional partitioner: 2.a) `KStream.repartition/groupBy` and `KTable.groupBy` would allow an optional partitioner in `Repartitioned`; as a piggy-back, we would also deprecate `Grouped` in favor of `Repartitioned`, since the latter would subsume the former. 2.b) `KStream.map/flatMap/selectKey` stay as is, and similar to serdes, these operators would stop the inheritance of the upstream entities' partitioners. 3) `Repartitioned` would also add the key/value serdes used for serializing to the repartition topics. 4) `KStream.join(KTable)` and `KStream.join(KStream)` would take an optional `Repartitioned` in addition to `Joined`, which can be used to encode the partitioner info. 5) Foreign-key `KTable.join(KTable)` would take an optional `Repartitioned`, which can be used to encode the partitioner info. 6) As a result of all the above points, we can then retire `StreamJoined` / `TableJoined` / `Joined`, since all their wrapped control objects are now separated into `Repartitioned` and `Materialized`: note that for `StreamJoined`, the store suppliers / names / configs would now be wrapped in two Materialized objects which would still not be exposed for IQ. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-13268) Add more integration tests for Table Table FK joins with repartitioning
[ https://issues.apache.org/jira/browse/KAFKA-13268?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-13268. --- Resolution: Duplicate > Add more integration tests for Table Table FK joins with repartitioning > --- > > Key: KAFKA-13268 > URL: https://issues.apache.org/jira/browse/KAFKA-13268 > Project: Kafka > Issue Type: Improvement > Components: streams, unit tests >Reporter: Guozhang Wang >Assignee: Victoria Xia >Priority: Major > > We should add to the FK join multipartition integration test with a > Repartitioned for: > 1) just the new partition count > 2) a custom partitioner > This is to test if there's a bug where the internal topics don't pick up a > partitioner provided that way. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-13356) Use "delete" retention policy only for stream-stream join windowed stores
Guozhang Wang created KAFKA-13356: - Summary: Use "delete" retention policy only for stream-stream join windowed stores Key: KAFKA-13356 URL: https://issues.apache.org/jira/browse/KAFKA-13356 Project: Kafka Issue Type: Improvement Components: streams Reporter: Guozhang Wang Today the window stores associated with stream-stream joins, like any other window stores, use "delete,compact" as their retention policy. However, since today we add a sequence number to keys to disable de-duplication, "compaction" would never be able to compact any keys, and only results in 1) CPU waste on the brokers' cleaner threads, and 2) broker features that rely on a pure "delete" policy not being applicable. Until we potentially change the store format in the future to not use sequence numbers for disabling de-duping, we could consider just changing the policy to "delete" for stream-stream joins' window stores for now. -- This message was sent by Atlassian Jira (v8.3.4#803005)
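The policy choice above can be sketched as follows. This is a hypothetical helper over a plain config map, not the actual Streams internal-topic code; `cleanup.policy` and `retention.ms` are real topic-level configs:

```java
import java.util.HashMap;
import java.util.Map;

public class ChangelogTopicConfig {
    // Sketch: choose the cleanup policy for a changelog topic. Stores that
    // retain duplicates (keys carry a sequence number) can never be compacted,
    // so "delete" alone avoids wasted cleaner-thread work on the brokers.
    static Map<String, String> changelogConfig(boolean retainDuplicates, long retentionMs) {
        Map<String, String> config = new HashMap<>();
        config.put("cleanup.policy", retainDuplicates ? "delete" : "compact,delete");
        config.put("retention.ms", Long.toString(retentionMs));
        return config;
    }
}
```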
[jira] [Created] (KAFKA-13349) Allow Iterator.remove on KeyValueIterator
Guozhang Wang created KAFKA-13349: - Summary: Allow Iterator.remove on KeyValueIterator Key: KAFKA-13349 URL: https://issues.apache.org/jira/browse/KAFKA-13349 Project: Kafka Issue Type: Improvement Components: streams Reporter: Guozhang Wang Today Streams' state store range iterators do not support `remove`. We could consider adding such support for all the built-in state stores: * RocksDB: its native iterator does not support removal, but we can always issue a delete(key) concurrently while the iterator is open on the snapshot. * In-memory: a straightforward implementation. The benefit is that for range-and-delete truncation operations we would not have to be cautious about concurrent modification exceptions. This could also help GC with in-memory stores. -- This message was sent by Atlassian Jira (v8.3.4#803005)
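The in-memory case can be sketched with JDK collections as a stand-in for the store: a TreeMap's range view supports Iterator.remove, so range-and-delete truncation needs no special care about concurrent modification.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

public class InMemoryRangeDelete {
    // Sketch: range-and-delete truncation on an in-memory sorted store.
    // The subMap view's iterator supports remove(), which mutates the
    // backing map without a ConcurrentModificationException.
    static int deleteRange(TreeMap<String, String> store, String from, String to) {
        int removed = 0;
        Iterator<Map.Entry<String, String>> it =
            store.subMap(from, true, to, true).entrySet().iterator();
        while (it.hasNext()) {
            it.next();
            it.remove();
            removed++;
        }
        return removed;
    }
}
```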
[jira] [Resolved] (KAFKA-13301) The relationship between request.timeout. ms and max.poll.interval.ms in the Consumer Configs is incorrect.
[ https://issues.apache.org/jira/browse/KAFKA-13301?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-13301. --- Fix Version/s: 3.1.0 Resolution: Fixed > The relationship between request.timeout. ms and max.poll.interval.ms in the > Consumer Configs is incorrect. > --- > > Key: KAFKA-13301 > URL: https://issues.apache.org/jira/browse/KAFKA-13301 > Project: Kafka > Issue Type: Improvement >Reporter: yangshengwei >Priority: Trivial > Fix For: 3.1.0 > > Attachments: image-2021-09-15-15-37-25-561.png, > image-2021-09-15-15-39-00-179.png > > > In the Consumer Configs, the value of the configuration max.poll.interval.ms > must always be larger than request.timeout.ms. But here's what the official > documentation says: "The value of the configuration request.timeout.ms must always > be larger than max.poll.interval.ms." -- This message was sent by Atlassian Jira (v8.3.4#803005)
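The corrected relationship claimed in the ticket can be stated as a tiny validation check; the helper below is hypothetical, not part of the Kafka client:

```java
public class ConsumerConfigCheck {
    // Per the ticket, max.poll.interval.ms must always be larger than
    // request.timeout.ms (the pre-fix docs stated the reverse).
    static boolean isValid(long maxPollIntervalMs, long requestTimeoutMs) {
        return maxPollIntervalMs > requestTimeoutMs;
    }
}
```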
[jira] [Resolved] (KAFKA-13216) Streams left/outer joins cause new internal changelog topic to grow unbounded
[ https://issues.apache.org/jira/browse/KAFKA-13216?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-13216. --- Resolution: Fixed > Streams left/outer joins cause new internal changelog topic to grow unbounded > - > > Key: KAFKA-13216 > URL: https://issues.apache.org/jira/browse/KAFKA-13216 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Sergio Peña >Assignee: Guozhang Wang >Priority: Critical > Fix For: 3.1.0 > > > This bug is caused by the improvements made in > https://issues.apache.org/jira/browse/KAFKA-10847, which fixes an issue with > stream-stream left/outer joins. The issue only occurs when a stream-stream > left/outer join is used with the new `JoinWindows.ofTimeDifferenceAndGrace()` > API that specifies the window time + grace period. This new API was added in > AK 3.0. No previous users are affected. > The issue causes the internal changelog topic used by the new > OUTERSHARED window store to keep growing unbounded as new records come in. The > topic is never cleaned up nor compacted, even if tombstones are written to > delete the joined and/or expired records from the window store. The problem > is caused by a parameter required by the window store to retain duplicates. > This config causes tombstone records to carry a new sequence ID as part of > the key in the changelog, making those keys unique and thus defeating the > cleanup policy. > In 3.0, we deprecated {{JoinWindows.of(size)}} in favor of > {{JoinWindows.ofTimeDifferenceAndGrace()}} -- the old API uses the old > semantics and is thus not affected, while the new API enables the new > semantics; the problem is that we deprecated the old API and thus told users > that they should switch to the new, broken API.
> We have two ways forward: > * Fix the bug (non-trivial) > * Un-deprecate the old {{JoinWindows.of(size)}} API (and tell users not to > use the new but broken API) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-13249) Checkpoints do not contain latest offsets on shutdown when using EOS
[ https://issues.apache.org/jira/browse/KAFKA-13249?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-13249. --- Fix Version/s: 3.1.0 Resolution: Fixed > Checkpoints do not contain latest offsets on shutdown when using EOS > > > Key: KAFKA-13249 > URL: https://issues.apache.org/jira/browse/KAFKA-13249 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.0.0, 2.7.0, 2.8.0 >Reporter: Oliver Hutchison >Assignee: Oliver Hutchison >Priority: Major > Fix For: 3.1.0 > > > When using EOS, the {{.checkpoint}} file created when a stateful streams app > is shut down does not always contain changelog offsets that represent the > latest state of the state store. The offsets can often be behind the end of > the changelog - sometimes quite significantly. > This leads to a state restore being required when the streams app restarts > after shutting down cleanly, as streams thinks (based on the incorrect offsets > in the checkpoint) that the state store is not up to date with the changelog. > This increases the time we see it takes to do a clean restart of a single-instance > streams app from around 10 seconds to sometimes over 2 minutes in our > case.
> I suspect the bug appears because of an assumption about the {{commitNeeded}} > field in the following method in {{StreamTask}}: > {code:java} > protected void maybeWriteCheckpoint(final boolean enforceCheckpoint) { > // commitNeeded indicates we may have processed some records since last > commit > // and hence we need to refresh checkpointable offsets regardless whether > we should checkpoint or not > if (commitNeeded) { > stateMgr.updateChangelogOffsets(checkpointableOffsets()); > } > super.maybeWriteCheckpoint(enforceCheckpoint); > } > {code} > In the steady-state case for a simple single-instance, single-thread streams app > which simply starts, runs and then shuts down, the {{if > (commitNeeded)}} test always fails when running with EOS, which results in the > latest checkpoint offsets never getting updated into the {{stateMgr}}. > Tracing back the callers of {{maybeWriteCheckpoint}}, it's easy to see this > is the case, as there's only 1 place in the code which calls > {{maybeWriteCheckpoint}} during this steady state: the {{postCommit(final > boolean enforceCheckpoint)}} method, specifically the call in the {{RUNNING}} > state. > {code:java} > case RUNNING: > if (enforceCheckpoint || !eosEnabled) { > maybeWriteCheckpoint(enforceCheckpoint); > } > log.debug("Finalized commit for {} task with eos {} enforce checkpoint {}", > state(), eosEnabled, enforceCheckpoint); > break; > {code} > We can see from this code that {{maybeWriteCheckpoint}} will only ever be > called if {{enforceCheckpoint=true}}, because we know {{eosEnabled=true}} as > we're running with EOS. > So where does {{postCommit}} get called with {{enforceCheckpoint=true}}? > Again looking only at the steady-state case, we find that it's only called > from {{TaskManager.tryCloseCleanAllActiveTasks}}, which is only called from > {{TaskManager.shutdown}}. > The thing about the call in {{tryCloseCleanAllActiveTasks}} is that it > happens *after* all active tasks have committed.
This means that > {{StreamTask.commitNeeded=false}} for all tasks, so it follows that the test > back in {{maybeWriteCheckpoint}} always fails and we don't end up getting the > latest offsets stored into the state manager. > I think the fix is to simply change the test in {{maybeWriteCheckpoint}} to > {{if (commitNeeded || enforceCheckpoint)}}, as we know we must always > update the changelog offsets before we write the checkpoint. -- This message was sent by Atlassian Jira (v8.3.4#803005)
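The proposed fix boils down to widening one predicate. A minimal sketch contrasting the two conditions (the method names are illustrative, not the actual StreamTask code):

```java
public class CheckpointPredicate {
    // Before the fix: offsets are refreshed only when commitNeeded, so an
    // enforced checkpoint after a commit sees stale offsets.
    static boolean refreshOffsetsBefore(boolean commitNeeded, boolean enforceCheckpoint) {
        return commitNeeded;
    }

    // Proposed fix: also refresh when the checkpoint is enforced, e.g. on
    // clean shutdown after all tasks have already committed.
    static boolean refreshOffsetsAfter(boolean commitNeeded, boolean enforceCheckpoint) {
        return commitNeeded || enforceCheckpoint;
    }
}
```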
[jira] [Created] (KAFKA-13286) Revisit Streams State Store and Serde Implementation
Guozhang Wang created KAFKA-13286: - Summary: Revisit Streams State Store and Serde Implementation Key: KAFKA-13286 URL: https://issues.apache.org/jira/browse/KAFKA-13286 Project: Kafka Issue Type: Improvement Components: streams Reporter: Guozhang Wang Kafka Streams state stores are built in hierarchical layers as metered -> cached -> logged -> [convert] -> raw stores (rocksDB, in-memory), and they leverage the built-in Serde libraries for serialization / deserialization. There are several inefficiencies in the current design: * The API only supports serdes using byte arrays. This means we generate a lot of garbage and spend unnecessary time copying bytes, especially when working with windowed state stores that rely on composite keys. In many places in the code we have to extract parts of the composite key to deserialize either the timestamp or the message key from the state store key (e.g. the methods in WindowStoreUtils). * The serde operation can happen on multiple layers of the state store hierarchy, which means we incur extra byte array copies as we move along doing serdes. For example, we do serdes in the metered layer, but then again in the cached layer with cache functions, and also in logged stores to generate the key/value bytes to send to Kafka. To improve on this, we can consider adding support for serdes into/from ByteBuffers, which would allow us to reuse the underlying byte arrays and just pass around slices of the underlying buffers to avoid the unnecessary copying. 1) More specifically, e.g. the serialize interface could be refactored to: {code} ByteBuffer serialize(String topic, T data, ByteBuffer); {code} where the serialized bytes would be appended to the ByteBuffer. When a series of serialize functions is called along the state store hierarchy, we then just need to make sure that whatever should be appended first to the ByteBuffer is serialized first. E.g.
if the serialized byte format of a WindowSchema is <timestamp, leftRightBoolean, key>, then we would need to call the serializers as in: {code} serialize(key, serialize(leftRightBoolean, serialize(timestamp, buffer))); {code} 2) In addition, we can consider having a pool of ByteBuffers representing a set of byte arrays that can be re-used. This can be captured as an intelligent {{ByteBufferSupplier}}, which provides: {code} ByteBuffer ByteBufferSupplier#allocate(long size) {code} Its implementation can choose either to create new byte arrays, or to re-use existing ones in the pool; the gotcha, though, is that we usually may not know the serialized byte length for raw keys (think: in practice the keys would be in json/avro etc.), and hence would not know what {{size}} to pass in for serialization, and hence may need to be conservative, or use trial and error, etc. Of course, callers would then be responsible for returning the used ByteBuffer back to the Supplier via {code} ByteBufferSupplier#deallocate(ByteBuffer buffer) {code} 3) With RocksDB's direct byte-buffers (KAFKA-9168) we can optionally also allocate them from RocksDB directly so that using them for puts/gets would not go through JNI copies and hence is more efficient. The Supplier would then need to be careful to deallocate these direct byte-buffers, since they would not be GC'ed by the JVM. -- This message was sent by Atlassian Jira (v8.3.4#803005)
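The append-style serialize proposal above can be illustrated with the JDK's ByteBuffer alone; the method names here are hypothetical, not an actual Streams API:

```java
import java.nio.ByteBuffer;

public class AppendSerde {
    // Sketch of the proposed append-style serialize: each layer appends its
    // bytes to a shared ByteBuffer instead of allocating a fresh byte[].
    static ByteBuffer serializeLong(long value, ByteBuffer buffer) {
        return buffer.putLong(value);
    }

    static ByteBuffer serializeBoolean(boolean value, ByteBuffer buffer) {
        return buffer.put((byte) (value ? 1 : 0));
    }

    static ByteBuffer serializeKey(byte[] key, ByteBuffer buffer) {
        return buffer.put(key);
    }

    // Composite window key: the innermost call appends first, mirroring the
    // nesting serialize(key, serialize(leftRightBoolean, serialize(timestamp, buffer))).
    static ByteBuffer windowKey(byte[] key, boolean leftRight, long timestamp, ByteBuffer buffer) {
        return serializeKey(key, serializeBoolean(leftRight, serializeLong(timestamp, buffer)));
    }
}
```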
[jira] [Created] (KAFKA-13268) Add more integration tests for Table Table FK joins with repartitioning
Guozhang Wang created KAFKA-13268: - Summary: Add more integration tests for Table Table FK joins with repartitioning Key: KAFKA-13268 URL: https://issues.apache.org/jira/browse/KAFKA-13268 Project: Kafka Issue Type: Improvement Components: streams, unit tests Reporter: Guozhang Wang We should add to the FK join multipartition integration test with a Repartitioned for: 1) just the new partition count 2) a custom partitioner This is to test if there's a bug where the internal topics don't pick up a partitioner provided that way. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-13257) KafkaStreams Support For Latest RocksDB Version
[ https://issues.apache.org/jira/browse/KAFKA-13257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-13257. --- Resolution: Not A Problem > KafkaStreams Support For Latest RocksDB Version > --- > > Key: KAFKA-13257 > URL: https://issues.apache.org/jira/browse/KAFKA-13257 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.8.0 >Reporter: Alagukannan >Priority: Major > Attachments: hs_err_pid6.log > > > Hi, > Can you please let us know if there is any plan for adopting the latest > versions of RocksDB in Kafka Streams? If you're planning it, what's the timeline > we are looking at? If you're not planning to upgrade, what's the reason behind it? Is > there any significant impact to upgrading, like backward compatibility, etc.? > This is a general query to learn about the RocksDB upgrade and its > impact on streams applications. > The main pain point behind asking for this upgrade is: we tried to build an > application with Kafka Streams 2.8.0 on an Alpine-based OS, and the docker > base image is as follows: > azul/zulu-openjdk-alpine:11.0.12-11.50.19-jre-headless. The streams > application worked fine until it had an interaction with the state > store (rocksdb). The JVM crashed with the following error: > # > # A fatal error has been detected by the Java Runtime Environment: > # > # SIGSEGV (0xb) at pc=0x7f9551951b27, pid=6, tid=207 > # > # JRE version: OpenJDK Runtime Environment Zulu11.45+27-CA (11.0.10+9) > (build 11.0.10+9-LTS) > # Java VM: OpenJDK 64-Bit Server VM Zulu11.45+27-CA (11.0.10+9-LTS, mixed > mode, tiered, compressed oops, g1 gc, linux-amd64) > # Problematic frame: > # C [librocksdbjni15322693993163550519.so+0x271b27] > std::_Rb_tree, > std::less, std::allocator > >::_M_erase(std::_Rb_tree_node*)+0x27 > Then we found out RocksDB works well with glibc but not with musl, whereas > Alpine supports only musl for native dependencies.
Further looking into > RocksDB for a solution, we found that they have started supporting both glibc > and musl native libs from the 6.5.x versions. > But the latest Kafka Streams (2.8.0) ships RocksDB 5.18.x. This is > the main reason behind asking for the RocksDB upgrade in Kafka Streams. > I have attached the PID log where the JVM failures are happening. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-13175) The topic is marked for deletion, create topic with the same name throw exception topic already exists.
[ https://issues.apache.org/jira/browse/KAFKA-13175?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-13175. --- Fix Version/s: 3.1.0 Resolution: Fixed > The topic is marked for deletion, create topic with the same name throw > exception topic already exists. > --- > > Key: KAFKA-13175 > URL: https://issues.apache.org/jira/browse/KAFKA-13175 > Project: Kafka > Issue Type: Bug >Reporter: yangshengwei >Priority: Major > Fix For: 3.1.0 > > Attachments: kafka (2).jpg, zookeeper.jpg > > > After a topic is deleted, it is marked for deletion; creating a topic with > the same name throws a "topic already exists" exception. It should instead throw > an exception saying the topic is marked for deletion. I can then choose to wait for the topic > to be completely deleted. If the topic is still not deleted after a long time, we > need to check the reason why it has not been deleted. > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-13243) Differentiate metric latency measured in millis and nanos
Guozhang Wang created KAFKA-13243: - Summary: Differentiate metric latency measured in millis and nanos Key: KAFKA-13243 URL: https://issues.apache.org/jira/browse/KAFKA-13243 Project: Kafka Issue Type: Improvement Components: metrics Reporter: Guozhang Wang Today most of the client latency metrics are measured in millis, and some in nanos. For those measured in nanos we usually differentiate them with a `-ns` suffix in the metric names, e.g. `io-wait-time-ns-avg` and `io-time-ns-avg`. But there are a few where we obviously forgot to follow this pattern, e.g. `io-wait-time-total`: it is inconsistent in that `avg` has the `-ns` suffix while `total` does not. I did a quick search and found just two of them: * bufferpool-wait-time-total : bufferpool-wait-time-ns-total * io-wait-time-total : io-wait-time-ns-total We should rename them accordingly with the `-ns` suffix as well. -- This message was sent by Atlassian Jira (v8.3.4#803005)
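The rename pattern can be illustrated with a hypothetical helper that inserts the `-ns` unit suffix before the aggregation suffix; this is not the actual metrics code, just a sketch of the naming convention:

```java
public class MetricNameFix {
    // Hypothetical: insert the "-ns" unit suffix before the "-total"
    // aggregation suffix, e.g. "io-wait-time-total" -> "io-wait-time-ns-total".
    // Names that already carry "-ns-" (or are not totals) are left unchanged.
    static String withNsSuffix(String metricName) {
        if (metricName.endsWith("-total") && !metricName.contains("-ns-")) {
            return metricName.substring(0, metricName.length() - "-total".length())
                + "-ns-total";
        }
        return metricName;
    }
}
```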
[jira] [Created] (KAFKA-13239) Use RocksDB.ingestExternalFile for restoration
Guozhang Wang created KAFKA-13239: - Summary: Use RocksDB.ingestExternalFile for restoration Key: KAFKA-13239 URL: https://issues.apache.org/jira/browse/KAFKA-13239 Project: Kafka Issue Type: Improvement Components: streams Reporter: Guozhang Wang Now that we are on a newer version of RocksDB, we can consider using the new {code} ingestExternalFile(final ColumnFamilyHandle columnFamilyHandle, final List filePathList, final IngestExternalFileOptions ingestExternalFileOptions) {code} for restoring changelogs into state stores. More specifically: 1) Use a larger default batch size in the restore consumer's polling behavior so that each poll returns as many records as possible. 2) For a single batch of records returned from a restore consumer poll call, first write them as a single SST file using the {{SstFileWriter}}. The existing {{DBOptions}} could be used to construct the {{EnvOptions}} and {{Options}} for the writer. Do not ingest the written file into the db within each iteration. 3) At the end of the restoration, call {{RocksDB.ingestExternalFile}} with all the written files' paths as the parameter. The {{IngestExternalFileOptions}} would be specifically configured to allow key ranges overlapping with the mem-table. 4) A specific note: after the call in 3), heavy compaction may be executed by RocksDB in the background, and before it cools down, starting normal processing immediately (which would try to {{put}} new records into the store) may see high stalls. To work around this we could consider using {{RocksDB.compactRange()}}, which blocks until the compaction is completed. -- This message was sent by Atlassian Jira (v8.3.4#803005)
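The restore flow in steps 1)-3) can be sketched with stand-in interfaces for the RocksDB pieces (FileWriter standing in for SstFileWriter, Store.ingestExternalFiles for RocksDB.ingestExternalFile); this shows only the batching shape, not actual Streams code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class BulkRestoreSketch {
    // Hypothetical stand-ins: each polled batch is written as one "SST file";
    // ingestion into the store happens once, after all batches are written.
    interface FileWriter { String writeBatch(List<Map.Entry<byte[], byte[]>> batch); }
    interface Store { void ingestExternalFiles(List<String> filePaths); }

    static int restore(List<List<Map.Entry<byte[], byte[]>>> polledBatches,
                       FileWriter writer, Store store) {
        List<String> files = new ArrayList<>();
        for (List<Map.Entry<byte[], byte[]>> batch : polledBatches) {
            if (!batch.isEmpty()) {
                files.add(writer.writeBatch(batch)); // step 2: one file per batch, no ingest yet
            }
        }
        store.ingestExternalFiles(files); // step 3: single ingest call at the end
        return files.size();
    }
}
```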
[jira] [Resolved] (KAFKA-13170) Flaky Test InternalTopicManagerTest.shouldRetryDeleteTopicWhenTopicUnknown
[ https://issues.apache.org/jira/browse/KAFKA-13170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-13170. --- Fix Version/s: 3.1.0 Resolution: Fixed > Flaky Test InternalTopicManagerTest.shouldRetryDeleteTopicWhenTopicUnknown > -- > > Key: KAFKA-13170 > URL: https://issues.apache.org/jira/browse/KAFKA-13170 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Reporter: A. Sophie Blee-Goldman >Assignee: Guozhang Wang >Priority: Major > Fix For: 3.1.0 > > > [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11176/2/testReport/org.apache.kafka.streams.processor.internals/InternalTopicManagerTest/Build___JDK_8_and_Scala_2_12___shouldRetryDeleteTopicWhenTopicUnknown_2/] > {code:java} > Stacktracejava.lang.AssertionError: unexpected exception type thrown; > expected: but > was: > at org.junit.Assert.assertThrows(Assert.java:1020) > at org.junit.Assert.assertThrows(Assert.java:981) > at > org.apache.kafka.streams.processor.internals.InternalTopicManagerTest.shouldRetryDeleteTopicWhenRetriableException(InternalTopicManagerTest.java:526) > at > org.apache.kafka.streams.processor.internals.InternalTopicManagerTest.shouldRetryDeleteTopicWhenTopicUnknown(InternalTopicManagerTest.java:497) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-13172) Document in Streams 3.0 that due to RocksDB footer version in-flight downgrade is not supported
Guozhang Wang created KAFKA-13172: - Summary: Document in Streams 3.0 that due to RocksDB footer version in-flight downgrade is not supported Key: KAFKA-13172 URL: https://issues.apache.org/jira/browse/KAFKA-13172 Project: Kafka Issue Type: Improvement Reporter: Guozhang Wang Assignee: Guozhang Wang -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-8683) Flakey test InternalTopicManagerTest @shouldNotCreateTopicIfExistsWithDifferentPartitions
[ https://issues.apache.org/jira/browse/KAFKA-8683?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-8683. -- Resolution: Cannot Reproduce Have not seen this flakiness recently and also cannot reproduce locally after 10,000 runs, closing for now. > Flakey test InternalTopicManagerTest > @shouldNotCreateTopicIfExistsWithDifferentPartitions > -- > > Key: KAFKA-8683 > URL: https://issues.apache.org/jira/browse/KAFKA-8683 > Project: Kafka > Issue Type: Bug >Reporter: Boyang Chen >Priority: Major > > [https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/414/consoleFull] > org.apache.kafka.streams.processor.internals.InternalTopicManagerTest > > shouldNotCreateTopicIfExistsWithDifferentPartitions PASSED*00:05:46* ERROR: > Failed to write output for test null.Gradle Test Executor 5*00:05:46* > java.lang.NullPointerException: Cannot invoke method write() on null > object*00:05:46* at > org.codehaus.groovy.runtime.NullObject.invokeMethod(NullObject.java:91)*00:05:46* > at > org.codehaus.groovy.runtime.callsite.PogoMetaClassSite.call(PogoMetaClassSite.java:47)*00:05:46* > at > org.codehaus.groovy.runtime.callsite.CallSiteArray.defaultCall(CallSiteArray.java:47)*00:05:46* > at > org.codehaus.groovy.runtime.callsite.NullCallSite.call(NullCallSite.java:34)*00:05:46* >at > org.codehaus.groovy.runtime.callsite.CallSiteArray.defaultCall(CallSiteArray.java:47)*00:05:46* > at java_io_FileOutputStream$write.call(Unknown Source)*00:05:46* > at build_9s5vsq3vnws1928hdaummvzb1$_r -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-13152) Replace "buffered.records.per.partition" with "input.buffer.max.bytes"
Guozhang Wang created KAFKA-13152: - Summary: Replace "buffered.records.per.partition" with "input.buffer.max.bytes" Key: KAFKA-13152 URL: https://issues.apache.org/jira/browse/KAFKA-13152 Project: Kafka Issue Type: Improvement Components: streams Reporter: Guozhang Wang The current config "buffered.records.per.partition" controls the maximum number of records to bookkeep, and when it is exceeded we pause fetching from that partition. However this config has two issues: * It's a per-partition config, so the total memory consumed depends on the dynamic number of partitions assigned. * Record sizes vary from case to case, so it's hard to bound the memory usage for this buffering. We should consider deprecating that config in favor of a global one, e.g. "input.buffer.max.bytes", which controls how many bytes in total are allowed to be buffered. This is doable since we buffer the raw records in . -- This message was sent by Atlassian Jira (v8.3.4#803005)
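As a rough illustration of the proposed byte-based bound (all names here are illustrative, not actual Streams configs or classes): one global byte budget drives the pause/resume decision, independent of partition count and record sizes.

```java
// Sketch of a global byte-based input buffer bound (the proposed
// "input.buffer.max.bytes"), as opposed to the per-partition record count
// of "buffered.records.per.partition". Names are illustrative.
final class InputBuffer {
    private final long maxBytes;
    private long bufferedBytes = 0;

    InputBuffer(long maxBytes) { this.maxBytes = maxBytes; }

    void add(byte[] rawRecord) { bufferedBytes += rawRecord.length; }    // raw record enqueued
    void remove(byte[] rawRecord) { bufferedBytes -= rawRecord.length; } // record processed

    // pause fetching once the single global byte budget is exhausted,
    // regardless of how many partitions are currently assigned
    boolean shouldPauseFetching() { return bufferedBytes >= maxBytes; }
}
```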
[jira] [Resolved] (KAFKA-9858) CVE-2016-3189 Use-after-free vulnerability in bzip2recover in bzip2 1.0.6 allows remote attackers to cause a denial of service (crash) via a crafted bzip2 file, related
[ https://issues.apache.org/jira/browse/KAFKA-9858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-9858. -- Fix Version/s: 3.0.0 Resolution: Fixed > CVE-2016-3189 Use-after-free vulnerability in bzip2recover in bzip2 1.0.6 > allows remote attackers to cause a denial of service (crash) via a crafted > bzip2 file, related to block ends set to before the start of the block. > - > > Key: KAFKA-9858 > URL: https://issues.apache.org/jira/browse/KAFKA-9858 > Project: Kafka > Issue Type: Bug > Components: security >Affects Versions: 2.2.2, 2.3.1, 2.4.1 >Reporter: sihuanx >Priority: Major > Fix For: 3.0.0 > > > I'm not sure whether CVE-2016-3189 affects kafka 2.4.1 or not? This > vulnerability was related to rocksdbjni-5.18.3.jar which is compiled with > *bzip2 .* > Is there any task or plan to fix it? > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-13008) Stream will stop processing data for a long time while waiting for the partition lag
[ https://issues.apache.org/jira/browse/KAFKA-13008?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-13008. --- Assignee: Guozhang Wang Resolution: Fixed > Stream will stop processing data for a long time while waiting for the > partition lag > > > Key: KAFKA-13008 > URL: https://issues.apache.org/jira/browse/KAFKA-13008 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.0.0 >Reporter: Luke Chen >Assignee: Guozhang Wang >Priority: Blocker > Fix For: 3.0.0 > > Attachments: image-2021-07-07-11-19-55-630.png > > > In KIP-695, we improved the task idling mechanism by checking partition lag. > It's a good improvement for timestamp sync. But I found it will cause the > stream to stop processing data for a long time while waiting for the > partition metadata. > > I've been investigating this case for a while, and figured out that the issue > will happen in the below situation (or a similar one): > # start 2 streams (each with 1 thread) to consume from a topicA (with 3 > partitions: A-0, A-1, A-2) > # After the 2 streams started, the partition assignment is: (I skipped some > other processing-related partitions for simplicity) > stream1-thread1: A-0, A-1 > stream2-thread1: A-2 > # start processing some data; assume now the position and high watermark are: > A-0: offset: 2, highWM: 2 > A-1: offset: 2, highWM: 2 > A-2: offset: 2, highWM: 2 > # Now, stream3 joins, triggering a rebalance with this assignment: > stream1-thread1: A-0 > stream2-thread1: A-2 > stream3-thread1: A-1 > # Suddenly, stream3 leaves, so we rebalance again, back to the step 2 > assignment: > stream1-thread1: A-0, *A-1* > stream2-thread1: A-2 > (note: after initialization, the position of A-1 will be: position: null, > highWM: null) > # Now, note that partition A-1 used to be assigned to stream1-thread1, > and now it's back. 
And also, assume partition A-1 has slow input (ex: 1 > record per 30 mins), and partition A-0 has fast input (ex: 10K records / > sec). So now, stream1-thread1 won't process any data until we get input > from partition A-1 (even if partition A-0 has buffered a lot, and we have > `{{max.task.idle.ms}}` set to 0). > > The reason stream1-thread1 won't process any data is that we can't > get the lag of partition A-1. And why can't we get the lag? Because: > # In KIP-695, we use the consumer's cache to get the partition lag, to avoid > a remote call > # The lag for a partition will be cleared if the assignment in this round > doesn't include this partition. Check > [here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L272]. > So, in the above example, the metadata cache for partition A-1 will be > cleared in step 4, and re-initialized (to null) in step 5 > # In KIP-227, we introduced fetch sessions for incremental fetch > requests/responses. That is, if the session exists, the client (consumer) will > get an update only when the fetched partition has an update (ex: new data). > So, in the above case, since partition A-1 has slow input (ex: 1 record per 30 > mins), it won't get an update until the next 30 mins, or until the fetch session > has been inactive for (default) 2 mins and is evicted. Either way, the metadata > won't be updated for a while. > > In KIP-695, if we don't get the partition lag, we can't determine the > partition's data status to do timestamp sync, so we'll keep waiting and not > process any data. That's why this issue happens. 
> > *Proposed solution:* > # If we don't get the current lag for a partition, or the current lag > 0, > we start to wait for max.task.idle.ms, and reset the deadline when we get the > partition lag, like what we did previously in KIP-353 > # Introduce a waiting-time config for when there is no partition lag, or the partition lag > stays > 0 (needs a KIP) > [~vvcephei] [~guozhang] , any suggestions? > > cc [~ableegoldman] [~mjsax] , this is the root cause that in > [https://github.com/apache/kafka/pull/10736,] we discussed and thought > there was a data loss situation. FYI. -- This message was sent by Atlassian Jira (v8.3.4#803005)
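Proposed solution 1) above could look roughly like the following deadline tracker; this is purely a sketch of the reset-on-fresh-lag idea, not the actual Streams implementation, and all names are illustrative.

```java
// Sketch of proposed solution 1): when a partition's lag is unknown (or > 0),
// wait up to max.task.idle.ms, resetting the deadline whenever a fresh lag
// value arrives, so a stale fetch session cannot block processing forever.
final class IdleDeadline {
    private final long maxTaskIdleMs;
    private long deadlineMs = -1; // -1 => no deadline armed yet

    IdleDeadline(long maxTaskIdleMs) { this.maxTaskIdleMs = maxTaskIdleMs; }

    // called on each poll-loop iteration; currentLag == null means "lag unknown"
    boolean shouldProcess(Long currentLag, long nowMs) {
        if (currentLag != null && currentLag == 0) {
            deadlineMs = -1;                    // fully caught up: process immediately
            return true;
        }
        if (currentLag != null) {
            deadlineMs = nowMs + maxTaskIdleMs; // fresh lag value arrived: reset the deadline
        } else if (deadlineMs < 0) {
            deadlineMs = nowMs + maxTaskIdleMs; // lag unknown: arm the deadline once
        }
        return nowMs >= deadlineMs;             // enforce processing after the wait expires
    }
}
```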
[jira] [Created] (KAFKA-12957) Refactor Streams Logical Plan Generation
Guozhang Wang created KAFKA-12957: - Summary: Refactor Streams Logical Plan Generation Key: KAFKA-12957 URL: https://issues.apache.org/jira/browse/KAFKA-12957 Project: Kafka Issue Type: Improvement Components: streams Reporter: Guozhang Wang There is a general issue with Streams' logical plan -> physical plan generation, where the physical processor nodes are generated at the parsing phase rather than the logical plan compilation phase. The former stage is agnostic to any user configurations while only the latter stage has access to them, and hence we should not generate physical processor nodes during the parsing phase (i.e. any code related to StreamsBuilder), but defer them to the logical plan phase (i.e. XXNode.writeToTopology). This has several issues, such as that many physical processor instantiations require access to the configs, and hence we have to defer them to the `init` procedure of the node, which is scattered across many places from logical nodes to physical processors. This would be a big refactoring of Streams' logical plan generation, but I think it would be worth getting it into a cleaner state. -- This message was sent by Atlassian Jira (v8.3.4#803005)
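The deferral described above might be sketched like this, with all class and config names hypothetical: the logical node only produces its physical processor once configs are in hand, instead of constructing it during parsing and consulting configs later in `init`.

```java
import java.util.Map;

// Sketch of the refactoring direction: parsing (StreamsBuilder) produces only
// config-agnostic logical nodes; physical processors are instantiated during
// logical-plan compilation, where user configs are finally available.
interface LogicalNode {
    // invoked at compile time (analogous to writeToTopology), configs in hand
    String toPhysicalProcessor(Map<String, Object> configs);
}

final class FilterNode implements LogicalNode {
    @Override
    public String toPhysicalProcessor(Map<String, Object> configs) {
        // the physical node can consult configs at construction time,
        // rather than deferring config access to its init() call
        boolean cacheEnabled = (boolean) configs.getOrDefault("cache.enabled", true);
        return "FilterProcessor(cache=" + cacheEnabled + ")";
    }
}
```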
[jira] [Resolved] (KAFKA-10585) Kafka Streams should clean up the state store directory from cleanup
[ https://issues.apache.org/jira/browse/KAFKA-10585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-10585. --- Fix Version/s: 3.0.0 Resolution: Fixed > Kafka Streams should clean up the state store directory from cleanup > > > Key: KAFKA-10585 > URL: https://issues.apache.org/jira/browse/KAFKA-10585 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Rohan Desai >Assignee: Dongjin Lee >Priority: Minor > Labels: newbie++ > Fix For: 3.0.0 > > > Currently, `KafkaStreams.cleanup` cleans up all the task-level directories > and the global directory. However it doesn't clean up the enclosing state > store directory, though streams does create this directory when it > initializes the state for the streams app. Feels like it should remove this > directory when it cleans up. > We notice this in ksql quite often, since every new query is a new streams > app. Over time, we see lots of state store directories left around for old > queries. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12920) Consumer's cooperative sticky assignor need to clear generation / assignment data upon `onPartitionsLost`
Guozhang Wang created KAFKA-12920: - Summary: Consumer's cooperative sticky assignor need to clear generation / assignment data upon `onPartitionsLost` Key: KAFKA-12920 URL: https://issues.apache.org/jira/browse/KAFKA-12920 Project: Kafka Issue Type: Improvement Reporter: Guozhang Wang The consumer's cooperative-sticky assignor does not track the owned partitions inside the assignor --- i.e. when it resets its state in the event of ``onPartitionsLost``, the ``memberAssignment`` and ``generation`` inside the assignor are not cleared. This can cause a member to join with an empty generation on the protocol while its non-empty user-data still encodes the old assignment (and hence passes the validation check on the broker side during JoinGroup), and eventually cause a single partition to be assigned to multiple consumers within a generation. We should let the assignor also clear its assignment/generation when ``onPartitionsLost`` is triggered in order to avoid this scenario. Note that 1) for the regular sticky assignor the generation would still have an older value, and this would cause the previously owned partitions to be discarded during the assignment, and 2) for Streams' sticky assignor, its encoding would indeed be cleared along with ``onPartitionsLost``. Hence only the consumer's cooperative-sticky assignor has this issue to solve. -- This message was sent by Atlassian Jira (v8.3.4#803005)
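The proposed fix can be sketched as follows (field and method names are illustrative, not the actual assignor internals): on ``onPartitionsLost``, the assignor-side state is dropped along with the protocol-level state, so the next JoinGroup carries no stale user-data.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the proposed fix: the assignor clears its remembered assignment
// and generation when onPartitionsLost fires, so the member rejoins with an
// empty generation AND empty user-data (no stale owned-partition encoding).
final class CooperativeStickyState {
    private List<Integer> memberAssignment = new ArrayList<>();
    private int generation = -1; // stand-in for DEFAULT_GENERATION

    void onAssignment(List<Integer> partitions, int gen) {
        memberAssignment = new ArrayList<>(partitions);
        generation = gen;
    }

    void onPartitionsLost() {
        memberAssignment = new ArrayList<>(); // drop stale owned partitions
        generation = -1;                      // drop the stale generation too
    }

    List<Integer> ownedPartitions() { return memberAssignment; }
    int generation() { return generation; }
}
```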
[jira] [Resolved] (KAFKA-12918) Watch-Free Animated 'America: The Motion Picture' Trailer 2021 Full Hd Download
[ https://issues.apache.org/jira/browse/KAFKA-12918?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-12918. --- > Watch-Free Animated 'America: The Motion Picture' Trailer 2021 Full Hd > Download > > > Key: KAFKA-12918 > URL: https://issues.apache.org/jira/browse/KAFKA-12918 > Project: Kafka > Issue Type: Bug >Reporter: mushfiqur rahoman >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-10614) Group coordinator onElection/onResignation should guard against leader epoch
[ https://issues.apache.org/jira/browse/KAFKA-10614?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-10614. --- Fix Version/s: 3.0.0 Resolution: Fixed > Group coordinator onElection/onResignation should guard against leader epoch > > > Key: KAFKA-10614 > URL: https://issues.apache.org/jira/browse/KAFKA-10614 > Project: Kafka > Issue Type: Bug > Components: core >Reporter: Guozhang Wang >Assignee: Tom Bentley >Priority: Major > Fix For: 3.0.0 > > > When there is a sequence of LeaderAndISR or StopReplica requests sent from > different controllers causing the group coordinator to elect / resign, we may > re-order the events due to a race condition. For example: > 1) First LeaderAndISR request received from the old controller to resign as the > group coordinator. > 2) Second LeaderAndISR request received from the new controller to elect as the > group coordinator. > 3) Although the threads handling the 1/2) requests are synchronized on the > replica manager, their callback {{onLeadershipChange}} would trigger > {{onElection/onResignation}}, which schedule the loading / unloading on > background threads that are not synchronized. > 4) As a result, {{onElection}} may be triggered by a thread first, and > then {{onResignation}}. The coordinator would then not recognize > itself as the coordinator and hence would respond to any coordinator request with > {{NOT_COORDINATOR}}. > Here are two proposals off the top of my head: > 1) Let the scheduled load / unload function keep the passed-in leader > epoch, and also materialize the epoch in memory. Then when executing the > unloading, check against the leader epoch. > 2) This may be a bit simpler: use a single background thread working on a > FIFO queue of loading / unloading jobs; since the callers are actually > synchronized on the replica manager and order is preserved, the enqueued loading / > unloading jobs would be correctly ordered as well. 
In that case we would avoid > the reordering. -- This message was sent by Atlassian Jira (v8.3.4#803005)
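Proposal 2) above can be sketched with a single-threaded executor, which gives the FIFO ordering for free (class and method names are illustrative):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Sketch of proposal 2): funnel all coordinator loading/unloading jobs through
// one background thread. Since submission order is preserved by the FIFO queue
// of the single-threaded executor, onElection/onResignation cannot be reordered.
final class CoordinatorJobQueue {
    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    private final List<String> executed = new ArrayList<>(); // touched by worker thread only

    void submitLoad(int partition)   { worker.submit(() -> executed.add("load-" + partition)); }
    void submitUnload(int partition) { worker.submit(() -> executed.add("unload-" + partition)); }

    List<String> shutdownAndGet() {
        worker.shutdown();
        try { worker.awaitTermination(5, TimeUnit.SECONDS); }
        catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        return executed; // safe to read once the worker has terminated
    }
}
```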
[jira] [Created] (KAFKA-12887) Do not trigger user-customized ExceptionalHandler for RTE
Guozhang Wang created KAFKA-12887: - Summary: Do not trigger user-customized ExceptionalHandler for RTE Key: KAFKA-12887 URL: https://issues.apache.org/jira/browse/KAFKA-12887 Project: Kafka Issue Type: Improvement Components: streams Reporter: Guozhang Wang Today in StreamThread we have a try-catch block that captures all {{Throwable e}} and then triggers {{this.streamsUncaughtExceptionHandler.accept(e)}}. However, there are possible RTEs such as IllegalState/IllegalArgument exceptions which are usually caused by bugs, etc. In such cases we should not let users decide what to do with these exceptions, but should let Streams itself enforce the decision; e.g. for IllegalState/IllegalArgument we should fail fast to surface the potential error. -- This message was sent by Atlassian Jira (v8.3.4#803005)
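A minimal sketch of the proposed dispatch rule, with illustrative names (the real change would live inside StreamThread's catch block):

```java
import java.util.function.Consumer;

// Sketch of the proposed behavior: fatal RTEs like IllegalStateException /
// IllegalArgumentException bypass the user handler so Streams can fail fast;
// other throwables still go to the user-provided uncaught-exception handler.
final class ExceptionDispatcher {
    static boolean handledByUser(Throwable e, Consumer<Throwable> userHandler) {
        if (e instanceof IllegalStateException || e instanceof IllegalArgumentException) {
            return false; // Streams enforces the decision (fail fast), user is not consulted
        }
        userHandler.accept(e);
        return true;
    }
}
```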
[jira] [Created] (KAFKA-12812) Consider refactoring state store registration path
Guozhang Wang created KAFKA-12812: - Summary: Consider refactoring state store registration path Key: KAFKA-12812 URL: https://issues.apache.org/jira/browse/KAFKA-12812 Project: Kafka Issue Type: Bug Components: streams Reporter: Guozhang Wang Today our state store registration call path within the stateManager (both local and global) looks like this: {code} for each store: store.init(store, context) -> context.register(root, callback) -> stateManager.registerStore(store, callback) {code} One can see that we have an awkward loop from the stateManager back to the stateManager, and we rely on users not forgetting to call context.register(root, callback). We do this only in order to let users pass the customized callback implementation to the stateManager. What about a different path like this: 1) We add a new interface in StateStore, like `StateRestoreCallback getCallback()`, that each impl class needs to provide. 2) We remove the `context.register(root, callback)` call; and because of that, we do not need to pass in `root` to store.init either. 3) The stateManager just calls `store.init(context)` (without the first parameter), and then puts the store along with its restore callback into the map, without the separate `registerStore` function. -- This message was sent by Atlassian Jira (v8.3.4#803005)
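The proposed path in 1)-3) might look roughly like this; all types here are simplified stand-ins for the actual Streams interfaces, shown only to make the inverted control flow concrete.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the proposed registration path: the store exposes its own restore
// callback, so the context.register(root, callback) round-trip disappears and
// the stateManager drives registration itself.
interface StateRestoreCallback { void restore(byte[] key, byte[] value); }

interface StateStore {
    String name();
    void init(Object context);          // 2)/3) no more "root" parameter
    StateRestoreCallback getCallback(); // 1) new interface method each impl provides
}

final class StateManager {
    private final Map<String, StateRestoreCallback> callbacks = new HashMap<>();

    // 3) the manager calls init and records the callback itself,
    //    replacing the separate registerStore() call users had to remember
    void register(StateStore store, Object context) {
        store.init(context);
        callbacks.put(store.name(), store.getCallback());
    }

    boolean isRegistered(String name) { return callbacks.containsKey(name); }
}
```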
[jira] [Resolved] (KAFKA-12683) Remove deprecated "UsePreviousTimeOnInvalidTimeStamp"
[ https://issues.apache.org/jira/browse/KAFKA-12683?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-12683. --- Fix Version/s: 3.0.0 Assignee: Guozhang Wang Resolution: Fixed > Remove deprecated "UsePreviousTimeOnInvalidTimeStamp" > - > > Key: KAFKA-12683 > URL: https://issues.apache.org/jira/browse/KAFKA-12683 > Project: Kafka > Issue Type: Sub-task >Reporter: Guozhang Wang >Assignee: Guozhang Wang >Priority: Major > Fix For: 3.0.0 > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12693) Consecutive rebalances with zombie instances may cause corrupted changelogs
Guozhang Wang created KAFKA-12693: - Summary: Consecutive rebalances with zombie instances may cause corrupted changelogs Key: KAFKA-12693 URL: https://issues.apache.org/jira/browse/KAFKA-12693 Project: Kafka Issue Type: Bug Reporter: Guozhang Wang When an instance (or thread within an instance) of Kafka Streams has a soft failure and the group coordinator triggers a rebalance, that instance would temporarily become a "zombie writer". That is, this instance does not know there has already been a new rebalance and hence its partitions have been migrated out, until it tries to commit, gets notified of the illegal-generation error, and realizes it is the "zombie". During this period until the commit, this zombie may still be writing data to the changelogs of the migrated tasks while the new owner has already taken over and is also writing to the changelogs. When EOS is enabled, this is not a problem: when the zombie tries to commit and gets notified that it's fenced, its zombie appends would be aborted. With EOS disabled, though, such shared writes would be interleaved on the changelogs, where a zombie append may arrive later after the new writer's append, effectively overwriting that new append. Note that such interleaved writes do not necessarily cause corrupted data: as long as the new producer keeps appending after the old zombie stops, and all the corrupted keys are overwritten again by the new values, then it is fine. However, if there are consecutive rebalances where, right after the changelogs are corrupted by zombie writers and before the new writer can overwrite them again, the task gets migrated again and needs to be restored from changelogs, the old values would be restored instead of the new values, effectively causing data loss. Although this should be a rare event, we should still fix it asap. 
One idea is to have producers get a PID even under ALOS: that is, we set the transactional id in the producer config, but do not trigger any txn APIs; when there are zombie producers, they would then be immediately fenced on appends and hence there would be no interleaved appends. I think this may still require a KIP, since today one has to call initTxn in order to register and get the PID. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12683) Remove deprecated "UsePreviousTimeOnInvalidTimeStamp"
Guozhang Wang created KAFKA-12683: - Summary: Remove deprecated "UsePreviousTimeOnInvalidTimeStamp" Key: KAFKA-12683 URL: https://issues.apache.org/jira/browse/KAFKA-12683 Project: Kafka Issue Type: Sub-task Reporter: Guozhang Wang -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-12633) Remove deprecated "TopologyTestDriver#pipeInput / readOutput"
[ https://issues.apache.org/jira/browse/KAFKA-12633?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-12633. --- Fix Version/s: 3.0.0 Resolution: Fixed > Remove deprecated "TopologyTestDriver#pipeInput / readOutput" > - > > Key: KAFKA-12633 > URL: https://issues.apache.org/jira/browse/KAFKA-12633 > Project: Kafka > Issue Type: Sub-task >Reporter: Guozhang Wang >Assignee: Guozhang Wang >Priority: Major > Fix For: 3.0.0 > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12669) Add deleteRange to WindowStore / KeyValueStore interfaces
Guozhang Wang created KAFKA-12669: - Summary: Add deleteRange to WindowStore / KeyValueStore interfaces Key: KAFKA-12669 URL: https://issues.apache.org/jira/browse/KAFKA-12669 Project: Kafka Issue Type: Bug Components: streams Reporter: Guozhang Wang We can consider adding such APIs where the underlying implementation classes may have better optimizations than deleting the keys one by one in a get-and-delete loop. -- This message was sent by Atlassian Jira (v8.3.4#803005)
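To illustrate why a first-class deleteRange is cheaper than a get-and-delete loop, here is the analogous operation on a plain sorted map; RocksDB offers a comparable native range-delete primitive at the DB level, which the store interfaces discussed here could expose.

```java
import java.util.TreeMap;

// Illustration only: with a sorted structure, an entire key range can be
// dropped as one bulk operation instead of a get-and-delete per key.
final class RangeDelete {
    // deletes all keys in [from, to), matching subMap's half-open semantics
    static void deleteRange(TreeMap<String, String> store, String from, String to) {
        store.subMap(from, to).clear();
    }
}
```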
[jira] [Resolved] (KAFKA-12643) Kafka Streams 2.7 with Kafka Broker 2.6.x regression: bad timestamp in transform/process (this.context.schedule function)
[ https://issues.apache.org/jira/browse/KAFKA-12643?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-12643. --- Resolution: Duplicate Thanks for confirming! > Kafka Streams 2.7 with Kafka Broker 2.6.x regression: bad timestamp in > transform/process (this.context.schedule function) > - > > Key: KAFKA-12643 > URL: https://issues.apache.org/jira/browse/KAFKA-12643 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.7.0 >Reporter: David EVANO >Priority: Major > Attachments: Capture d’écran 2021-04-09 à 17.50.05.png > > > During a transform() or a process() method: > Define a scheduled task: > this.context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, > timestamp -> \{...} > store.put(...) or context.forward(...) produces a record with an invalid > timestamp. > For forward, a workaround is to define the timestamp: > context.forward(entry.key, entry.value.toString(), > To.all().withTimestamp(timestamp)); > But for the state.put(...) or state.delete(...) functions there is no workaround. > Is it mandatory to have the Kafka broker version aligned with the Kafka > Streams version? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12633) Remove deprecated "TopologyTestDriver#pipeInput / readOutput"
Guozhang Wang created KAFKA-12633: - Summary: Remove deprecated "TopologyTestDriver#pipeInput / readOutput" Key: KAFKA-12633 URL: https://issues.apache.org/jira/browse/KAFKA-12633 Project: Kafka Issue Type: Sub-task Reporter: Guozhang Wang Assignee: Guozhang Wang -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-12630) Remove deprecated KafkaClientSupplier#getAdminClient
[ https://issues.apache.org/jira/browse/KAFKA-12630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-12630. --- Fix Version/s: 3.0.0 Resolution: Fixed > Remove deprecated KafkaClientSupplier#getAdminClient > > > Key: KAFKA-12630 > URL: https://issues.apache.org/jira/browse/KAFKA-12630 > Project: Kafka > Issue Type: Sub-task >Reporter: Guozhang Wang >Assignee: Guozhang Wang >Priority: Major > Fix For: 3.0.0 > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-12568) Remove deprecated "KStream#groupBy/join", "Joined#named" overloads
[ https://issues.apache.org/jira/browse/KAFKA-12568?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-12568. --- Fix Version/s: 3.0.0 Assignee: Guozhang Wang Resolution: Fixed > Remove deprecated "KStream#groupBy/join", "Joined#named" overloads > -- > > Key: KAFKA-12568 > URL: https://issues.apache.org/jira/browse/KAFKA-12568 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Guozhang Wang >Assignee: Guozhang Wang >Priority: Major > Fix For: 3.0.0 > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12630) Remove deprecated KafkaClientSupplier#getAdminClient
Guozhang Wang created KAFKA-12630: - Summary: Remove deprecated KafkaClientSupplier#getAdminClient Key: KAFKA-12630 URL: https://issues.apache.org/jira/browse/KAFKA-12630 Project: Kafka Issue Type: Sub-task Reporter: Guozhang Wang Assignee: Guozhang Wang -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-7785) Remove PartitionGrouper interface and it's config and move DefaultPartitionGrouper to internal package
[ https://issues.apache.org/jira/browse/KAFKA-7785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-7785. -- Resolution: Fixed > Remove PartitionGrouper interface and it's config and move > DefaultPartitionGrouper to internal package > -- > > Key: KAFKA-7785 > URL: https://issues.apache.org/jira/browse/KAFKA-7785 > Project: Kafka > Issue Type: Sub-task > Components: streams >Affects Versions: 2.1.0 >Reporter: Jacek Laskowski >Assignee: highluck >Priority: Blocker > Fix For: 3.0.0 > > > Since {{DefaultPartitionGrouper}} is only for the purpose of the internal > {{StreamsPartitionAssignor}} it would make sense to have it in the > {{org.apache.kafka.streams.processor.internals}} package. > I would also vote to move {{PartitionGrouper.}} > Via KAFKA-8927 we deprecated the `PartitionGrouper` interface in 2.4 release > – this allows us to remove the public interface and its corresponding config > in the next major release (ie, 3.0.0). `DefaultPartitionGrouper` was > implicitly deprecated via KAFKA-8927. > Hence, we can move the interface as well as the default implementation into > an internal package (or maybe just remove the interface completely as there > are no plans to support multiple implementations atm). -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-9765) Could not add partitions to transaction due to errors
[ https://issues.apache.org/jira/browse/KAFKA-9765?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-9765. -- Resolution: Duplicate > Could not add partitions to transaction due to errors > - > > Key: KAFKA-9765 > URL: https://issues.apache.org/jira/browse/KAFKA-9765 > Project: Kafka > Issue Type: Bug > Components: clients, producer >Affects Versions: 2.3.1 >Reporter: Prashant Waykar >Priority: Blocker > Fix For: 2.4.2, 2.5.0 > > > I am following the producer with transactions example in > [https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html,] > and on kafkaException, I use abortTransaction and retry. > I am seeing these exceptions. Has anyone experienced this before ? Please > suggest > {code:java} > // code placeholder > java.util.concurrent.ExecutionException: > org.apache.kafka.common.KafkaException: Could not add partitions to > transaction due to errors: {nfvAlarmJob-0=UNKNOWN_SERVER_ERROR} > at > org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:98) > at > org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:67) > at > org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:30) > at > com.vmware.vchs.hybridity.messaging.kafka.KafkaProducerDelegate.publishMessageWithTransaction(KafkaProducerDelegate.java:197) > at > com.vmware.vchs.hybridity.messaging.kafka.KafkaProducerDelegate.publish(KafkaProducerDelegate.java:164) > at > com.vmware.vchs.hybridity.messaging.kafka.KafkaProducerDelegate.publish(KafkaProducerDelegate.java:158) > at > com.vmware.vchs.hybridity.messaging.adapter.JobManagerJobPublisher.publish(JobManagerJobPublisher.java:140) > at > com.vmware.vchs.hybridity.messaging.adapter.JobManager.queueJob(JobManager.java:1720) > at > com.vmware.vchs.hybridity.messaging.adapter.JobManagementAdapter.queueJob(JobManagementAdapter.java:80) > at 
> com.vmware.vchs.hybridity.messaging.adapter.JobManagementAdapter.queueJob(JobManagementAdapter.java:70) > at > com.vmware.vchs.hybridity.messaging.adapter.JobManagedService.queueJob(JobManagedService.java:168) > at > com.vmware.hybridity.nfvm.alarm.UpdateVcenterAlarmsJob.run(UpdateVcenterAlarmsJob.java:67) > at > com.vmware.vchs.hybridity.messaging.LoggingJobWrapper.run(LoggingJobWrapper.java:41) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.apache.kafka.common.KafkaException: Could not add partitions > to transaction due to errors: {nfvAlarmJob-0=UNKNOWN_SERVER_ERROR} > at > org.apache.kafka.clients.producer.internals.TransactionManager$AddPartitionsToTxnHandler.handleResponse(TransactionManager.java:1230) > at > org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1069) > at > org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109) > at > org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:561) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:553) > at > org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425) > at > org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:311) > at > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244) > ... 
1 common frames omitted > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
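The abort-and-retry pattern the reporter describes (on a KafkaException, call abortTransaction and retry the transaction body) can be sketched as a plain control-flow loop. A minimal, broker-free sketch: `TxnRetry` and its `Supplier`/`Runnable` parameters are illustrative stand-ins, not the KafkaProducer API.

```java
import java.util.function.Supplier;

// Illustrative sketch of "on KafkaException, abortTransaction and retry".
// `body` stands in for the produce-within-transaction logic and `abort` for
// producer.abortTransaction(); neither touches a real broker.
public class TxnRetry {

    /** Runs `body`; on failure calls `abort` and retries, up to maxAttempts times. */
    public static <T> T runWithRetry(Supplier<T> body, Runnable abort, int maxAttempts) {
        if (maxAttempts <= 0) throw new IllegalArgumentException("maxAttempts must be > 0");
        RuntimeException last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                return body.get();
            } catch (RuntimeException e) { // in the real client this would be KafkaException
                abort.run();               // abort the open transaction before retrying
                last = e;
            }
        }
        throw last; // all attempts failed: surface the last error
    }
}
```

Note that with a real producer, a fatal error such as ProducerFencedException must not be retried; the loop above only models the retriable case the reporter hit.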
[jira] [Resolved] (KAFKA-12428) Add a last-heartbeat-seconds-ago metric to Kafka Consumer
[ https://issues.apache.org/jira/browse/KAFKA-12428?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-12428. --- Resolution: Duplicate > Add a last-heartbeat-seconds-ago metric to Kafka Consumer > - > > Key: KAFKA-12428 > URL: https://issues.apache.org/jira/browse/KAFKA-12428 > Project: Kafka > Issue Type: Improvement > Components: consumer, streams >Reporter: Guozhang Wang >Priority: Major > Labels: newbie++ > > I have encountered several issues in the past where heartbeat requests are > not sent [1,2] (either in time, or ever), and today it is a bit hard to get > to that from the logs. I think it is better to add a metric as > "last-heartbeat-seconds-ago" where when rebalances were triggered we can > immediately find out if this is the root cause. > 1. https://issues.apache.org/jira/browse/KAFKA-10793 > 2. https://issues.apache.org/jira/browse/KAFKA-10827 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-7106) Remove segment/segmentInterval from Window definition
[ https://issues.apache.org/jira/browse/KAFKA-7106?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-7106. -- Resolution: Fixed > Remove segment/segmentInterval from Window definition > - > > Key: KAFKA-7106 > URL: https://issues.apache.org/jira/browse/KAFKA-7106 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: John Roesler >Assignee: Guozhang Wang >Priority: Blocker > Labels: needs-kip > Fix For: 3.0.0 > > > Currently, Window configures segment and segmentInterval properties, but > these aren't truly properties of a window in general. > Rather, they are properties of the particular implementation that we > currently have: a segmented store. Therefore, these properties should be > moved to configure only that implementation. > > This may be related to KAFKA-4730, since an in-memory window store wouldn't > necessarily need to be segmented. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-12562) Remove deprecated-overloaded "KafkaStreams#metadataForKey" and "KafkaStreams#store"
[ https://issues.apache.org/jira/browse/KAFKA-12562?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-12562. --- Resolution: Fixed > Remove deprecated-overloaded "KafkaStreams#metadataForKey" and > "KafkaStreams#store" > --- > > Key: KAFKA-12562 > URL: https://issues.apache.org/jira/browse/KAFKA-12562 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Guozhang Wang >Assignee: Guozhang Wang >Priority: Minor > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-12524) Remove deprecated WindowBytesStoreSupplier#segments
[ https://issues.apache.org/jira/browse/KAFKA-12524?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-12524. --- Resolution: Fixed > Remove deprecated WindowBytesStoreSupplier#segments > --- > > Key: KAFKA-12524 > URL: https://issues.apache.org/jira/browse/KAFKA-12524 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Guozhang Wang >Assignee: Guozhang Wang >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12568) Remove deprecated "KStream#groupBy/join", "Joined#named" overloads
Guozhang Wang created KAFKA-12568: - Summary: Remove deprecated "KStream#groupBy/join", "Joined#named" overloads Key: KAFKA-12568 URL: https://issues.apache.org/jira/browse/KAFKA-12568 Project: Kafka Issue Type: Sub-task Reporter: Guozhang Wang -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12562) Remove deprecated-overloaded "KafkaStreams#metadataForKey" and "KafkaStreams#store"
Guozhang Wang created KAFKA-12562: - Summary: Remove deprecated-overloaded "KafkaStreams#metadataForKey" and "KafkaStreams#store" Key: KAFKA-12562 URL: https://issues.apache.org/jira/browse/KAFKA-12562 Project: Kafka Issue Type: Sub-task Reporter: Guozhang Wang Assignee: Guozhang Wang -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-12527) Remove deprecated "PartitionGrouper" interface
[ https://issues.apache.org/jira/browse/KAFKA-12527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-12527. --- Resolution: Duplicate > Remove deprecated "PartitionGrouper" interface > -- > > Key: KAFKA-12527 > URL: https://issues.apache.org/jira/browse/KAFKA-12527 > Project: Kafka > Issue Type: Sub-task >Reporter: Guozhang Wang >Assignee: Guozhang Wang >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-12526) Remove deprecated long ms overloads
[ https://issues.apache.org/jira/browse/KAFKA-12526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-12526. --- Resolution: Duplicate > Remove deprecated long ms overloads > --- > > Key: KAFKA-12526 > URL: https://issues.apache.org/jira/browse/KAFKA-12526 > Project: Kafka > Issue Type: Sub-task >Reporter: Guozhang Wang >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12549) Allow state stores to opt-in transactional support
Guozhang Wang created KAFKA-12549: - Summary: Allow state stores to opt-in transactional support Key: KAFKA-12549 URL: https://issues.apache.org/jira/browse/KAFKA-12549 Project: Kafka Issue Type: New Feature Components: streams Reporter: Guozhang Wang Right now Kafka Streams' EOS implementation does not make any assumptions about the state store's transactional support. Allowing the state stores to optionally provide transactional support can have multiple benefits. For example, suppose we add some APIs to the {{StateStore}} interface, like {{beginTxn}}, {{commitTxn}} and {{abortTxn}}. These APIs could then be used under both ALOS and EOS such that: * store.beginTxn * store.put // during processing * streams commit // either through the eos protocol or not * store.commitTxn We would then have the following benefits: * Reduced duplicate records upon crashes for ALOS (note this is still not EOS, but a middle-ground where uncommitted data within a state store would not be retained if store.commitTxn failed). * No need to wipe the state store and re-bootstrap from scratch upon crashes for EOS. E.g., if a crash failure happened between the completion of the streams commit and store.commitTxn, we could instead just roll forward the transaction by replaying the changelog from the second-most-recent committed offset to the most recent committed offset. * Remote stores that support transactions would then not need to support wiping (https://issues.apache.org/jira/browse/KAFKA-12475). * We could fix the known issues of emit-on-change (https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams). * We could support "query committed data only" for interactive queries (see below for reasons). As for the implementation of these APIs, there are several options: * The state store itself has native transaction features (e.g. RocksDB). 
* Use an in-memory buffer for all puts within a transaction, and upon `commitTxn` write the whole buffer as a batch to the underlying state store, or drop the whole buffer upon aborting. For interactive queries, one can then optionally query the underlying store for committed data only. * Use a separate store as a transient persistent buffer: upon `beginTxn` create a new empty transient store, and upon `commitTxn` merge it into the underlying store. The same applies for interactively querying committed-only data. -- This message was sent by Atlassian Jira (v8.3.4#803005)
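The in-memory-buffer option above (buffer all puts during a transaction, flush the whole batch on commit, drop it on abort) can be sketched in a few lines. `BufferedTxnStore` follows the `beginTxn`/`commitTxn`/`abortTxn` names proposed in this ticket, but the class itself is a hypothetical stand-in backed by a plain `TreeMap`, not a real `StateStore`:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Sketch of the buffered-transaction option: writes go to an in-memory buffer
// and only reach the underlying (committed) map on commitTxn; abortTxn drops
// the buffer. Reads serve committed data only, matching the IQ suggestion above.
public class BufferedTxnStore {
    private final TreeMap<String, String> committed = new TreeMap<>();
    private Map<String, String> buffer = null;

    public void beginTxn() { buffer = new HashMap<>(); }

    public void put(String key, String value) {
        if (buffer == null) throw new IllegalStateException("no open transaction");
        buffer.put(key, value);
    }

    public void commitTxn() {
        if (buffer == null) throw new IllegalStateException("no open transaction");
        committed.putAll(buffer); // flush the whole batch atomically w.r.t. readers of `committed`
        buffer = null;
    }

    public void abortTxn() { buffer = null; } // uncommitted writes vanish

    /** Interactive queries see committed data only. */
    public String get(String key) { return committed.get(key); }
}
```

The design trade-off is memory: the buffer grows with the number of uncommitted puts, which is why the ticket also lists a separate transient store as an alternative.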
[jira] [Created] (KAFKA-12527) Remove deprecated "PartitionAssignor" interface
Guozhang Wang created KAFKA-12527: - Summary: Remove deprecated "PartitionAssignor" interface Key: KAFKA-12527 URL: https://issues.apache.org/jira/browse/KAFKA-12527 Project: Kafka Issue Type: Sub-task Reporter: Guozhang Wang -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12526) Remove deprecated long ms overloads
Guozhang Wang created KAFKA-12526: - Summary: Remove deprecated long ms overloads Key: KAFKA-12526 URL: https://issues.apache.org/jira/browse/KAFKA-12526 Project: Kafka Issue Type: Sub-task Reporter: Guozhang Wang -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12524) Remove deprecated WindowBytesStoreSupplier#segments
Guozhang Wang created KAFKA-12524: - Summary: Remove deprecated WindowBytesStoreSupplier#segments Key: KAFKA-12524 URL: https://issues.apache.org/jira/browse/KAFKA-12524 Project: Kafka Issue Type: Sub-task Reporter: Guozhang Wang -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-12514) NPE in SubscriptionState
[ https://issues.apache.org/jira/browse/KAFKA-12514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-12514. --- Fix Version/s: 3.0.0 Resolution: Fixed > NPE in SubscriptionState > > > Key: KAFKA-12514 > URL: https://issues.apache.org/jira/browse/KAFKA-12514 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.0.0 >Reporter: John Roesler >Assignee: John Roesler >Priority: Blocker > Fix For: 3.0.0 > > > In a soak test, we got this exception: > > {code:java} > java.lang.NullPointerExceptionat > org.apache.kafka.clients.consumer.internals.SubscriptionState.partitionLag(SubscriptionState.java:545) >at > org.apache.kafka.clients.consumer.KafkaConsumer.currentLag(KafkaConsumer.java:2241) > at > org.apache.kafka.streams.processor.internals.PartitionGroup.readyToProcess(PartitionGroup.java:143) > at > org.apache.kafka.streams.processor.internals.StreamTask.isProcessable(StreamTask.java:650) >at > org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:661) > at > org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1114) > {code} > This is related to the implementation of: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-695%3A+Further+Improve+Kafka+Streams+Timestamp+Synchronization] > aka > https://issues.apache.org/jira/browse/KAFKA-10091 > > Luckily, the stack trace is pretty unambiguous. I'll open a PR shortly. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12472) Add a Consumer / Streams metric to indicate the current rebalance status
Guozhang Wang created KAFKA-12472: - Summary: Add a Consumer / Streams metric to indicate the current rebalance status Key: KAFKA-12472 URL: https://issues.apache.org/jira/browse/KAFKA-12472 Project: Kafka Issue Type: Improvement Components: consumer, streams Reporter: Guozhang Wang Today, to troubleshoot a rebalance issue operators need to perform a lot of manual steps: locate the problematic members, search the log entries, and look for related metrics. It would be great to add a single metric that covers all these manual steps, so that operators only need to check this single signal to determine the root cause. A concrete idea is to expose two enum gauge metrics on consumer and streams, respectively: * Consumer level (the order below is by-design, see Streams level for details): 0. None => there is no rebalance ongoing. 1. CoordinatorRequested => any coordinator response contains a RebalanceInProgress error code. 2. NewMember => the join group response has a MemberIdRequired error code. 3. UnknownMember => any coordinator response contains an UnknownMember error code, indicating this member has already been kicked out of the group. 4. StaleMember => any coordinator response contains an IllegalGeneration error code. 5. DroppedGroup => the heartbeat thread decides to call leaveGroup because the heartbeat expired. 6. UserRequested => leaveGroup is sent upon shutdown / the unsubscribeAll API, as well as upon calling the enforceRebalance API. 7. MetadataChanged => requestRejoin triggered since metadata has changed. 8. SubscriptionChanged => requestRejoin triggered since the subscription has changed. 9. RetryOnError => a join/syncGroup response contains a retriable error which would cause the consumer to back off and retry. 10. RevocationNeeded => requestRejoin triggered since the set of revoked partitions is not empty. 
The transition rule is that a non-zero status code can only transit to zero or to a higher code, but not to a lower code (same for streams; see rationales below). * Streams level: today a streams client can have multiple consumers. We introduce some new enum states as well as aggregation rules across consumers: if there are no streams-layer events as below that transit its status (i.e. the streams layer thinks it is 0), then we aggregate across all the embedded consumers and take the largest status code value as the streams metric; if there are streams-layer events that determine its status should be 10+, then it overrides all embedded consumer-layer status codes. In addition, when creating an aggregated metric across streams instances within an app, we also follow the same aggregation rule, e.g. if there are two streams instances where one instance's status code is 1) and the other's is 10), then the app's status is 10). 10. RevocationNeeded => the definition of this is changed to the original 10) defined in the consumer above, OR the leader decides to revoke either active/standby tasks and hence schedules follow-ups. 11. AssignmentProbing => the leader decides to schedule follow-ups since the current assignment is unstable. 12. VersionProbing => the leader decides to schedule follow-ups due to version probing. 13. EndpointUpdate => anyone decides to schedule follow-ups due to endpoint updates. The main motivations of the proposed precedence order are the following: 1. When a rebalance is triggered by one member, all other members would only know it is due to CoordinatorRequested from coordinator error codes, and hence CoordinatorRequested should be overridden by any other status when aggregating across clients. 2. DroppedGroup could cause unknown/stale members that would fail and retry immediately, and hence should take higher precedence. 3. 
The Revocation definition is extended in Streams, and hence it needs to take the highest precedence among all consumer-only statuses so that it would not be overridden by any of them. 4. In general, rarer events get higher precedence. Any comments on the precedence rules / categorization are more than welcome! -- This message was sent by Atlassian Jira (v8.3.4#803005)
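The aggregation rule described above (a streams-layer status of 10+ overrides; otherwise take the largest code across the embedded consumers) can be sketched as follows. The class name and the integer codes are illustrative, not proposed implementation names:

```java
import java.util.Collection;

// Sketch of the proposed precedence/aggregation rule: status codes are ordered,
// the streams layer wins when it has its own event (code >= 10), otherwise the
// largest embedded-consumer code is reported. The same max rule also aggregates
// across instances of an app.
public class RebalanceStatus {
    public static final int NONE = 0;                  // no rebalance ongoing
    public static final int COORDINATOR_REQUESTED = 1; // lowest-precedence non-zero code
    public static final int REVOCATION_NEEDED = 10;    // first streams-layer code

    /** streamsCode is 0 unless a streams-layer event set it (10..13). */
    public static int aggregate(int streamsCode, Collection<Integer> consumerCodes) {
        if (streamsCode >= REVOCATION_NEEDED) return streamsCode; // streams-layer status overrides
        return consumerCodes.stream().mapToInt(Integer::intValue).max().orElse(NONE);
    }
}
```

For example, an instance whose consumers report codes 1 and 3 surfaces 3; an instance with a streams-layer VersionProbing (12) surfaces 12 regardless of its consumers.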
[jira] [Created] (KAFKA-12428) Add a last-heartbeat-seconds-ago metric to Kafka Consumer
Guozhang Wang created KAFKA-12428: - Summary: Add a last-heartbeat-seconds-ago metric to Kafka Consumer Key: KAFKA-12428 URL: https://issues.apache.org/jira/browse/KAFKA-12428 Project: Kafka Issue Type: Improvement Components: consumer Reporter: Guozhang Wang I have encountered several issues in the past where heartbeat requests are not sent [1,2] (either in time, or ever), and today it is a bit hard to figure that out from the logs. I think it is better to add a "last-heartbeat-seconds-ago" metric so that, when a rebalance is triggered, we can immediately find out whether this is the root cause. 1. https://issues.apache.org/jira/browse/KAFKA-10793 2. https://issues.apache.org/jira/browse/KAFKA-10827 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12419) Remove Deprecated APIs of Kafka Streams in 3.0
Guozhang Wang created KAFKA-12419: - Summary: Remove Deprecated APIs of Kafka Streams in 3.0 Key: KAFKA-12419 URL: https://issues.apache.org/jira/browse/KAFKA-12419 Project: Kafka Issue Type: Improvement Reporter: Guozhang Wang Fix For: 3.0.0 Here's a list of deprecated APIs that we have accumulated in the past; we can consider removing them in 3.0: * KIP-198: "--zookeeper" flag from StreamsResetter (1.0) * KIP-171: "--execute" flag from StreamsResetter (1.1) * KIP-233: overloaded "StreamsBuilder#addGlobalStore" (1.1) * KIP-251: overloaded "ProcessorContext#forward" (2.0) * KIP-276: "StreamsConfig#getConsumerConfig" (2.0) * KIP-319: "WindowBytesStoreSupplier#segments" (2.1) * KIP-321: "TopologyDescription.Source#topics" (2.1) * KIP-328: "Windows#until/segmentInterval/maintainMS" (2.1) * KIP-358: "Windows/Materialized" overloaded functions with `long` (2.1) * KIP-365/366: Implicit Scala Apis (2.1) * KIP-372: overloaded "KStream#groupBy" (2.1) * KIP-307: "Joined#named" (2.3) * KIP-345: Broker config "group.initial.rebalance.delay.ms" (2.3) * KIP-429: "PartitionAssignor" interface (2.4) * KIP-470: "TopologyTestDriver#pipeInput" (2.4) * KIP-476: overloaded "KafkaClientSupplier#getAdminClient" (2.4) * KIP-479: overloaded "KStream#join" (2.4) * KIP-530: old "UsePreviousTimeOnInvalidTimeStamp" (2.5) * KIP-535 / 562: overloaded "KafkaStreams#metadataForKey" and "KafkaStreams#store" (2.5) And here's a list of already-filed JIRAs for removing deprecated APIs: * KAFKA-10434 * KAFKA-7785 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-12323) Record timestamps not populated in event
[ https://issues.apache.org/jira/browse/KAFKA-12323?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-12323. --- Resolution: Fixed > Record timestamps not populated in event > > > Key: KAFKA-12323 > URL: https://issues.apache.org/jira/browse/KAFKA-12323 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.7.0 >Reporter: Adam Bellemare >Assignee: Guozhang Wang >Priority: Blocker > Fix For: 2.8.0, 2.7.1 > > Attachments: PunctuateTimestampZeroTest.java > > > Upgraded a kafka streams application from 2.6.0 to 2.7.0. Noticed that the > events being produced had a "CreatedAt" timestamp = 0, causing downstream > failures as we depend on those timestamps. Reverting back to 2.6.0/2.6.1 > fixed this issue. This was the only change to the Kafka Streams application. > Consuming the event stream produced by 2.6.0 results in events that, when > consumed using the `kafka-avro-console-consumer` and `--property > print.timestamp=true` result in events prepended with the event times, such > as: > {code:java} > CreateTime:1613072202271 > CreateTime:1613072203412 > CreateTime:1613072205431 > {code} > etc. > However, when those events are produced by the Kafka Streams app using 2.7.0, > we get: > {code:java} > CreateTime:0 > CreateTime:0 > CreateTime:0 > {code} > I don't know if these is a default value somewhere that changed, but this is > actually a blocker for our use-cases as we now need to circumnavigate this > limitation (or roll back to 2.6.1, though there are other issues we must deal > with then). I am not sure which unit tests in the code base to look at to > validate this, but I wanted to log this bug now in case someone else has > already seen this or an open one exists (I didn't see one though). -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12370) Refactor KafkaStreams exposed metadata hierarchy
Guozhang Wang created KAFKA-12370: - Summary: Refactor KafkaStreams exposed metadata hierarchy Key: KAFKA-12370 URL: https://issues.apache.org/jira/browse/KAFKA-12370 Project: Kafka Issue Type: Improvement Components: streams Reporter: Guozhang Wang Currently in KafkaStreams we have three groups of metadata getters: 1. {code} allMetadata allMetadataForStore {code} These return a collection of {{StreamsMetadata}}, which only contains the partitions as active/standby plus the hostInfo, but does not expose any task info. 2. {code} queryMetadataForKey {code} Returns {{KeyQueryMetadata}}, which includes the hostInfos of the active and standbys, plus the partition id. 3. {code} localThreadsMetadata {code} Returns {{ThreadMetadata}}, which includes a collection of {{TaskMetadata}} for active and standby tasks. All of the above functions are used for interactive queries, but their exposed metadata are very different, and some use cases would need all client, thread, and task metadata to fulfill feature development. At the same time, we may have a more dynamic "task -> thread" mapping in the future, and the embedded clients like consumers would not be per thread, but per client. --- Rethinking the metadata, I feel we can have a more consistent hierarchy as the following: * {{StreamsMetadata}} represents the metadata for the client, which includes the set of {{ThreadMetadata}} for its existing threads and the set of {{TaskMetadata}} for active and standby tasks assigned to this client, plus client metadata including hostInfo and embedded client ids. * {{ThreadMetadata}} includes the name, state, and the set of {{TaskMetadata}} for currently assigned tasks. * {{TaskMetadata}} includes the name (including the sub-topology id and the partition id), the state, and the corresponding sub-topology description (including the state store names and source topic names). 
* {{allMetadata}}, {{allMetadataForStore}}, {{allMetadataForKey}} (renamed from queryMetadataForKey) return the set of {{StreamsMetadata}}, and {{localMetadata}} (renamed from localThreadMetadata) returns a single {{StreamsMetadata}}. To illustrate with an example: to find out which hosts are the current active / standby hosts of a specific store, we would call {{allMetadataForStore}}, and for each returned {{StreamsMetadata}} loop over its contained {{TaskMetadata}} for active / standby tasks and filter by the store names contained in the corresponding sub-topology description. -- This message was sent by Atlassian Jira (v8.3.4#803005)
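The store-lookup walk at the end of the ticket can be sketched against the proposed hierarchy. All class shapes below are illustrative stand-ins for the proposed {{StreamsMetadata}}/{{TaskMetadata}} types, not the real Streams classes:

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Sketch of the proposed hierarchy: a client-level StreamsMetadata holds the
// TaskMetadata assigned to it, and each TaskMetadata carries the store names
// from its sub-topology description, so store lookups become a filter.
public class MetadataSketch {
    public static class TaskMetadata {
        public final String taskId;            // "<sub-topology id>_<partition id>"
        public final Set<String> storeNames;   // from the sub-topology description
        public TaskMetadata(String taskId, Set<String> storeNames) {
            this.taskId = taskId;
            this.storeNames = storeNames;
        }
    }

    public static class StreamsMetadata {
        public final String hostInfo;
        public final List<TaskMetadata> activeTasks;
        public StreamsMetadata(String hostInfo, List<TaskMetadata> activeTasks) {
            this.hostInfo = hostInfo;
            this.activeTasks = activeTasks;
        }
    }

    /** Hosts whose active tasks touch the given store (the allMetadataForStore walk). */
    public static List<String> activeHostsForStore(List<StreamsMetadata> all, String store) {
        return all.stream()
                  .filter(m -> m.activeTasks.stream().anyMatch(t -> t.storeNames.contains(store)))
                  .map(m -> m.hostInfo)
                  .collect(Collectors.toList());
    }
}
```

The same filter, run over a standby-task list instead of `activeTasks`, would yield the standby hosts.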
[jira] [Created] (KAFKA-12352) Improve debuggability with continuous consumer rebalances
Guozhang Wang created KAFKA-12352: - Summary: Improve debuggability with continuous consumer rebalances Key: KAFKA-12352 URL: https://issues.apache.org/jira/browse/KAFKA-12352 Project: Kafka Issue Type: Improvement Components: consumer, streams Reporter: Guozhang Wang Assignee: Guozhang Wang There are several scenarios where a consumer/streams client can fall into continuous rebalances and hence not make any progress. Today when this happens, developers usually need to do a lot of digging in order to get insights into what happened. Here's a short summary of the different scenarios where we (re-)trigger rebalances: 1. Group member kicked out of the group: when the coordinator kicks out the member, any join / sync / heartbeat / offset-commit the member later issues will fail, and the member will try to re-join. If the member keeps calling poll too late, it will continuously fall into this scenario and not make progress. 2. Group is rebalancing: if the group is rebalancing at the moment, the member's heartbeat / offset-commit / sync-group will fail. In this case the member rejoining the group is not the root cause of the rebalancing anyway. 3. The caller enforces a rebalance via `enforceRebalance`. This is one-off and should not cause rebalance storms. 4. After a rebalance is completed, the member finds out that a) its subscription has changed or b) its subscribed topics' number of partitions changed since the re-join request was sent. In this case it needs to re-trigger the rebalance in order to get the new assignment. Since a subscription change is one-off, it should not cause rebalance storms; topic metadata changes should also be infrequent, but there are some rare cases where topic metadata keeps "vibrating" due to broker-side issues. 5. After a rebalance is completed, the member needs to revoke some partitions as indicated by the assignment. After the revocation it needs to re-join the group. 
This may cause rebalance storms when the partition assignor is sub-optimal in determining the assignment, so that partitions keep migrating around and rebalances are triggered continuously. As we can see, 1/5 above could potentially cause rebalance storms, while 2/3/4 should not in normal cases. In all of these scenarios, we should expose exactly the reason why the member is re-joining the group, and whether this re-joining would trigger the rebalance or the member is already in one (in which case the join-group itself is not causing it, but is a result of it). This could help operators quickly nail down which of the above may be the root cause of continuous rebalances. I'd suggest we first go through the log4j hierarchy to make sure this is the right place, and maybe in the future we can expose a single state metric on top of the logging categorization for even more convenient troubleshooting. -- This message was sent by Atlassian Jira (v8.3.4#803005)
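One way to read the five scenarios above is as an enum of rejoin reasons, tagged with whether the ticket considers them storm-prone (scenarios 1 and 5). This is purely illustrative, not an existing Kafka type:

```java
// Illustrative enum of the five rejoin scenarios listed above; `mayStorm`
// records which ones the ticket says can cause rebalance storms (1 and 5).
public enum RejoinReason {
    KICKED_OUT(true),         // 1. member kicked out after polling too late
    GROUP_REBALANCING(false), // 2. group already rebalancing; rejoin is a symptom
    ENFORCED(false),          // 3. enforceRebalance, one-off
    METADATA_CHANGED(false),  // 4. subscription / partition-count change
    REVOCATION(true);         // 5. revocation followed by re-join

    public final boolean mayStorm;

    RejoinReason(boolean mayStorm) { this.mayStorm = mayStorm; }
}
```

Logging such a reason alongside each join-group request would give operators exactly the single signal the ticket asks for.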
[jira] [Created] (KAFKA-12242) Decouple state store materialization enforcement from name/serde provider
Guozhang Wang created KAFKA-12242: - Summary: Decouple state store materialization enforcement from name/serde provider Key: KAFKA-12242 URL: https://issues.apache.org/jira/browse/KAFKA-12242 Project: Kafka Issue Type: Improvement Components: streams Reporter: Guozhang Wang Many users of Streams would want the following: let the Streams runtime decide whether or not to materialize a state store; AND, if it decides to do so, use the store name / serdes provided ahead of time; if not, nothing happens (the provided store name and serdes can just be dropped). However, Streams today takes `Materialized` as an indicator to enforce materialization. We should think of a way for users to optionally decouple the materialization enforcement from the name/serde provider. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-12219) Potential race condition in InMemoryKeyValueStore
[ https://issues.apache.org/jira/browse/KAFKA-12219?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-12219. --- Fix Version/s: 2.8.0 Resolution: Fixed > Potential race condition in InMemoryKeyValueStore > - > > Key: KAFKA-12219 > URL: https://issues.apache.org/jira/browse/KAFKA-12219 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Dongjin Lee >Assignee: Dongjin Lee >Priority: Critical > Fix For: 2.8.0 > > > With KAFKA-8802 (included in [2.4.0 > release|https://downloads.apache.org/kafka/2.4.0/RELEASE_NOTES.html]), > {{ConcurrentSkipListMap}} in {{InMemoryKeyValueStore}} was reverted into > {{TreeMap}} for performance issues. However, the {{synchronized}} keyword for > the {{reverseRange}} and {{reverseAll}} methods was omitted, leaving the possibility of > a race condition. -- This message was sent by Atlassian Jira (v8.3.4#803005)
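The omitted-`synchronized` fix described above amounts to the following. This is a minimal stand-in for {{InMemoryKeyValueStore}} over a plain {{TreeMap}}, not the real class:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Sketch of the fix: since TreeMap (unlike the reverted ConcurrentSkipListMap)
// is not thread-safe, every method that reads or writes it must be synchronized,
// including the reverse-range reads that the original patch missed.
public class InMemoryStoreSketch {
    private final TreeMap<String, String> map = new TreeMap<>();

    public synchronized void put(String key, String value) {
        map.put(key, value);
    }

    // The omitted `synchronized` on reverseRange/reverseAll was the bug.
    public synchronized List<String> reverseRange(String from, String to) {
        // Copy under the lock so callers can iterate without holding it.
        return new ArrayList<>(map.subMap(from, true, to, true).descendingMap().values());
    }
}
```

Copying the range out under the lock also avoids handing callers a live view that a concurrent put could invalidate mid-iteration.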
[jira] [Created] (KAFKA-10830) Kafka Producer API should throw unwrapped exceptions
Guozhang Wang created KAFKA-10830: - Summary: Kafka Producer API should throw unwrapped exceptions Key: KAFKA-10830 URL: https://issues.apache.org/jira/browse/KAFKA-10830 Project: Kafka Issue Type: Improvement Components: producer Reporter: Guozhang Wang Today in various KafkaProducer APIs (especially send and transaction related) we wrap many of the underlying exceptions with a KafkaException. In some nested calls we may even wrap them more than once. Although the initial goal was to not expose the root cause directly to users, it also confuses advanced users' error handling, since a KafkaException-wrapped root cause may need to be handled differently. Since all of those exceptions are public classes anyway (one can still get them via exception.root()) and they all inherit KafkaException, I'd suggest we do not do any wrapping anymore and throw the exception directly. For those users who just capture all KafkaException and handle them universally it is still compatible; for those users who want to handle exceptions differently it would introduce an easier way. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10829) Kafka Streams handle produce exception improvement
Guozhang Wang created KAFKA-10829: - Summary: Kafka Streams handle produce exception improvement Key: KAFKA-10829 URL: https://issues.apache.org/jira/browse/KAFKA-10829 Project: Kafka Issue Type: Improvement Components: producer, streams Reporter: Guozhang Wang A summary of some recent discussions on how we should improve embedded producer exception handling. Note that the baseline logic below would guarantee that our correctness semantics are not violated, and the optimizations on top of the baseline reduce the user's burden by letting the library auto-handle certain types of exceptions. 1) ``Producer.send()`` throws an exception directly: 1.a) The baseline (correctness-guaranteeing) logic is to always wrap it as a StreamsException, which causes the thread to shut down and the exception handler to be triggered. The handler can look into the wrapped exception and decide whether the shut-down thread can be restarted. 1.b) Optimization: look at the exception and decide whether it can be wrapped as a TaskMigratedException instead (e.g. ProducerFenced). This would then be auto-handled by lost-all-tasks and re-join. 2) The ``Producer.send()`` callback has an exception: 2.a) The baseline is to first check whether the exception is an instanceof RetriableException. If not retriable, pass it to the producer exception handler to decide whether to throw or to continue with the record dropped. If it decides to throw, always wrap it as a StreamsException and remember it locally; at the same time do not send more records from the caller. In the next send call, check the remembered exception and throw. This causes the thread to shut down and the exception handler to be triggered. If the exception is retriable, always throw it as a fatal StreamsException. 2.b) Optimization one: if the non-retriable exception can be translated into a TaskMigratedException, then do not wrap it as a StreamsException but let the library handle it internally. 
2.c) Optimization two: if the retriable exception is a timeout exception, then do not pass it to the producer exception handler and treat it as TaskMigrated. 3) ``Producer.XXXTxn`` APIs except ``AbortTxn`` throw exception directly: 3.a) The baseline logic is to capture all KafkaException except TimeoutException, and handle them as *TaskCorrupted* (which includes aborting the transaction, resetting the state, and re-joining the group). TimeoutException would be rethrown. 3.b) Optimization: some exceptions can be handled as TaskMigrated, which would be handled in a lighter way. 4) ``Producer.abortTxn`` throws an exception: 4.a) The baseline logic is to capture all KafkaException except TimeoutException as a fatal StreamsException. TimeoutException would be rethrown. 4.b) Optimization: some exceptions can be ignored (e.g. invalidTxnTransition means the abort did not succeed). -- This message was sent by Atlassian Jira (v8.3.4#803005)
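The 2.a baseline (the send callback remembers the first fatal error; the next send() call rethrows it before producing more records) can be sketched as follows. `RecordCollectorSketch` and `StreamsLikeException` are hypothetical names, not the real Streams classes:

```java
import java.util.concurrent.atomic.AtomicReference;

// Sketch of the remember-then-rethrow baseline: the producer callback runs on
// another thread, so the first fatal error is stashed in an AtomicReference and
// rethrown at the top of the next send() from the processing thread.
public class RecordCollectorSketch {
    public static class StreamsLikeException extends RuntimeException {
        public StreamsLikeException(Throwable cause) { super(cause); }
    }

    private final AtomicReference<RuntimeException> sendException = new AtomicReference<>();

    /** Invoked from the producer callback when a send failed fatally. */
    public void recordSendError(Exception fatal) {
        // keep only the first error; later ones are usually consequences of it
        sendException.compareAndSet(null, new StreamsLikeException(fatal));
    }

    /** Called at the top of every send(): rethrow a remembered error before producing more. */
    public void maybeThrow() {
        RuntimeException e = sendException.get();
        if (e != null) throw e;
    }
}
```

Throwing from the processing thread (rather than the callback) is what lets the normal thread-shutdown and exception-handler path described above take over.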
[jira] [Created] (KAFKA-10725) Merge StoreQueryIntegrationTest and QueryableStateIntegrationTest
Guozhang Wang created KAFKA-10725: - Summary: Merge StoreQueryIntegrationTest and QueryableStateIntegrationTest Key: KAFKA-10725 URL: https://issues.apache.org/jira/browse/KAFKA-10725 Project: Kafka Issue Type: Improvement Components: streams, unit tests Reporter: Guozhang Wang These two integration tests cover different issues and have each had their own flakiness in the past. I think it's better to merge them into a single class (and could some of them be reduced to unit tests?) so that, if there's still flakiness, we only need to fix it in one place. -- This message was sent by Atlassian Jira (v8.3.4#803005)