[jira] [Comment Edited] (KAFKA-10426) Deadlock in KafkaConfigBackingStore
[ https://issues.apache.org/jira/browse/KAFKA-10426?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17183758#comment-17183758 ]

Goltseva Taisiia edited comment on KAFKA-10426 at 8/25/20, 6:47 AM:
--------------------------------------------------------------------

Hi, [~ChrisEgerton], [~kkonstantine]! Could you please take a look at my PR?

was (Author: xakassi):
Hi, [~ChrisEgerton]! Could you please take a look at my PR?

> Deadlock in KafkaConfigBackingStore
> -----------------------------------
>
>                 Key: KAFKA-10426
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10426
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 2.4.1, 2.6.0
>            Reporter: Goltseva Taisiia
>            Assignee: Goltseva Taisiia
>            Priority: Critical
>              Labels: pull-request-available
>
> Hi, guys!
> We faced the following deadlock:
>
> {code:java}
> KafkaBasedLog Work Thread - _streaming_service_config
> priority:5 - threadId:0x7f18ec22c000 - nativeId:0x950 - nativeId (decimal):2384 - state:BLOCKED
> stackTrace:
> java.lang.Thread.State: BLOCKED (on object monitor)
>     at com.company.streaming.platform.kafka.DistributedHerder$ConfigUpdateListener.onSessionKeyUpdate(DistributedHerder.java:1586)
>     - waiting to lock <0xe6136808> (a com.company.streaming.platform.kafka.DistributedHerder)
>     at org.apache.kafka.connect.storage.KafkaConfigBackingStore$ConsumeCallback.onCompletion(KafkaConfigBackingStore.java:707)
>     - locked <0xd8c3be40> (a java.lang.Object)
>     at org.apache.kafka.connect.storage.KafkaConfigBackingStore$ConsumeCallback.onCompletion(KafkaConfigBackingStore.java:481)
>     at org.apache.kafka.connect.util.KafkaBasedLog.poll(KafkaBasedLog.java:264)
>     at org.apache.kafka.connect.util.KafkaBasedLog.access$500(KafkaBasedLog.java:71)
>     at org.apache.kafka.connect.util.KafkaBasedLog$WorkThread.run(KafkaBasedLog.java:337)
>
> CustomDistributedHerder-connect-1
> priority:5 - threadId:0x7f1a01e30800 - nativeId:0x93a - nativeId (decimal):2362 - state:BLOCKED
> stackTrace:
> java.lang.Thread.State: BLOCKED (on object monitor)
>     at org.apache.kafka.connect.storage.KafkaConfigBackingStore.snapshot(KafkaConfigBackingStore.java:285)
>     - waiting to lock <0xd8c3be40> (a java.lang.Object)
>     at com.company.streaming.platform.kafka.DistributedHerder.updateConfigsWithIncrementalCooperative(DistributedHerder.java:514)
>     - locked <0xe6136808> (a com.company.streaming.platform.kafka.DistributedHerder)
>     at com.company.streaming.platform.kafka.DistributedHerder.tick(DistributedHerder.java:402)
>     at com.company.streaming.platform.kafka.DistributedHerder.run(DistributedHerder.java:286)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748){code}
>
> DistributedHerder entered the synchronized method updateConfigsWithIncrementalCooperative() and called configBackingStore.snapshot(), which takes a lock on an internal object in the KafkaConfigBackingStore class.
>
> Meanwhile, KafkaConfigBackingStore's ConsumeCallback, inside a synchronized block on that same internal object, received a SESSION_KEY record and called updateListener.onSessionKeyUpdate(), which takes a lock on the DistributedHerder.
>
> As I can see, the problem is here:
> [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L737]
>
> As I understand it, this call should be performed outside the synchronized block:
> {code:java}
> if (started)
>     updateListener.onSessionKeyUpdate(KafkaConfigBackingStore.this.sessionKey);{code}
>
> I'm going to make a PR.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
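The fix proposed above follows a standard pattern for breaking lock-ordering deadlocks: mutate and capture state while the monitor is held, but invoke cross-component listeners only after releasing it. The following is a minimal sketch of that pattern (hypothetical class and method names, not the actual Connect sources):

```java
// Minimal sketch of the proposed fix (hypothetical names, not the actual
// Connect classes). State is mutated and captured while the store's internal
// lock is held, but the listener callback runs only after the lock is
// released, so a listener that holds its own monitor (as DistributedHerder
// does) can never form a lock-ordering cycle with this store.
class SessionKeyStore {
    interface Listener {
        void onSessionKeyUpdate(String key);
    }

    private final Object lock = new Object(); // plays the role of the store's internal lock
    private String sessionKey;
    private boolean started = true;
    private Listener listener;

    void setListener(final Listener l) {
        this.listener = l;
    }

    // Called when a SESSION_KEY record arrives from the config topic.
    void onSessionKeyRecord(final String newKey) {
        String keyToPublish = null;
        synchronized (lock) {
            sessionKey = newKey;           // the state update stays guarded
            if (started)
                keyToPublish = sessionKey; // capture what the callback needs
        }
        if (keyToPublish != null && listener != null)
            listener.onSessionKeyUpdate(keyToPublish); // no monitor held here
    }

    // Analogue of snapshot(): safe to call from a listener that holds its own monitor.
    String snapshot() {
        synchronized (lock) {
            return sessionKey;
        }
    }
}
```

With the callback moved outside the synchronized block, the thread calling snapshot() and the thread delivering the session key can no longer block each other, regardless of the order in which the two monitors are requested.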
[jira] [Commented] (KAFKA-10426) Deadlock in KafkaConfigBackingStore
[ https://issues.apache.org/jira/browse/KAFKA-10426?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17183758#comment-17183758 ]

Goltseva Taisiia commented on KAFKA-10426:
------------------------------------------

Hi, [~ChrisEgerton]! Could you please take a look at my PR?

> Deadlock in KafkaConfigBackingStore
> -----------------------------------
>
>                 Key: KAFKA-10426
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10426
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 2.4.1, 2.6.0
>            Reporter: Goltseva Taisiia
>            Assignee: Goltseva Taisiia
>            Priority: Critical
>              Labels: pull-request-available

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[GitHub] [kafka] chia7712 commented on pull request #9182: KAFKA-10403 Replace scala collection by java collection in generating…
chia7712 commented on pull request #9182: URL: https://github.com/apache/kafka/pull/9182#issuecomment-679805021 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on pull request #9162: MINOR: refactor Log to get rid of "return" in nested anonymous function
chia7712 commented on pull request #9162: URL: https://github.com/apache/kafka/pull/9162#issuecomment-679805254 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-10429) Group Coordinator unavailability leads to missing events
[ https://issues.apache.org/jira/browse/KAFKA-10429?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Navinder Brar updated KAFKA-10429:
----------------------------------
    Summary: Group Coordinator unavailability leads to missing events  (was: Group Coordinator is unavailable leads to missing events)

> Group Coordinator unavailability leads to missing events
> --------------------------------------------------------
>
>                 Key: KAFKA-10429
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10429
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.1.1
>            Reporter: Navinder Brar
>            Priority: Major
>
> We are regularly getting this exception in the logs:
>
> [2020-08-25 03:24:59,214] INFO [Consumer clientId=appId-StreamThread-1-consumer, groupId=dashavatara] Group coordinator ip:9092 (id: 1452096777 rack: null) is *unavailable* or invalid, will attempt rediscovery (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
>
> And after some time it becomes discoverable again:
>
> [2020-08-25 03:25:02,218] INFO [Consumer clientId=appId-c3d1d186-e487-4993-ae3d-5fed75887e6b-StreamThread-1-consumer, groupId=appId] Discovered group coordinator ip:9092 (id: 1452096777 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
>
> The doubt I have is why this unavailability doesn't trigger a rebalance in the cluster. We have a few hours of retention on the source Kafka topics, and sometimes this unavailability lasts for more than a few hours. Since it doesn't trigger a rebalance or stop processing on the other nodes (which are connected to the group coordinator), we never learn that an issue has occurred, and by then we have lost events from our source topics.
>
> There are some resolutions mentioned on Stack Overflow, but those configs are already set in our Kafka:
>
> default.replication.factor=3
> offsets.topic.replication.factor=3
>
> It would be great to understand why this issue is happening, why it doesn't trigger a rebalance, and whether there is any known solution for it.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[GitHub] [kafka] ryannedolan commented on a change in pull request #9215: KAFKA-10133: MM2 readme update on config
ryannedolan commented on a change in pull request #9215:
URL: https://github.com/apache/kafka/pull/9215#discussion_r476170655

##########
File path: connect/mirror/README.md
##########

@@ -141,7 +141,38 @@ nearby clusters.

 N.B. that the `--clusters` parameter is not technically required here. MM2 will work fine without it; however, throughput may suffer from "producer lag" between data centers, and you may incur unnecessary data transfer costs.

-## Shared configuration
+## Configuration
+### General Kafka Connect Config
+All Kafka Connect, Source Connector, Sink Connector configs, as defined in [Kafka official doc] (https://kafka.apache.org/documentation/#connectconfigs), can be
+directly used in MM2 configuration without prefix in the configuration name. As the starting point, most of these configs may work well with the exception of `tasks.max`.

Review comment:
   I think you are referring to the default values of the Connect configuration properties -- maybe use "default" in this sentence to make that more clear.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
[jira] [Created] (KAFKA-10429) Group Coordinator is unavailable leads to missing events
Navinder Brar created KAFKA-10429:
-------------------------------------

             Summary: Group Coordinator is unavailable leads to missing events
                 Key: KAFKA-10429
                 URL: https://issues.apache.org/jira/browse/KAFKA-10429
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 1.1.1
            Reporter: Navinder Brar

We are regularly getting this exception in the logs:

[2020-08-25 03:24:59,214] INFO [Consumer clientId=appId-StreamThread-1-consumer, groupId=dashavatara] Group coordinator ip:9092 (id: 1452096777 rack: null) is *unavailable* or invalid, will attempt rediscovery (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)

And after some time it becomes discoverable again:

[2020-08-25 03:25:02,218] INFO [Consumer clientId=appId-c3d1d186-e487-4993-ae3d-5fed75887e6b-StreamThread-1-consumer, groupId=appId] Discovered group coordinator ip:9092 (id: 1452096777 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)

The doubt I have is why this unavailability doesn't trigger a rebalance in the cluster. We have a few hours of retention on the source Kafka topics, and sometimes this unavailability lasts for more than a few hours. Since it doesn't trigger a rebalance or stop processing on the other nodes (which are connected to the group coordinator), we never learn that an issue has occurred, and by then we have lost events from our source topics.

There are some resolutions mentioned on Stack Overflow, but those configs are already set in our Kafka:

default.replication.factor=3
offsets.topic.replication.factor=3

It would be great to understand why this issue is happening, why it doesn't trigger a rebalance, and whether there is any known solution for it.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[GitHub] [kafka] LMnet commented on pull request #8955: KAFKA-10020: Create a new version of a scala Serdes without name clash (KIP-616)
LMnet commented on pull request #8955: URL: https://github.com/apache/kafka/pull/8955#issuecomment-679502482 @vvcephei I completely forgot about tests, thanks for reminding me! I changed all usages of the old `org.apache.kafka.streams.scala.Serdes` to the new `org.apache.kafka.streams.scala.serialization.Serdes` in tests. I think it should be enough in terms of test coverage. Also, I fixed `KGroupedStream`, `SessionWindowedKStream`, `TimeWindowedKStream`, and docs: they used the old `Serdes` object. I was not sure about commits policy in Kafka so I added all these changes in the separate commit. I can squash it later if needed. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-10424) MirrorMaker 2.0 does not replicates topic's "clean.policy"
[ https://issues.apache.org/jira/browse/KAFKA-10424?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Zhang reassigned KAFKA-10424: -- Assignee: Ning Zhang > MirrorMaker 2.0 does not replicates topic's "clean.policy" > -- > > Key: KAFKA-10424 > URL: https://issues.apache.org/jira/browse/KAFKA-10424 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 2.5.0 >Reporter: Mikhail Grinfeld >Assignee: Ning Zhang >Priority: Major > > I needed to replicate schema-registry "_schemas" topic. > data was replicated successfully and everything looked good, but new > schema-registry started with warning that replicated topic's cleanup.policy > is not "compact" -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] LMnet commented on a change in pull request #8955: KAFKA-10020: Create a new version of a scala Serdes without name clash (KIP-616)
LMnet commented on a change in pull request #8955: URL: https://github.com/apache/kafka/pull/8955#discussion_r476102121 ## File path: streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/Serdes.scala ## @@ -24,6 +24,7 @@ import java.util import org.apache.kafka.common.serialization.{Deserializer, Serde, Serdes => JSerdes, Serializer} import org.apache.kafka.streams.kstream.WindowedSerdes +@deprecated("Use org.apache.kafka.streams.scala.serialization.Serdes") Review comment: Done This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] huxihx opened a new pull request #9218: MINOR: Fix shouldNotResetEpochHistoryHeadIfUndefinedPassed
huxihx opened a new pull request #9218:
URL: https://github.com/apache/kafka/pull/9218

   In LeaderEpochFileCacheTest.scala, the code is identical for `shouldNotResetEpochHistoryHeadIfUndefinedPassed` and `shouldNotResetEpochHistoryTailIfUndefinedPassed`. It seems `truncateFromStart` should be invoked in `shouldNotResetEpochHistoryHeadIfUndefinedPassed` instead of `truncateFromEnd`.

   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
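The head/tail distinction the PR relies on can be shown with a toy model (entirely hypothetical and much simpler than Kafka's real LeaderEpochFileCache): truncateFromStart trims the oldest entries, truncateFromEnd trims the newest, and both must be no-ops when an undefined offset is passed, which is what the two tests should each verify through a different method.

```java
import java.util.ArrayList;
import java.util.List;

// Toy epoch cache (hypothetical; the real LeaderEpochFileCache is more
// involved). Each entry maps a leader epoch to the first offset of that epoch.
class EpochCache {
    static final long UNDEFINED_OFFSET = -1L;

    static class Entry {
        final int epoch;
        final long startOffset;

        Entry(final int epoch, final long startOffset) {
            this.epoch = epoch;
            this.startOffset = startOffset;
        }
    }

    final List<Entry> entries = new ArrayList<>();

    void assign(final int epoch, final long startOffset) {
        entries.add(new Entry(epoch, startOffset));
    }

    // Trim the history *head*: drop entries that start before 'offset'.
    void truncateFromStart(final long offset) {
        if (offset == UNDEFINED_OFFSET)
            return; // an undefined offset must never reset the history
        entries.removeIf(e -> e.startOffset < offset);
    }

    // Trim the history *tail*: drop entries that start at or after 'offset'.
    void truncateFromEnd(final long offset) {
        if (offset == UNDEFINED_OFFSET)
            return; // an undefined offset must never reset the history
        entries.removeIf(e -> e.startOffset >= offset);
    }
}
```

Because both methods short-circuit identically on the undefined offset, a "head" test that calls truncateFromEnd merely duplicates the "tail" test instead of exercising the other truncation direction, which is the test bug the PR corrects.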
[GitHub] [kafka] guozhangwang commented on pull request #9038: KAFKA-10134: Check heartbeat timeout for poll fetches [DO NOT MERGE]
guozhangwang commented on pull request #9038: URL: https://github.com/apache/kafka/pull/9038#issuecomment-679463606 Closing as it will be subsumed by https://github.com/apache/kafka/pull/8834 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang closed pull request #9038: KAFKA-10134: Check heartbeat timeout for poll fetches [DO NOT MERGE]
guozhangwang closed pull request #9038: URL: https://github.com/apache/kafka/pull/9038 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #9217: MINOR: fix JavaDoc
mjsax commented on a change in pull request #9217: URL: https://github.com/apache/kafka/pull/9217#discussion_r476026230 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java ## @@ -106,7 +105,7 @@ void register(final StateStore store, /** * Schedules a periodic operation for processors. A processor may call this method during - * {@link Processor#init(org.apache.kafka.streams.processor.ProcessorContext) initialization} or + * {@link Processor#init(ProcessorContext) initialization} or Review comment: Removing the package name to point to `org.apache.kafka.streams.processor.api.ProcessorContext` (note the `api` sub-package in the path) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #9217: MINOR: fix JavaDoc
mjsax commented on a change in pull request #9217: URL: https://github.com/apache/kafka/pull/9217#discussion_r476025687 ## File path: streams/src/main/java/org/apache/kafka/streams/errors/StreamsNotStartedException.java ## @@ -17,11 +17,12 @@ package org.apache.kafka.streams.errors; import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KafkaStreams.State; /** - * Indicates that Kafka Streams is in state {@link KafkaStreams.State#CREATED CREATED} and thus state stores cannot be queries yet. + * Indicates that Kafka Streams is in state {@link State CREATED} and thus state stores cannot be queries yet. Review comment: We can only link to the enum, but not its values. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax opened a new pull request #9217: MINOR: fix JavaDoc
mjsax opened a new pull request #9217: URL: https://github.com/apache/kafka/pull/9217 Call for review @vvcephei This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] huxihx commented on pull request #8984: KAFKA-10227: Enforce cleanup policy to only contain compact or delete once
huxihx commented on pull request #8984:
URL: https://github.com/apache/kafka/pull/8984#issuecomment-679438909

   @abbccdda Hi, do you have any time to review this patch? It's been quite a long time and no one has had a look at it :(

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
[jira] [Commented] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
[ https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17183654#comment-17183654 ]

Guozhang Wang commented on KAFKA-10134:
---------------------------------------

[~zhowei] Thanks for the new log files; they have been very helpful for nailing down the root cause, and I will refine an existing WIP PR https://github.com/apache/kafka/pull/8834 as a final fix for this. Please stay tuned.

> High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
> ------------------------------------------------------------------------
>
>                 Key: KAFKA-10134
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10134
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 2.5.0
>            Reporter: Sean Guo
>            Assignee: Guozhang Wang
>            Priority: Blocker
>             Fix For: 2.5.2, 2.6.1
>
>         Attachments: consumer3.log.2020-08-20.log, consumer5.log.2020-07-22.log
>
> We want to utilize the new rebalance protocol to mitigate the stop-the-world effect during the rebalance, as our tasks are long-running.
> But after the upgrade, when we kill an instance to trigger a rebalance under load (some tasks run longer than 30s), the CPU goes sky-high. It reads ~700% in our metrics, so several threads must be in a tight loop. We have several consumer threads consuming from different partitions during the rebalance. This is reproducible with both the new CooperativeStickyAssignor and the old eager rebalance protocol. The difference is that with the old eager rebalance protocol, the high CPU usage drops after the rebalance is done. But with the cooperative one, the consumer threads seem stuck on something and cannot finish the rebalance, so the high CPU usage does not drop until we stop our load. A small load without long-running tasks also does not cause continuous high CPU usage, as the rebalance can finish in that case.
>
> "executor.kafka-consumer-executor-4" #124 daemon prio=5 os_prio=0 cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 runnable [0x7fe119aab000]
>    java.lang.Thread.State: RUNNABLE
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:467)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
>
> By debugging into the code, we found that the clients appear to be in a loop trying to find the coordinator.
> I also tried the old rebalance protocol on the new version; the issue still exists, but the CPU goes back to normal when the rebalance is done.
> I also tried the same on 2.4.1, which does not seem to have this issue. So it seems related to something that changed between 2.4.1 and 2.5.0.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[GitHub] [kafka] guozhangwang merged pull request #9094: KAFKA-10054: KIP-613, add TRACE-level e2e latency metrics
guozhangwang merged pull request #9094: URL: https://github.com/apache/kafka/pull/9094 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #9138: KAFKA-9929: Support backward iterator on WindowStore
ableegoldman commented on a change in pull request #9138:
URL: https://github.com/apache/kafka/pull/9138#discussion_r476003479

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
##########

@@ -163,7 +164,17 @@ public void put(final Bytes key, final byte[] value, final long windowStartTimes
     @Deprecated
     @Override
     public WindowStoreIterator<byte[]> fetch(final Bytes key, final long timeFrom, final long timeTo) {
+        return fetch(key, timeFrom, timeTo, true);
+    }
+
+    @Override
+    public WindowStoreIterator<byte[]> backwardFetch(final Bytes key, final Instant from, final Instant to) {
+        final long timeFrom = ApiUtils.validateMillisecondInstant(from, prepareMillisCheckFailMsgPrefix(from, "from"));
+        final long timeTo = ApiUtils.validateMillisecondInstant(to, prepareMillisCheckFailMsgPrefix(to, "to"));

Review comment:
   Ok, feel free to just create a followup ticket to see if we can clean things up. No need to block this PR on it.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
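For context, the quoted backwardFetch overload follows a common pattern for the Instant-based Streams APIs: validate the Instant arguments, convert them to epoch milliseconds, and delegate to the long-based method. A simplified stand-in for that validate-and-convert step (hypothetical helper, not Kafka's actual ApiUtils):

```java
import java.time.Instant;

// Hypothetical stand-in for the validate-and-convert step used by the
// Instant-based overloads discussed above: reject null, then convert to epoch
// milliseconds so the long-based implementation can be reused unchanged.
class MillisConversion {
    static long validateMillis(final Instant instant, final String name) {
        if (instant == null)
            throw new NullPointerException(name + " must not be null");
        return instant.toEpochMilli(); // may throw ArithmeticException on overflow
    }
}
```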
[GitHub] [kafka] ableegoldman commented on a change in pull request #9138: KAFKA-9929: Support backward iterator on WindowStore
ableegoldman commented on a change in pull request #9138:
URL: https://github.com/apache/kafka/pull/9138#discussion_r476001166

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
##########

@@ -119,15 +118,16 @@
      *
      * This iterator must be closed after use.
      *
-     * @param from the first key in the range
-     * @param tothe last key in the range
-     * @param timeFrom time range start (inclusive)
-     * @param timeTotime range end (inclusive)
+     * @param from     the first key in the range
+     * @param to       the last key in the range
+     * @param timeFrom time range start (inclusive)
+     * @param timeTo   time range end (inclusive)
      * @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>}
      * @throws InvalidStateStoreException if the store is not initialized
-     * @throws NullPointerException if one of the given keys is {@code null}
+     * @throws NullPointerException       if one of the given keys is {@code null}
      */
-    @SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed
+    // note, this method must be kept if super#fetch(...) is removed
+    @SuppressWarnings("deprecation")
     KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo);

Review comment:
   I guess the `note, this method must be kept if super#fetch(...) is removed` comments are making me nervous, but they could be out of date. Anyways I don't think you need to clean all this up right now, just wondering what's going on here.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #9138: KAFKA-9929: Support backward iterator on WindowStore
ableegoldman commented on a change in pull request #9138: URL: https://github.com/apache/kafka/pull/9138#discussion_r475999351 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java ## @@ -419,13 +504,13 @@ Long minTime() { } Review comment: I agree that it doesn't matter much, but I think it has to be "reverse" for both time and keys for correctness due to CachingWindowStore. The caching layer just puts everything into one byte buffer so when we go in reverse order it's just the opposite of forward, which means both key and time ordering is flipped. And we unfortunately need the ordering to match up between the cache and the underlying store due to that merging iterator guy This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
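The cache-ordering point above can be demonstrated with a small sketch (illustrative only; the real cache keys are composite byte arrays, not strings). When entries live in one ordered structure keyed by a composite "key|time", iterating the range backward necessarily reverses key order and time order together, so the underlying store must produce the same fully reversed order for the merging iterator to line up:

```java
import java.util.NavigableMap;
import java.util.TreeMap;

// Illustrative sketch (not the actual CachingWindowStore code): composite
// "key|time" entries kept in a single ordered map, iterated forward or backward.
class ReverseRangeDemo {
    static String order(final boolean forward) {
        final NavigableMap<String, Integer> cache = new TreeMap<>();
        cache.put("a|t1", 1); // key "a", earlier window
        cache.put("a|t2", 2); // key "a", later window
        cache.put("b|t1", 3); // key "b"
        // Reversing the view flips both the key order and the time order at once.
        final NavigableMap<String, Integer> view = forward ? cache : cache.descendingMap();
        return String.join(",", view.keySet());
    }
}
```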
[GitHub] [kafka] jeqo commented on a change in pull request #9138: KAFKA-9929: Support backward iterator on WindowStore
jeqo commented on a change in pull request #9138: URL: https://github.com/apache/kafka/pull/9138#discussion_r475910689 ## File path: streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java ## @@ -119,15 +118,16 @@ * * This iterator must be closed after use. * - * @param from the first key in the range - * @param tothe last key in the range - * @param timeFrom time range start (inclusive) - * @param timeTotime range end (inclusive) + * @param from the first key in the range + * @param to the last key in the range + * @param timeFrom time range start (inclusive) + * @param timeTo time range end (inclusive) * @return an iterator over windowed key-value pairs {@code , value>} * @throws InvalidStateStoreException if the store is not initialized - * @throws NullPointerException if one of the given keys is {@code null} + * @throws NullPointerException if one of the given keys is {@code null} */ -@SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed +// note, this method must be kept if super#fetch(...) is removed +@SuppressWarnings("deprecation") KeyValueIterator, V> fetch(K from, K to, long timeFrom, long timeTo); Review comment: These methods were introduced when adding Duration/Instant support https://github.com/apache/kafka/pull/5682. I don't think these are needed, we can do a similar change as for SessionStore read operations. wdyt? ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java ## @@ -419,13 +504,13 @@ Long minTime() { } Review comment: For windowStore, only time-based index is been iterated backward. The KIP didn't considered reversing key/value stores internally. We would need another flag (apart from backward) to define order of internal keys, which its cumbersome, and the order between keys doesn't matter much or can be calculated by the user. 
## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java

@@ -426,7 +558,12 @@ private void getNextSegmentIterator() { setCacheKeyRange(currentSegmentBeginTime(), currentSegmentLastTime());

Review comment: Will have to double-check this. I have inverted the current/last segment for the backwards use case, though.

## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java

@@ -72,22 +86,40 @@ searchSpace.iterator(),

Review comment: `searchSpace` will be reversed based on the `forward` flag, in `AbstractSegments`.

## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java

@@ -163,7 +164,17 @@ public void put(final Bytes key, final byte[] value, final long windowStartTimes

@Deprecated
@Override
public WindowStoreIterator<byte[]> fetch(final Bytes key, final long timeFrom, final long timeTo) {
    return fetch(key, timeFrom, timeTo, true);
}

@Override
public WindowStoreIterator<byte[]> backwardFetch(final Bytes key, final Instant from, final Instant to) {
    final long timeFrom = ApiUtils.validateMillisecondInstant(from, prepareMillisCheckFailMsgPrefix(from, "from"));
    final long timeTo = ApiUtils.validateMillisecondInstant(to, prepareMillisCheckFailMsgPrefix(to, "to"));

Review comment: Only backward compatibility. If it makes sense to remove these deprecations as part of this KIP, I'd be happy to help clean them up.
## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java

@@ -337,25 +462,32 @@ public synchronized void close() {

private CacheIteratorWrapper(final Bytes key,
                             final long timeFrom,
-                            final long timeTo) {
-    this(key, key, timeFrom, timeTo);
+                            final long timeTo,
+                            final boolean forward) {
+    this(key, key, timeFrom, timeTo, forward);
}

private CacheIteratorWrapper(final Bytes keyFrom,
                             final Bytes keyTo,
                             final long timeFrom,
-                            final long timeTo) {
+                            final long timeTo,
+                            final boolean forward) {
    this.keyFrom = keyFrom;
    this.keyTo = keyTo;
    this.timeTo = timeTo;
    this.lastSegmentId = cacheFunction.segmentId(Math.min(timeTo, maxObservedTimestamp.get()));
+   this.forward = forward;
    this.segmentInterval = cacheFunction.getSegmentInterv
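The `forward`-flag design discussed in this review can be sketched in miniature. The sketch below is illustrative Python, not Kafka's implementation; `TimeIndexedStore` and its method names are hypothetical, merely mirroring the PR's `fetch`/`backwardFetch` vocabulary and the idea of reversing the search space based on the flag.

```python
# Hypothetical in-memory store with a time-based index. Only the time index
# is reversed for backward iteration, matching the comment above that the
# KIP does not reverse key order inside the stores.

class TimeIndexedStore:
    def __init__(self):
        self._by_time = {}  # window start time -> value

    def put(self, window_start, value):
        self._by_time[window_start] = value

    def fetch(self, time_from, time_to, forward=True):
        # Build the search space over the time index, then reverse it when
        # forward=False -- the same idea as reversing `searchSpace` on the flag.
        times = sorted(t for t in self._by_time if time_from <= t <= time_to)
        if not forward:
            times.reverse()
        return [(t, self._by_time[t]) for t in times]

    def backward_fetch(self, time_from, time_to):
        return self.fetch(time_from, time_to, forward=False)
```

A single `forward` flag keeps both directions on one code path, which is why the review only needs to discuss where the reversal happens (`AbstractSegments`) rather than a second iterator implementation.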
[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records
ableegoldman commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r475985073

## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java

@@ -328,6 +328,68 @@ public void testAggregateLargeInput() { }
+@Test
+public void testEarlyRecords() {

Review comment: Can we add maybe one or two more tests? I think at the least we should have one test that processes _only_ early records, and one test that covers inputs with the same timestamp.

## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java

@@ -148,7 +153,7 @@ public void processInOrder(final K key, final V value, final long timestamp) {
 boolean rightWinAlreadyCreated = false;
 // keep the left type window closest to the record
-Window latestLeftTypeWindow = null;
+KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> latestLeftTypeWindow = null;
 try (
     final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(

Review comment: We need to make sure the `fetch` bounds don't go into the negative. We only call `processEarly` if the record's timestamp is within the timeDifferenceMs, but here we search starting at timestamp - 2*timeDifferenceMs.

## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java

@@ -210,6 +216,66 @@ public void processInOrder(final K key, final V value, final long timestamp) { } }
+/**
+ * Created to handle records that have a timestamp > 0 but < timeDifference. These records would create
+ * windows with negative start times, which is not supported. Instead, they will fall within the [0, timeDifference]
+ * window, and we will update their right windows as new records come in later
+ */
+private void processEarly(final K key, final V value, final long timestamp, final long closeTime) {
+    ValueAndTimestamp<Agg> rightWinAgg = null;
+    KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null;
+    boolean rightWinAlreadyCreated = false;
+    final HashSet<Long> windowStartTimes = new HashSet<>();
+
+    try (
+        final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(
+            key,
+            key,
+            timestamp - 2 * windows.timeDifferenceMs(),
+            // to catch the current record's right window, if it exists, without more calls to the store
+            timestamp + 1)
+    ) {
+        KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+        while (iterator.hasNext()) {
+            next = iterator.next();
+            windowStartTimes.add(next.key.window().start());
+            final long startTime = next.key.window().start();
+            final long endTime = startTime + windows.timeDifferenceMs();
+
+            if (endTime == windows.timeDifferenceMs()) {
+                combinedWindow = next;
+            } else if (endTime > timestamp && startTime <= timestamp) {
+                rightWinAgg = next.value;
+                putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
+            } else {

Review comment: It took me a second to get this -- can we explicitly check `if startTime == timestamp + 1` instead of falling back to `else` and implicitly relying on the fetch bounds? You can just get rid of the `else` altogether or throw an IllegalStateException if none of the specific conditions are met and the else is reached, whatever makes sense to you.

## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java

@@ -210,6 +216,66 @@ public void processInOrder(final K key, final V value, final long timestamp) { } }
+/**
+ * Created to handle records that have a timestamp > 0 but < timeDifference. These records would create
+ * windows with negative start times, which is not supported. Instead, they will fall within the [0, timeDifference]
+ * window, and we will update their right windows as new records come in later
+ */
+private void processEarly(final K key, final V value, final long timestamp, final long closeTime) {
+    ValueAndTimestamp<Agg> rightWinAgg = null;
+    KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null;
+    boolean rightWinAlreadyCreated = false;
+    final HashSet<Long> windowStartTimes = new HashSet<>();
+
+    try (
+        final KeyValueIterator<Windowed<K>, Va
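The two review points above (clamping the fetch lower bound at zero, and classifying each fetched window with explicit conditions instead of a bare `else`) can be sketched as follows. This is illustrative Python; `early_fetch_bounds` and `classify_window` are hypothetical names, not Kafka code.

```python
# 1) For an "early" record (0 <= timestamp < time_difference), the store
#    fetch should start at max(0, timestamp - 2 * time_difference) so the
#    bound never goes negative.
def early_fetch_bounds(timestamp, time_difference):
    lower = max(0, timestamp - 2 * time_difference)  # never negative
    upper = timestamp + 1  # also catches the record's right window
    return lower, upper

# 2) Classify one fetched window explicitly, mirroring the if/else chain in
#    processEarly but making the "right window" branch an explicit check.
def classify_window(start_time, timestamp, time_difference):
    end_time = start_time + time_difference
    if end_time == time_difference:                   # start_time == 0: combined window
        return "combined"
    if end_time > timestamp and start_time <= timestamp:
        return "update"                               # window containing the record
    if start_time == timestamp + 1:                   # the explicit check suggested above
        return "right"
    raise ValueError("unexpected window fetched for an early record")
```

Raising on an unexpected window corresponds to the reviewer's suggestion of an IllegalStateException rather than silently relying on the fetch bounds.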
[GitHub] [kafka] jthompson6 opened a new pull request #9216: KAFKA-10428: Fix schema for header conversion in MirrorSourceTask.
jthompson6 opened a new pull request #9216: URL: https://github.com/apache/kafka/pull/9216 The addBytes method adds the header using Schema.BYTES, which results in base64 encoding when the record is stored. SimpleHeaderConverter#toConnectHeader implements schema inference which we can use here, as the ConsumerRecord does not have header schema information. Testing: I added schema verification to the existing MirrorSourceTaskTest#testSerde, with a string and int example. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ning2008wisc commented on pull request #9215: KAFKA-10133: MM2 readme update on config
ning2008wisc commented on pull request #9215: URL: https://github.com/apache/kafka/pull/9215#issuecomment-679420821 Hi @ryannedolan @mimaison, just in case I could borrow a few minutes from you for a quick review. Thanks
[GitHub] [kafka] guozhangwang commented on pull request #8834: MINOR: Do not disable heartbeat during Rebalance
guozhangwang commented on pull request #8834: URL: https://github.com/apache/kafka/pull/8834#issuecomment-679419336 @ableegoldman I'm re-targeting this against https://issues.apache.org/jira/browse/KAFKA-10134 now, as I found the root cause of that ticket is actually that we do not send heartbeats (and hence do not reset the heartbeat timer) during a rebalance. Hopefully I will soon have this PR in a final, reviewable state for merge.
[jira] [Created] (KAFKA-10428) Mirror Maker connect applies base64 encoding to string headers
Jennifer Thompson created KAFKA-10428: - Summary: Mirror Maker connect applies base64 encoding to string headers Key: KAFKA-10428 URL: https://issues.apache.org/jira/browse/KAFKA-10428 Project: Kafka Issue Type: Bug Components: mirrormaker Affects Versions: 2.6.0, 2.5.0, 2.4.0 Reporter: Jennifer Thompson MirrorSourceTask takes the header value as bytes from the ConsumerRecord, which does not have a header schema, and adds it to the SourceRecord headers using "addBytes". This uses Schema.BYTES as the schema for the header, and somehow, base64 encoding gets applied when the record gets committed. This means that my original header value "with_headers" (created with a python producer, and stored as a 12 character byte array) becomes the string value "d2l0aF9oZWFkZXJz", a 16 character byte array, which is the base64 encoded version of the original. If I try to preempt this using "d2l0aF9oZWFkZXJz" to start with, and base64 encoding the headers everywhere, it just gets double encoded to "ZDJsMGFGOW9aV0ZrWlhKeg==" after passing through the MirrorSourceTask. I think the base64 encoding may be coming from Values#append (https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java#L674), but I'm not sure how. That is invoked by SimpleConnectorHeader#fromConnectHeader via Values#convertToString. SimpleHeaderConverter#toConnectHeader produces the correct schema in this case, and solves the problem for me, but it seems to guess at the schema, so I'm not sure if it is the right solution. Since schemas seem to be required for SourceRecord headers, but not available from ConsumerRecord headers, I'm not sure what other option we have. I will open a PR with this solution -- This message was sent by Atlassian Jira (v8.3.4#803005)
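The round trip described in the report is easy to reproduce outside Kafka. A minimal sketch (plain Python standing in for the byte-level behavior; the Connect classes themselves are not involved here):

```python
import base64

# The original header value from the report (a 12-byte string).
original = b"with_headers"

# Per the report, a BYTES-schema header ends up base64-encoded when the
# record is committed:
once = base64.b64encode(original)
assert once == b"d2l0aF9oZWFkZXJz"  # the observed 16-character value

# Pre-encoding the value does not help -- it is simply encoded again:
twice = base64.b64encode(once)
assert twice == b"ZDJsMGFGOW9aV0ZrWlhKeg=="  # the observed double-encoded value
```

This is why fixing the header schema on conversion, rather than pre-encoding the value, is the direction the PR takes.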
[GitHub] [kafka] guozhangwang commented on pull request #9094: KAFKA-10054: KIP-613, add TRACE-level e2e latency metrics
guozhangwang commented on pull request #9094: URL: https://github.com/apache/kafka/pull/9094#issuecomment-679408267 LGTM.
[jira] [Commented] (KAFKA-10357) Handle accidental deletion of repartition-topics as exceptional failure
[ https://issues.apache.org/jira/browse/KAFKA-10357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17183630#comment-17183630 ] Sophie Blee-Goldman commented on KAFKA-10357: - Cool. I agree, KAFKA-3370 would (and still will) be nice to have, but we can make some quick & easy progress on the data-loss problem with the initialize+config proposal. And since this approach means detecting the repartition deletion during a rebalance, we can just leverage the existing assignment error code to handle 3) in a similar fashion to KAFKA-10355 / KIP-662 > Handle accidental deletion of repartition-topics as exceptional failure > --- > > Key: KAFKA-10357 > URL: https://issues.apache.org/jira/browse/KAFKA-10357 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Assignee: Bruno Cadonna >Priority: Major > > Repartition topics are both written by Stream's producer and read by Stream's > consumer, so when they are accidentally deleted both clients may be notified. > But in practice the consumer would react to it much quicker than producer > since the latter has a delivery timeout expiration period (see > https://issues.apache.org/jira/browse/KAFKA-10356). When consumer reacts to > it, it will re-join the group since metadata changed and during the triggered > rebalance it would auto-recreate the topic silently and continue, causing > data lost silently. > One idea, is to only create all repartition topics *once* in the first > rebalance and not auto-create them any more in future rebalances, instead it > would be treated similar as INCOMPLETE_SOURCE_TOPIC_METADATA error code > (https://issues.apache.org/jira/browse/KAFKA-10355). 
> The challenge part would be, how to determine if it is the first-ever > rebalance, and there are several wild ideas I'd like to throw out here: > 1) change the thread state transition diagram so that STARTING state would > not transit to PARTITION_REVOKED but only to PARTITION_ASSIGNED, then in the > assign function we can check if the state is still in CREATED and not RUNNING. > 2) augment the subscriptionInfo to encode whether or not this is the first > time ever rebalance.
[jira] [Commented] (KAFKA-10357) Handle accidental deletion of repartition-topics as exceptional failure
[ https://issues.apache.org/jira/browse/KAFKA-10357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17183626#comment-17183626 ] Guozhang Wang commented on KAFKA-10357: --- Theoretically, I think there's no perfect solution to 2) above today, since in the extreme case one can a) delete the topic, b) re-create the topic, and c) re-fill the topic so it has the same offsets as before, all in between two consecutive consumer fetches, in which case the consumer would never detect that an issue happened. On the other hand, I also agree with [~ableegoldman] that this is not the primary scenario we want to guard against anyway, and if people really go that wild, it is outside Kafka's processing guarantees today. Our focus should be just 1): avoiding us (Streams) re-creating the topics. Given that, I think at the moment #initialize plus an internal config to disable auto-internal-topic-creation (by default we would still enable it for compatibility) would be the easiest way to tackle 1). It pushes the responsibility to users, who need to: * Ideally, pick only one instance of their streams app to call initialize when starting their app for the first time --- note, if a query is "reset" then restarting is the same as starting for the first time. * Set the internal config to disable auto-internal-topic-creation. KAFKA-3370 can be helpful for both 1) and 2), but again it is not "perfect", so if we would eventually have to push it to users anyway, we'd better do it sooner rather than later.
[GitHub] [kafka] ableegoldman commented on pull request #9094: KAFKA-10054: KIP-613, add TRACE-level e2e latency metrics
ableegoldman commented on pull request #9094: URL: https://github.com/apache/kafka/pull/9094#issuecomment-679399444 @guozhangwang can we finally merge this? 🙂
[GitHub] [kafka] badaiaqrandista commented on pull request #9099: KAFKA-6733: Printing additional ConsumerRecord fields in DefaultMessageFormatter
badaiaqrandista commented on pull request #9099: URL: https://github.com/apache/kafka/pull/9099#issuecomment-679398619 @bbejeck It failed because it prints the partition number as a single integer after the value. I moved the partition to be before the key (if printed) and the value, and also added the prefix "Partition:" to differentiate it from "Offset:". It's only printed if "print.partition=true". I will keep the tests as-is then: ConsoleConsumerTest is a unit test for the class, while DefaultMessageFormatterTest is more of an integration test.
[GitHub] [kafka] ableegoldman commented on pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
ableegoldman commented on pull request #9039: URL: https://github.com/apache/kafka/pull/9039#issuecomment-679394512 test this please
[GitHub] [kafka] ableegoldman commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
ableegoldman commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r475919413 ## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java ## @@ -0,0 +1,690 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestOutputTopic;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.test.TestRecord;
+import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockInitializer;
+import org.apache.kafka.test.MockProcessor;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockReducer;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.hamcrest.Matcher;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+
+import static java.time.Duration.ofMillis;
+import static java.util.Arrays.asList;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.CoreMatchers.hasItems;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class KStreamSlidingWindowAggregateTest {
+    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
+    private final String threadId = Thread.currentThread().getName();
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testAggregateSmallInput() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic = "topic";
+
+        final KTable<Windowed<String>, String> table = builder
+            .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+            .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(10), ofMillis(50)))
+            .aggregate(
+                MockInitializer.STRING_INIT,
+                MockAggregator.TOSTRING_ADDER,
+                Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic-Canonized").withValueSerde(Serdes.String())
+            );
+        final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>();
+        table.toStream().process(supplier);
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<String, String> inputTopic =
+                driver.createInputTopic(topic, new StringSerializer(), new StringSerializer());
+            inputTopic.pipeInput("A", "1", 10L);
+            inputTopic.pipeInput("A", "2", 15L);
+            inputTopic.pipeInput("A", "3", 20L);
+            inputTopic.pipeInpu
[jira] [Updated] (KAFKA-10133) Cannot compress messages in destination cluster with MM2
[ https://issues.apache.org/jira/browse/KAFKA-10133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Zhang updated KAFKA-10133: --- Fix Version/s: 2.7.0 > Cannot compress messages in destination cluster with MM2 > > > Key: KAFKA-10133 > URL: https://issues.apache.org/jira/browse/KAFKA-10133 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 2.4.0, 2.5.0, 2.4.1 > Environment: kafka 2.5.0 deployed via the strimzi operator 0.18 >Reporter: Steve Jacobs >Assignee: Ning Zhang >Priority: Minor > Fix For: 2.7.0 > > > When configuring mirrormaker2 using kafka connect, it is not possible to > configure things such that messages are compressed in the destination > cluster. Dump Log shows that batching is occurring, but no compression. If > this is possible, then this is a documentation bug, because I can find no > documentation on how to do this. > baseOffset: 4208 lastOffset: 4492 count: 285 baseSequence: -1 lastSequence: > -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 2 isTransactional: > false isControl: false position: 239371 CreateTime: 1591745894859 size: 16362 > magic: 2 compresscodec: NONE crc: 1811507259 isvalid: true > >
[GitHub] [kafka] ning2008wisc opened a new pull request #9215: KAFKA-10133: MM2 readme update on config
ning2008wisc opened a new pull request #9215: URL: https://github.com/apache/kafka/pull/9215 Per https://issues.apache.org/jira/browse/KAFKA-10133, MM2 users are sometimes confused about specifying or overriding the default configurations at the different levels: connector, MM2, and producer/consumer. It would be helpful to clarify how to correctly set those configs in README.md, in addition to the example config file.
[GitHub] [kafka] ableegoldman commented on pull request #9174: KAFKA-10395: relax output topic check in TTD to work with dynamic routing
ableegoldman commented on pull request #9174: URL: https://github.com/apache/kafka/pull/9174#issuecomment-679373369 @mjsax Be my guest 🙂
[GitHub] [kafka] mjsax commented on pull request #9174: KAFKA-10395: relax output topic check in TTD to work with dynamic routing
mjsax commented on pull request #9174: URL: https://github.com/apache/kafka/pull/9174#issuecomment-679347362 Might it be worth cherry-picking this to `2.6`?
[GitHub] [kafka] vvcephei opened a new pull request #9214: [DO NOT MERGE] POC: unify all jvm cache pools
vvcephei opened a new pull request #9214: URL: https://github.com/apache/kafka/pull/9214 Currently, users of Suppress in strict mode must either not configure a memory bound or consider a per-operator, per-partition bound. The former would result in the application crashing ungracefully if it needs too much memory, which is sub-optimal for some deployment strategies. The latter is nice for determinism, but is difficult to configure in practice. In addition to suppress buffers, Streams has a record cache configuration. Currently, we make the assumption that all threads would probably use a uniform amount of cache space, but this assumption is clearly wrong in some cases. Finally, there are some applications that want to run multiple Streams instances in the same JVM, probably for running different Streams topologies. In aggregate, there are quite a few "pools" of heap space that users need to configure if they want to avoid an OOME, and the more threads, applications, and Suppress operators there are, the more granular these pools become. Of course, the more granular the pools are, the lower utilization of the available memory we will see. Plus, especially for Suppression, very granular pool configuration means a higher likelihood that the operator will run out of space and shut the app down. This POC demonstrates the feasibility of unifying all these pools with one logical bound on total memory usage for all caches and suppression buffers, across all operators/tasks and all threads, and even across all Streams instances in the JVM. Most of the tests pass right now, but not all of them. I also need to clean up a few more things before really starting a discussion. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
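The "one logical bound" idea from the POC description can be sketched in a few lines. This is illustrative Python under assumed names (`SharedPool`, `Cache`), not the PR's code: several caches reserve from one shared pool instead of each owning a fixed slice, and a cache evicts its own entries when the global bound is exhausted.

```python
import threading

class SharedPool:
    """One global byte budget shared by all caches in the process."""
    def __init__(self, max_bytes):
        self.max_bytes = max_bytes
        self.used = 0
        self._lock = threading.Lock()

    def try_reserve(self, nbytes):
        with self._lock:
            if self.used + nbytes > self.max_bytes:
                return False  # caller must evict before growing
            self.used += nbytes
            return True

    def release(self, nbytes):
        with self._lock:
            self.used -= nbytes

class Cache:
    """A cache that draws from the shared pool rather than a private bound."""
    def __init__(self, pool):
        self.pool = pool
        self.entries = {}  # key -> size in bytes (insertion-ordered)

    def put(self, key, nbytes):
        while not self.pool.try_reserve(nbytes):
            self.evict_one()
        self.entries[key] = nbytes

    def evict_one(self):
        if not self.entries:
            raise MemoryError("cache empty but shared pool exhausted")
        key, nbytes = next(iter(self.entries.items()))  # evict oldest entry
        del self.entries[key]
        self.pool.release(nbytes)
```

The point of the design is visible even at this scale: utilization follows actual demand across caches, at the cost of caches contending for (and evicting under pressure from) a shared budget.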
[GitHub] [kafka] hachikuji merged pull request #9112: KAFKA-10312 Fix error code returned by getPartitionMetadata
hachikuji merged pull request #9112: URL: https://github.com/apache/kafka/pull/9112
[GitHub] [kafka] ableegoldman commented on a change in pull request #9186: KAFKA-10277: Allow null keys with non-null mappedKey in KStreamKGlobalTable join
ableegoldman commented on a change in pull request #9186: URL: https://github.com/apache/kafka/pull/9186#discussion_r475800482

## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java

@@ -58,29 +60,46 @@ public void init(final ProcessorContext context) {

@Override
public void process(final K1 key, final V1 value) {
-    // we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
-    // If {@code keyMapper} returns {@code null} it implies there is no match,
-    // so ignore unless it is a left join
+    // we do join iff the joining keys are equal, thus, if the mappedKey is null we cannot join
+    // and just ignore the record.
    //
    // we also ignore the record if value is null, because in a key-value data model a null-value indicates
    // an empty message (ie, there is nothing to be joined) -- this contrast SQL NULL semantics
    // furthermore, on left/outer joins 'null' in ValueJoiner#apply() indicates a missing record --
    // thus, to be consistent and to avoid ambiguous null semantics, null values are ignored
-    if (key == null || value == null) {
+    final Optional<K2> maybeMappedKey = maybeExtractMappedKey(key, value);
+    if (!maybeMappedKey.isPresent()) {
        LOG.warn(
            "Skipping record due to null key or value. key=[{}] value=[{}] topic=[{}] partition=[{}] offset=[{}]",
            key, value, context().topic(), context().partition(), context().offset()
        );
        droppedRecordsSensor.record();
    } else {
-        final K2 mappedKey = keyMapper.apply(key, value);
-        final V2 value2 = mappedKey == null ? null : getValueOrNull(valueGetter.get(mappedKey));
+        final K2 mappedKey = maybeMappedKey.get();
+        final V2 value2 = getValueOrNull(valueGetter.get(mappedKey));
        if (leftJoin || value2 != null) {
            context().forward(key, joiner.apply(value, value2));
        }
    }
}

+private Optional<K2> maybeExtractMappedKey(final K1 key, final V1 value) {
+    if (value == null) {
+        return Optional.empty();
+    }
+
+    // we allow the case where the key is null but mappedKey is not null and thus
+    // we need to guard against nullPointerExceptions. This may happen for GlobalKTables.
+    // For KTables, the keyMapper simply returns the key, so this will never happen
+    Optional<K2> maybeMappedKey;
+    try {
+        maybeMappedKey = Optional.ofNullable(keyMapper.apply(key, value));
+    } catch (final NullPointerException e) {

Review comment: Yeah the compatibility argument is reasonable, but you could say that users should have been handling the null case all along and it was just due to a bug in Streams that we never actually passed in a null key. If people _aren't_ handling null, we should try and alert them quickly (and nothing catches people's attention faster than an NPE).
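To make the GlobalKTable case under discussion concrete: the join key can be derived entirely from the value, so a null stream key need not mean the record is unjoinable. A hedged sketch in Python (the function and mapper names are hypothetical, not the Streams code):

```python
# The mapped (join) key may come from the value alone, as with a GlobalKTable
# key extractor, so key=None can still yield a usable join key.
# Returning None signals "skip this record" (null value or no match).

def maybe_extract_mapped_key(key, value, key_mapper):
    if value is None:
        return None                # null value: nothing to join
    return key_mapper(key, value)  # None result means "no match" -> skipped

# A value-based mapper never touches the (possibly null) record key:
value_based_mapper = lambda k, v: v.get("customer_id")
```

Whether a mapper that *does* dereference a null key should fail loudly (the reviewer's position) or be skipped silently is exactly the trade-off debated in this thread; the sketch only shows why the null-key record must reach the mapper at all.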
[jira] [Commented] (KAFKA-7740) Kafka Admin Client should be able to manage user/client configurations for users and clients
[ https://issues.apache.org/jira/browse/KAFKA-7740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17183512#comment-17183512 ] Travis Bischel commented on KAFKA-7740: --- Oh cool, that's great. I thought that aspect was completely removed, but I didn't know there was a different blocker. Sorry for the ping! > Kafka Admin Client should be able to manage user/client configurations for > users and clients > > > Key: KAFKA-7740 > URL: https://issues.apache.org/jira/browse/KAFKA-7740 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 1.1.0, 1.1.1, 2.0.0, 2.1.0 > Environment: linux >Reporter: Yaodong Yang >Assignee: Brian Byrne >Priority: Major > Labels: features > Fix For: 2.6.0 > > > Right now, Kafka Admin Client only allow users to change the configuration of > brokers and topics. There are some use cases that users want to setOrUpdate > quota configurations for users and clients through Kafka Admin Client. > Without this new capability, users have to manually talk to zookeeper for > this, which will pose other challenges for customers. > Considering we have already have the framework for the much complex brokers > and topic configuration changes, it seems straightforward to add the support > for the alterConfig and describeConfig for users and clients as well. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ableegoldman commented on a change in pull request #9186: KAFKA-10277: Allow null keys with non-null mappedKey in KStreamKGlobalTable join
ableegoldman commented on a change in pull request #9186: URL: https://github.com/apache/kafka/pull/9186#discussion_r475792745 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java ## @@ -58,29 +60,46 @@ public void init(final ProcessorContext context) { @Override public void process(final K1 key, final V1 value) { -// we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record -// If {@code keyMapper} returns {@code null} it implies there is no match, -// so ignore unless it is a left join +// we do join iff the joining keys are equal, thus, if the mappedKey is null we cannot join +// and just ignore the record. // // we also ignore the record if value is null, because in a key-value data model a null-value indicates // an empty message (ie, there is nothing to be joined) -- this contrast SQL NULL semantics // furthermore, on left/outer joins 'null' in ValueJoiner#apply() indicates a missing record -- // thus, to be consistent and to avoid ambiguous null semantics, null values are ignored -if (key == null || value == null) { +final Optional maybeMappedKey = maybeExtractMappedKey(key, value); +if (!maybeMappedKey.isPresent()) { LOG.warn( "Skipping record due to null key or value. key=[{}] value=[{}] topic=[{}] partition=[{}] offset=[{}]", key, value, context().topic(), context().partition(), context().offset() ); droppedRecordsSensor.record(); } else { -final K2 mappedKey = keyMapper.apply(key, value); -final V2 value2 = mappedKey == null ? null : getValueOrNull(valueGetter.get(mappedKey)); +final K2 mappedKey = maybeMappedKey.get(); +final V2 value2 = getValueOrNull(valueGetter.get(mappedKey)); if (leftJoin || value2 != null) { context().forward(key, joiner.apply(value, value2)); } } } +private Optional maybeExtractMappedKey(final K1 key, final V1 value) { +if (value == null) { +return Optional.empty(); Review comment: This seems a little subtle. 
Can we just return the actual mapped key (or `.empty()`) in this method, and keep the explicit null check for `value` up above? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones
guozhangwang commented on a change in pull request #9156: URL: https://github.com/apache/kafka/pull/9156#discussion_r475792671 ## File path: streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java ## @@ -123,7 +123,7 @@ public void shouldAllowJoinMaterializedFilteredKTable() { assertThat( topology.stateStores().size(), -equalTo(1)); +equalTo(2)); Review comment: `enableSendingOldValues` is indicated for sending a pair of values, and if it is not set the `old` value would always be null. If we want to `enableSendingOldValues` then we'd have to materialize the source node still? Maybe I'm not fully understanding the context here. Could you bring me up to date? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
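The old/new value pair that `enableSendingOldValues` controls can be sketched with a plain holder class. Streams' internal `Change` class is the model; `ChangePair` below is a simplified stand-in, not the Streams type.

```java
// Simplified stand-in for Streams' internal old/new value pair that
// enableSendingOldValues controls: when the feature is off, oldValue is
// always null, so downstream nodes cannot diff against prior state.
public class ChangePair<V> {
    public final V newValue;
    public final V oldValue; // null unless old values are being sent

    public ChangePair(V newValue, V oldValue) {
        this.newValue = newValue;
        this.oldValue = oldValue;
    }

    // Without a materialized upstream store there is nothing to look the
    // old value up from -- which is why enabling the feature may force
    // materialization of the source node, as discussed above.
    static <V> ChangePair<V> withoutOldValues(V newValue) {
        return new ChangePair<>(newValue, null);
    }
}
```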
[GitHub] [kafka] ableegoldman commented on a change in pull request #9186: KAFKA-10277: Allow null keys with non-null mappedKey in KStreamKGlobalTable join
ableegoldman commented on a change in pull request #9186: URL: https://github.com/apache/kafka/pull/9186#discussion_r475791126 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java ## @@ -58,29 +60,46 @@ public void init(final ProcessorContext context) { @Override public void process(final K1 key, final V1 value) { -// we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record -// If {@code keyMapper} returns {@code null} it implies there is no match, -// so ignore unless it is a left join +// we do join iff the joining keys are equal, thus, if the mappedKey is null we cannot join +// and just ignore the record. // // we also ignore the record if value is null, because in a key-value data model a null-value indicates // an empty message (ie, there is nothing to be joined) -- this contrast SQL NULL semantics // furthermore, on left/outer joins 'null' in ValueJoiner#apply() indicates a missing record -- // thus, to be consistent and to avoid ambiguous null semantics, null values are ignored -if (key == null || value == null) { +final Optional maybeMappedKey = maybeExtractMappedKey(key, value); +if (!maybeMappedKey.isPresent()) { LOG.warn( "Skipping record due to null key or value. key=[{}] value=[{}] topic=[{}] partition=[{}] offset=[{}]", key, value, context().topic(), context().partition(), context().offset() ); droppedRecordsSensor.record(); } else { -final K2 mappedKey = keyMapper.apply(key, value); -final V2 value2 = mappedKey == null ? null : getValueOrNull(valueGetter.get(mappedKey)); Review comment: Oh yeah, duh. Nevermind this 🙂 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-10409) Refactor Kafka Streams RocksDb iterators
[ https://issues.apache.org/jira/browse/KAFKA-10409?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-10409: - Labels: newbie (was: ) > Refactor Kafka Streams RocksDb iterators > - > > Key: KAFKA-10409 > URL: https://issues.apache.org/jira/browse/KAFKA-10409 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Jorge Esteban Quilcate Otoya >Priority: Minor > Labels: newbie > > From [https://github.com/apache/kafka/pull/9137#discussion_r470345513] : > [~ableegoldman] : > > Kind of unrelated, but WDYT about renaming {{RocksDBDualCFIterator}} to > > {{RocksDBDualCFAllIterator}} or something on the side? I feel like these > > iterators could be cleaned up a bit in general to be more understandable – > > for example, it's weird that we do the {{iterator#seek}}-ing in the actual > > {{all()}} method but for range queries we do the seeking inside the > > iterator constructor. > and [https://github.com/apache/kafka/pull/9137#discussion_r470361726] : > > Personally I found the {{RocksDBDualCFIterator}} logic a bit difficult to > > follow even before the reverse iteration, so it would be nice to have some > > tests specifically covering reverse iterators over multi-column-family > > timestamped stores -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10417) suppress() with cogroup() throws ClassCastException
[ https://issues.apache.org/jira/browse/KAFKA-10417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-10417: - Fix Version/s: 3.0.0 2.8.0 > suppress() with cogroup() throws ClassCastException > --- > > Key: KAFKA-10417 > URL: https://issues.apache.org/jira/browse/KAFKA-10417 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.0 >Reporter: Wardha Perinkada Kattu >Priority: Blocker > Labels: kafka-streams > Fix For: 3.0.0, 2.8.0 > > > Streams operation - `cogroup()`, `aggregate()` followed by `suppress()` > throws `ClassCastException` > Works fine without the `suppress()` > Code block tested - > {code:java} > val stream1 = requestStreams.merge(successStreams).merge(errorStreams) > .groupByKey(Grouped.with(Serdes.String(), > serdesConfig.notificationSerde())) > val streams2 = confirmationStreams > .groupByKey(Grouped.with(Serdes.String(), > serdesConfig.confirmationsSerde())) > val cogrouped = > stream1.cogroup(notificationAggregator).cogroup(streams2, > confirmationsAggregator) > > .windowedBy(TimeWindows.of(Duration.ofMinutes(notificationStreamsConfig.joinWindowMinutes.toLong())).grace(Duration.ofMinutes(notificationStreamsConfig.graceDurationMinutes.toLong( > .aggregate({ null }, Materialized.`as` NotificationMetric, WindowStore ByteArray>>("time-windowed-aggregated-stream-store") > > .withValueSerde(serdesConfig.notificationMetricSerde())) > .suppress(Suppressed.untilWindowCloses(unbounded())) > .toStream() > {code} > Exception thrown is: > {code:java} > Caused by: java.lang.ClassCastException: class > org.apache.kafka.streams.kstream.internals.PassThrough cannot be cast to > class org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier > (org.apache.kafka.streams.kstream.internals.PassThrough and > org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier are in > unnamed module of loader 'app') > {code} > 
[https://stackoverflow.com/questions/63459685/kgroupedstream-with-cogroup-aggregate-suppress] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10427) Implement FetchSnapshot RPC
Jose Armando Garcia Sancio created KAFKA-10427: -- Summary: Implement FetchSnapshot RPC Key: KAFKA-10427 URL: https://issues.apache.org/jira/browse/KAFKA-10427 Project: Kafka Issue Type: Sub-task Components: core, replication Reporter: Jose Armando Garcia Sancio Assignee: Jose Armando Garcia Sancio -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] bbejeck commented on pull request #9099: KAFKA-6733: Printing additional ConsumerRecord fields in DefaultMessageFormatter
bbejeck commented on pull request #9099: URL: https://github.com/apache/kafka/pull/9099#issuecomment-679258872 > I should just delete the test for DefaultMessageFormatter in ConsoleConsumerTest.scala. @badaiaqrandista I'm not sure. My vote would be to keep the test but move it over to the `DefaultMessageFormatterTest.scala` class. But I'm not familiar enough with this code to say for sure. From a quick look at the old test, it's not clear to me why it failed. I guess the `Partition number` gets printed by default now? \cc @dajac This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
rondagostino commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r475771230 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/DescribeUserScramCredentialsResult.java ## @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.common.errors.ResourceNotFoundException; + +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +/** + * The result of the {@link Admin#describeUserScramCredentials()} call. + * + * The API of this class is evolving, see {@link Admin} for details. + */ +@InterfaceStability.Evolving +public class DescribeUserScramCredentialsResult { +private final KafkaFuture> usersFuture; +private final Map> perUserFutures; + +/** + * + * @param usersFuture the future indicating the users described by the call + * @param perUserFutures the required map of user names to futures representing the results of describing each + * user's SCRAM credentials. 
+ */ +public DescribeUserScramCredentialsResult(KafkaFuture> usersFuture, + Map> perUserFutures) { +this.usersFuture = Objects.requireNonNull(usersFuture); +this.perUserFutures = Objects.requireNonNull(perUserFutures); +} + +/** + * + * @return a future for the results of all requested (either explicitly or implicitly via describe-all) users. + * The future will complete successfully only if the users future first completes successfully and then all the + * futures for the user descriptions complete successfully. + */ +public KafkaFuture> all() { +KafkaFuture succeedsOnlyIfUsersFutureSucceeds = KafkaFuture.allOf(users()); +return succeedsOnlyIfUsersFutureSucceeds.thenApply(void1 -> { +KafkaFuture succeedsOnlyIfAllDescriptionsSucceed = KafkaFuture.allOf(perUserFutures.values().toArray( +new KafkaFuture[perUserFutures.size()])); +KafkaFuture> mapFuture = succeedsOnlyIfAllDescriptionsSucceed.thenApply(void2 -> +perUserFutures.entrySet().stream().collect(Collectors.toMap( +e -> e.getKey(), +e -> valueFromFutureGuaranteedToSucceedAtThisPoint(e.getValue(); +/* At this point it is only the users future that is guaranteed to have succeeded. + * We want to return the future to the map, but we have to return a map at this point. + * We need to dereference the future while propagating any exception. + */ +return valueFromFuturePropagatingExceptionsAsUnchecked(mapFuture); +}); +} + +/** + * + * @return a future indicating the distinct users that were requested (either explicitly or implicitly via + * describe-all). The future will not complete successfully if the user is not authorized to perform the describe + * operation; otherwise, it will complete successfully as long as the list of users with credentials can be + * successfully determined within some hard-coded timeout period. 
+ */ +public KafkaFuture<List<String>> users() { +return usersFuture; +} + +/** + * + * @param userName the name of the user description being requested + * @return a future indicating the description results for the given user. The future will complete exceptionally if + * the future returned by {@link #users()} completes exceptionally. If the given user does not exist in the list + * of requested users then the future will complete exceptionally with + * {@link org.apache.kafka.common.errors.ResourceNotFoundException}. + */ +public KafkaFuture<UserScramCredentialsDescription> description(String userName) { +KafkaFuture<Void> succeedsOnlyIfUsersFutureSucceeds = KafkaFuture.allOf(usersFuture); +return succeedsOnlyIfUsersFutureSucceeds.thenApply(void1 -> { +
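The future-chaining pattern in the `all()` method above ("succeed only if the users future succeeds, then only if every per-user future succeeds") can be illustrated with the JDK's `CompletableFuture`, which `KafkaFuture` closely mirrors. A sketch under that assumption; `ScramResultSketch` and the use of `String` as the per-user description type are stand-ins, not the Kafka API.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// Sketch of the composition in DescribeUserScramCredentialsResult.all(),
// rewritten against CompletableFuture. String stands in for the per-user
// credential description type.
public class ScramResultSketch {
    static CompletableFuture<Map<String, String>> all(
            CompletableFuture<List<String>> usersFuture,
            Map<String, CompletableFuture<String>> perUserFutures) {
        // Fail if the users future fails; then fail if any per-user future fails
        return usersFuture.thenCompose(users ->
            CompletableFuture.allOf(perUserFutures.values().toArray(new CompletableFuture[0]))
                .thenApply(ignored -> perUserFutures.entrySet().stream()
                    // join() cannot block here: allOf already guaranteed completion
                    .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().join()))));
    }
}
```

The `join()`-after-`allOf` step is the same trick as `valueFromFutureGuaranteedToSucceedAtThisPoint` in the diff: the map must be built eagerly, so completed futures are dereferenced in place.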
[GitHub] [kafka] junrao commented on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
junrao commented on pull request #8657: URL: https://github.com/apache/kafka/pull/8657#issuecomment-679256967 @chia7712 : Sorry for the late response. I just realized there seems to be another issue in addition to the above one that I mentioned. The second issue is that we hold a group lock while calling `joinPurgatory.tryCompleteElseWatch`. In this call, it's possible that DelayedJoin.onComplete() will be called. In that case, since the caller holds the group lock, we won't be completing partitionsToComplete in completeDelayedRequests(). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
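The hazard Jun describes (a completion callback firing while the caller still holds the group lock, so the delayed requests never get completed) is commonly fixed by deferring the callbacks until after unlock. A generic JDK sketch of that pattern; the names are illustrative, not Kafka's coordinator code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

// Generic sketch of the deferred-completion pattern: decide what to
// complete while holding the lock, but invoke the completions only after
// releasing it, so callbacks are free to take other locks.
public class DeferredCompletion {
    private final ReentrantLock groupLock = new ReentrantLock();
    private final List<Runnable> pending = new ArrayList<>();

    void enqueue(Runnable completion) {
        groupLock.lock();
        try {
            pending.add(completion);
        } finally {
            groupLock.unlock();
        }
    }

    void completeEligible() {
        List<Runnable> toRun = new ArrayList<>();
        groupLock.lock();
        try {
            // collect completions under the lock...
            toRun.addAll(pending);
            pending.clear();
        } finally {
            groupLock.unlock();
        }
        // ...but run them outside it
        toRun.forEach(Runnable::run);
    }
}
```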
[GitHub] [kafka] rajinisivaram commented on pull request #9200: MINOR: mirror integration tests should not call System.exit
rajinisivaram commented on pull request #9200: URL: https://github.com/apache/kafka/pull/9200#issuecomment-679249066 ok to test This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Reopened] (KAFKA-10133) Cannot compress messages in destination cluster with MM2
[ https://issues.apache.org/jira/browse/KAFKA-10133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Zhang reopened KAFKA-10133: There is no bug in the code, but need some efforts on doc to clarify on where some frequently used configs should be set. > Cannot compress messages in destination cluster with MM2 > > > Key: KAFKA-10133 > URL: https://issues.apache.org/jira/browse/KAFKA-10133 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 2.4.0, 2.5.0, 2.4.1 > Environment: kafka 2.5.0 deployed via the strimzi operator 0.18 >Reporter: Steve Jacobs >Assignee: Ning Zhang >Priority: Minor > > When configuring mirrormaker2 using kafka connect, it is not possible to > configure things such that messages are compressed in the destination > cluster. Dump Log shows that batching is occuring, but no compression. If > this is possible, then this is a documentation bug, because I can find no > documentation on how to do this. > baseOffset: 4208 lastOffset: 4492 count: 285 baseSequence: -1 lastSequence: > -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 2 isTransactional: > false isControl: false position: 239371 CreateTime: 1591745894859 size: 16362 > magic: 2 compresscodec: NONE crc: 1811507259 isvalid: true > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-10133) Cannot compress messages in destination cluster with MM2
[ https://issues.apache.org/jira/browse/KAFKA-10133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Zhang reassigned KAFKA-10133: -- Assignee: Ning Zhang > Cannot compress messages in destination cluster with MM2 > > > Key: KAFKA-10133 > URL: https://issues.apache.org/jira/browse/KAFKA-10133 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 2.4.0, 2.5.0, 2.4.1 > Environment: kafka 2.5.0 deployed via the strimzi operator 0.18 >Reporter: Steve Jacobs >Assignee: Ning Zhang >Priority: Minor > > When configuring mirrormaker2 using kafka connect, it is not possible to > configure things such that messages are compressed in the destination > cluster. Dump Log shows that batching is occuring, but no compression. If > this is possible, then this is a documentation bug, because I can find no > documentation on how to do this. > baseOffset: 4208 lastOffset: 4492 count: 285 baseSequence: -1 lastSequence: > -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 2 isTransactional: > false isControl: false position: 239371 CreateTime: 1591745894859 size: 16362 > magic: 2 compresscodec: NONE crc: 1811507259 isvalid: true > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
rondagostino commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r475756645 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/DescribeUserScramCredentialsResult.java ## @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.common.errors.ResourceNotFoundException; + +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +/** + * The result of the {@link Admin#describeUserScramCredentials()} call. + * + * The API of this class is evolving, see {@link Admin} for details. + */ +@InterfaceStability.Evolving +public class DescribeUserScramCredentialsResult { +private final KafkaFuture> usersFuture; +private final Map> perUserFutures; + +/** + * + * @param usersFuture the future indicating the users described by the call + * @param perUserFutures the required map of user names to futures representing the results of describing each + * user's SCRAM credentials. 
+ */ +public DescribeUserScramCredentialsResult(KafkaFuture> usersFuture, + Map> perUserFutures) { +this.usersFuture = Objects.requireNonNull(usersFuture); +this.perUserFutures = Objects.requireNonNull(perUserFutures); +} + +/** + * + * @return a future for the results of all requested (either explicitly or implicitly via describe-all) users. + * The future will complete successfully only if the users future first completes successfully and then all the + * futures for the user descriptions complete successfully. + */ +public KafkaFuture> all() { +KafkaFuture succeedsOnlyIfUsersFutureSucceeds = KafkaFuture.allOf(users()); +return succeedsOnlyIfUsersFutureSucceeds.thenApply(void1 -> { +KafkaFuture succeedsOnlyIfAllDescriptionsSucceed = KafkaFuture.allOf(perUserFutures.values().toArray( +new KafkaFuture[perUserFutures.size()])); +KafkaFuture> mapFuture = succeedsOnlyIfAllDescriptionsSucceed.thenApply(void2 -> +perUserFutures.entrySet().stream().collect(Collectors.toMap( +e -> e.getKey(), +e -> valueFromFutureGuaranteedToSucceedAtThisPoint(e.getValue(); +/* At this point it is only the users future that is guaranteed to have succeeded. + * We want to return the future to the map, but we have to return a map at this point. + * We need to dereference the future while propagating any exception. + */ +return valueFromFuturePropagatingExceptionsAsUnchecked(mapFuture); +}); +} + +/** + * + * @return a future indicating the distinct users that were requested (either explicitly or implicitly via Review comment: We've gone back and forth on this. The KIP does not explicitly state what to do in the case of a describe request for a user that does not have credentials, and we originally coded it to silently drop them, but then we changed it to be consistent with other APIs and raise an error (https://github.com/apache/kafka/pull/9032#discussion_r468871453). I agree that it isn't totally clear what to do. 
Rather than making the change back, I'll leave both this Javadoc and the underlying implementation as-is right now until we discuss further and decide for sure what we want. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10133) Cannot compress messages in destination cluster with MM2
[ https://issues.apache.org/jira/browse/KAFKA-10133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17183445#comment-17183445 ] Steve Jacobs commented on KAFKA-10133: -- That would be incredibly helpful. Just an example of where this setting goes would save a lot of trouble! > Cannot compress messages in destination cluster with MM2 > > > Key: KAFKA-10133 > URL: https://issues.apache.org/jira/browse/KAFKA-10133 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 2.4.0, 2.5.0, 2.4.1 > Environment: kafka 2.5.0 deployed via the strimzi operator 0.18 >Reporter: Steve Jacobs >Priority: Minor > > When configuring mirrormaker2 using kafka connect, it is not possible to > configure things such that messages are compressed in the destination > cluster. Dump Log shows that batching is occuring, but no compression. If > this is possible, then this is a documentation bug, because I can find no > documentation on how to do this. > baseOffset: 4208 lastOffset: 4492 count: 285 baseSequence: -1 lastSequence: > -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 2 isTransactional: > false isControl: false position: 239371 CreateTime: 1591745894859 size: 16362 > magic: 2 compresscodec: NONE crc: 1811507259 isvalid: true > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] lbradstreet opened a new pull request #9213: MINOR: add epoch lineage checks to system tests
lbradstreet opened a new pull request #9213: URL: https://github.com/apache/kafka/pull/9213 This adds assertions to check that leader epoch lineages match between replicas. These have been added to system tests that involve broker restarts and that wait for replicas to rejoin the ISR by the end of the test. I also moved wait_until_rejoin_isr from downgrade_test and upgrade_test as the implementation was the same in each. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
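Helpers like `wait_until_rejoin_isr` are, at heart, a poll-until-predicate-with-timeout loop. A generic JDK sketch of that shape; the names are illustrative and this is not the ducktape API the system tests actually use.

```java
import java.util.function.BooleanSupplier;

// Generic poll-until-true helper, the same shape as wait_until-style
// system-test utilities: evaluate a predicate repeatedly until it holds
// (e.g. "all replicas rejoined the ISR") or a timeout elapses.
public class WaitUntil {
    static boolean waitUntil(BooleanSupplier condition, long timeoutMs, long pollMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            if (condition.getAsBoolean()) {
                return true;
            }
            try {
                Thread.sleep(pollMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return condition.getAsBoolean(); // one last check at the deadline
    }
}
```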
[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
rondagostino commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r475750058
## File path: clients/src/main/java/org/apache/kafka/clients/admin/DescribeUserScramCredentialsResult.java
## @@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.errors.ResourceNotFoundException;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * The result of the {@link Admin#describeUserScramCredentials()} call.
+ *
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class DescribeUserScramCredentialsResult {
+    private final KafkaFuture<List<String>> usersFuture;
+    private final Map<String, KafkaFuture<UserScramCredentialsDescription>> perUserFutures;
+
+    /**
+     * @param usersFuture the future indicating the users described by the call
+     * @param perUserFutures the required map of user names to futures representing the results of describing each
+     *                       user's SCRAM credentials.
+     */
+    public DescribeUserScramCredentialsResult(KafkaFuture<List<String>> usersFuture,
+                                              Map<String, KafkaFuture<UserScramCredentialsDescription>> perUserFutures) {
+        this.usersFuture = Objects.requireNonNull(usersFuture);
+        this.perUserFutures = Objects.requireNonNull(perUserFutures);
+    }
+
+    /**
+     * @return a future for the results of all requested (either explicitly or implicitly via describe-all) users.
+     * The future will complete successfully only if the users future first completes successfully and then all the
+     * futures for the user descriptions complete successfully.
+     */
+    public KafkaFuture<Map<String, UserScramCredentialsDescription>> all() {
+        KafkaFuture<Void> succeedsOnlyIfUsersFutureSucceeds = KafkaFuture.allOf(users());
+        return succeedsOnlyIfUsersFutureSucceeds.thenApply(void1 -> {
+            KafkaFuture<Void> succeedsOnlyIfAllDescriptionsSucceed = KafkaFuture.allOf(perUserFutures.values().toArray(
+                new KafkaFuture[perUserFutures.size()]));
+            KafkaFuture<Map<String, UserScramCredentialsDescription>> mapFuture = succeedsOnlyIfAllDescriptionsSucceed.thenApply(void2 ->
+                perUserFutures.entrySet().stream().collect(Collectors.toMap(
+                    e -> e.getKey(),
+                    e -> valueFromFutureGuaranteedToSucceedAtThisPoint(e.getValue()))));
+            /* At this point it is only the users future that is guaranteed to have succeeded.
+             * We want to return the future to the map, but we have to return a map at this point.
+             * We need to dereference the future while propagating any exception.
+             */
+            return valueFromFuturePropagatingExceptionsAsUnchecked(mapFuture);
+        });
+    }
+
+    /**
+     * @return a future indicating the distinct users that were requested (either explicitly or implicitly via
+     * describe-all). The future will not complete successfully if the user is not authorized to perform the describe
+     * operation; otherwise, it will complete successfully as long as the list of users with credentials can be
+     * successfully determined within some hard-coded timeout period.
+     */
+    public KafkaFuture<List<String>> users() {
+        return usersFuture;
+    }
+
+    /**
+     * @param userName the name of the user description being requested
+     * @return a future indicating the description results for the given user. The future will complete exceptionally if
+     * the future returned by {@link #users()} completes exceptionally. If the given user does not exist in the list
+     * of requested users then the future will complete exceptionally with
+     * {@link org.apache.kafka.common.errors.ResourceNotFoundException}.
+     */
+    public KafkaFuture<UserScramCredentialsDescription> description(String userName) {
+        KafkaFuture<Void> succeedsOnlyIfUsersFutureSucceeds = KafkaFuture.allOf(usersFuture);
+        return succeedsOnlyIfUsersFutureSucceeds.thenApply(void1 -> {
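As background for the review discussion above, the aggregation contract of `all()` (fail if the users future fails, fail if any per-user description fails, otherwise return the completed map) can be sketched with the plain JDK `CompletableFuture` as a simplified stand-in for `KafkaFuture`. All names here are illustrative, not Kafka API:

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// Self-contained sketch of the "all()" aggregation pattern, using plain
// CompletableFuture instead of KafkaFuture (names are illustrative only).
public class AllOfSketch {
    public static CompletableFuture<Map<String, String>> all(
            CompletableFuture<List<String>> usersFuture,
            Map<String, CompletableFuture<String>> perUser) {
        // Fail if the users future fails, then fail if any description fails,
        // then materialize the map of completed values (join() is safe after allOf).
        return usersFuture.thenCompose(users ->
            CompletableFuture.allOf(perUser.values().toArray(new CompletableFuture[0]))
                .thenApply(v -> perUser.entrySet().stream()
                    .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().join()))));
    }

    public static void main(String[] args) {
        CompletableFuture<List<String>> users = CompletableFuture.completedFuture(List.of("alice"));
        Map<String, CompletableFuture<String>> perUser =
            Map.of("alice", CompletableFuture.completedFuture("SCRAM-SHA-256"));
        System.out.println(all(users, perUser).join()); // prints {alice=SCRAM-SHA-256}
    }
}
```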
[GitHub] [kafka] chia7712 opened a new pull request #9212: MINOR: don't keep reference of receive buffer when the value of head …
chia7712 opened a new pull request #9212: URL: https://github.com/apache/kafka/pull/9212 Users who want to avoid a null variable can use an empty array instead of null when building a header that acts as a kind of "flag". ```RecordHeader#value()``` may never be called (since such users only check for the header's existence), so the receive buffer can't be released by GC until the ```Header``` itself becomes unreachable. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
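A minimal sketch of the buffer-pinning idea described above (illustrative only; these are not the actual Kafka classes or the PR's change): if a header stores a slice of the receive `ByteBuffer`, the whole buffer stays reachable through it, whereas copying the value out detaches it and lets the large buffer be garbage collected even if the value is never read.

```java
import java.nio.ByteBuffer;

// Hypothetical helper: copy a header value out of the receive buffer so that
// the returned byte[] holds no reference back to the (potentially large) buffer.
public class HeaderCopySketch {
    static byte[] detachedValue(ByteBuffer receiveBuffer, int offset, int length) {
        byte[] copy = new byte[length];
        ByteBuffer slice = receiveBuffer.duplicate(); // independent position/limit
        slice.position(offset);
        slice.get(copy, 0, length);
        return copy; // no reference to receiveBuffer survives in the result
    }

    public static void main(String[] args) {
        ByteBuffer receive = ByteBuffer.wrap("key:value".getBytes());
        byte[] value = detachedValue(receive, 4, 5);
        System.out.println(new String(value)); // prints value
    }
}
```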
[jira] [Updated] (KAFKA-10426) Deadlock in KafkaConfigBackingStore
[ https://issues.apache.org/jira/browse/KAFKA-10426?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Goltseva Taisiia updated KAFKA-10426: - Reviewer: Konstantine Karantasis (was: Chris Egerton) > Deadlock in KafkaConfigBackingStore > --- > > Key: KAFKA-10426 > URL: https://issues.apache.org/jira/browse/KAFKA-10426 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.4.1, 2.6.0 >Reporter: Goltseva Taisiia >Assignee: Goltseva Taisiia >Priority: Critical > Labels: pull-request-available > > Hi, guys! > We faced the following deadlock: > > {code:java} > KafkaBasedLog Work Thread - _streaming_service_config > priority:5 - threadId:0x7f18ec22c000 - nativeId:0x950 - nativeId > (decimal):2384 - state:BLOCKED > stackTrace: > java.lang.Thread.State: BLOCKED (on object monitor) > at > com.company.streaming.platform.kafka.DistributedHerder$ConfigUpdateListener.onSessionKeyUpdate(DistributedHerder.java:1586) > - waiting to lock <0xe6136808> (a > com.company.streaming.platform.kafka.DistributedHerder) > at > org.apache.kafka.connect.storage.KafkaConfigBackingStore$ConsumeCallback.onCompletion(KafkaConfigBackingStore.java:707) > - locked <0xd8c3be40> (a java.lang.Object) > at > org.apache.kafka.connect.storage.KafkaConfigBackingStore$ConsumeCallback.onCompletion(KafkaConfigBackingStore.java:481) > at org.apache.kafka.connect.util.KafkaBasedLog.poll(KafkaBasedLog.java:264) > at > org.apache.kafka.connect.util.KafkaBasedLog.access$500(KafkaBasedLog.java:71) > at > org.apache.kafka.connect.util.KafkaBasedLog$WorkThread.run(KafkaBasedLog.java:337) > CustomDistributedHerder-connect-1 > priority:5 - threadId:0x7f1a01e30800 - nativeId:0x93a - nativeId > (decimal):2362 - state:BLOCKED > stackTrace: > java.lang.Thread.State: BLOCKED (on object monitor) > at > org.apache.kafka.connect.storage.KafkaConfigBackingStore.snapshot(KafkaConfigBackingStore.java:285) > - waiting to lock <0xd8c3be40> (a java.lang.Object) > at > 
com.company.streaming.platform.kafka.DistributedHerder.updateConfigsWithIncrementalCooperative(DistributedHerder.java:514) > - locked <0xe6136808> (a > com.company.streaming.platform.kafka.DistributedHerder) > at > com.company.streaming.platform.kafka.DistributedHerder.tick(DistributedHerder.java:402) > at > com.company.streaming.platform.kafka.DistributedHerder.run(DistributedHerder.java:286) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748){code} > DistributedHerder entered the synchronized method > updateConfigsWithIncrementalCooperative() and called configBackingStore.snapshot(), which > takes a lock on an internal object in the KafkaConfigBackingStore class. > > Meanwhile, KafkaConfigBackingStore's ConsumeCallback, inside a synchronized > block on that internal object, received a SESSION_KEY record and called > updateListener.onSessionKeyUpdate(), which takes a lock on the DistributedHerder. > > As I can see, the problem is here: > [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L737] > > As I understand it, this call should be performed outside the synchronized block: > {code:java} > if (started) > > updateListener.onSessionKeyUpdate(KafkaConfigBackingStore.this.sessionKey);{code} > > I'm going to make a PR. >
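The proposed fix, notifying the listener only after releasing the internal lock, can be sketched as follows (simplified, hypothetical class names; this is not the actual Connect code). Snapshotting the state under the lock and invoking the callback lock-free removes the lock-order inversion between the backing store's monitor and the herder's monitor:

```java
import java.util.concurrent.atomic.AtomicReference;

// Sketch of "call the listener outside the synchronized block" (hypothetical names).
public class CallbackOutsideLockSketch {
    interface UpdateListener { void onSessionKeyUpdate(String key); }

    private final Object lock = new Object();
    private String sessionKey;
    private boolean started = true;

    void onCompletion(String newKey, UpdateListener listener) {
        String keyToReport = null;
        synchronized (lock) {            // mutate internal state under the lock
            sessionKey = newKey;
            if (started)
                keyToReport = sessionKey;
        }                                // lock released here
        if (keyToReport != null)
            listener.onSessionKeyUpdate(keyToReport); // no lock held: listener may take its own lock safely
    }

    public static void main(String[] args) {
        CallbackOutsideLockSketch s = new CallbackOutsideLockSketch();
        AtomicReference<String> seen = new AtomicReference<>();
        s.onCompletion("k1", seen::set);
        System.out.println(seen.get()); // prints k1
    }
}
```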
[jira] [Updated] (KAFKA-10426) Deadlock in KafkaConfigBackingStore
[ https://issues.apache.org/jira/browse/KAFKA-10426?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Goltseva Taisiia updated KAFKA-10426: - Reviewer: Chris Egerton (was: Konstantine Karantasis)
[GitHub] [kafka] cmccabe commented on a change in pull request #9103: Add redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics
cmccabe commented on a change in pull request #9103: URL: https://github.com/apache/kafka/pull/9103#discussion_r475725389 ## File path: core/src/main/scala/kafka/server/KafkaServer.scala ## @@ -316,7 +316,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP kafkaController = new KafkaController(config, zkClient, time, metrics, brokerInfo, brokerEpoch, tokenManager, threadNamePrefix) kafkaController.startup() -brokerToControllerChannelManager = new BrokerToControllerChannelManager(metadataCache, time, metrics, config, threadNamePrefix) +if (config.redirectionEnabled) { + brokerToControllerChannelManager = new BrokerToControllerChannelManager(metadataCache, time, metrics, config, threadNamePrefix) Review comment: Just as a note, the AlterIsr PR may also have an object like this, so maybe we want a name that is more specific to redirection.
[jira] [Commented] (KAFKA-10133) Cannot compress messages in destination cluster with MM2
[ https://issues.apache.org/jira/browse/KAFKA-10133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17183401#comment-17183401 ] Ning Zhang commented on KAFKA-10133: Great. Then I will probably make a PR to clarify some use cases like this. Thanks
[GitHub] [kafka] cmccabe commented on a change in pull request #9103: Add redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics
cmccabe commented on a change in pull request #9103: URL: https://github.com/apache/kafka/pull/9103#discussion_r475714230 ## File path: clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java ## @@ -87,6 +87,16 @@ public Builder(Map configs, boolean validateOnly) { public AlterConfigsRequest build(short version) { return new AlterConfigsRequest(data, version); } + Review comment: In general we don't define equals or hashCode on these builders. Why are we defining it here?
[GitHub] [kafka] cmccabe commented on a change in pull request #9103: Add redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics
cmccabe commented on a change in pull request #9103: URL: https://github.com/apache/kafka/pull/9103#discussion_r475712573 ## File path: clients/src/main/java/org/apache/kafka/common/protocol/Errors.java ## @@ -325,7 +326,9 @@ UNSTABLE_OFFSET_COMMIT(88, "There are unstable offsets that need to be cleared.", UnstableOffsetCommitException::new), THROTTLING_QUOTA_EXCEEDED(89, "The throttling quota has been exceeded.", ThrottlingQuotaExceededException::new), PRODUCER_FENCED(90, "There is a newer producer with the same transactionalId " + -"which fences the current one.", ProducerFencedException::new); +"which fences the current one.", ProducerFencedException::new), +BROKER_AUTHORIZATION_FAILURE(91, "Authorization failed for the request during forwarding. " + Review comment: How about: "A broker failed to authorize itself to another component of the system. This indicates an internal error on the broker cluster security setup". This isn't specific to forwarding... there might be other reasons why a broker would need to authorize itself and fail.
[GitHub] [kafka] cmccabe commented on a change in pull request #9103: Add redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics
cmccabe commented on a change in pull request #9103: URL: https://github.com/apache/kafka/pull/9103#discussion_r475710400 ## File path: clients/src/main/java/org/apache/kafka/common/errors/BrokerAuthorizationFailureException.java ## @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.errors; + +/** + * Exception used to indicate a broker side authorization failure during request redirection. + */ +public class BrokerAuthorizationFailureException extends AuthorizationException { + Review comment: Need to include: ``` private static final long serialVersionUID = 1L; ```
[GitHub] [kafka] cmccabe commented on pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
cmccabe commented on pull request #9032: URL: https://github.com/apache/kafka/pull/9032#issuecomment-679199653 @rajinisivaram : I'd like to understand your suggestion to forbid authenticating via delegation token here. It doesn't seem consistent with how we handle delegation tokens in general, so I might be missing something. It seems like a lot of administrative systems may use delegation tokens and this would make this API not useful for them.
[GitHub] [kafka] cmccabe commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
cmccabe commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r475698813 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/DescribeUserScramCredentialsResult.java ## +/** + * @return a future indicating the distinct users that were requested (either explicitly or implicitly via + * describe-all). The future will not complete successfully if the user is not authorized to perform the describe + * operation; otherwise, it will complete successfully as long as the list of users with credentials can be + * successfully determined within some hard-coded timeout period. + */ +public KafkaFuture<List<String>> users() { +return usersFuture; +} Review comment: I think it would be better to make this "a future indicating the users that were listed" (rather than "requested"). It's maybe a bit of a subtle distinction, but think about things like requesting the null user, or the empty-string user. It's awkward to put that here. I think if we explicitly request a user but it doesn't exist, it should be omitted from here as well. That gives us more flexibility in the future with the API.
[GitHub] [kafka] cmccabe commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
cmccabe commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r475696554 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/DescribeUserScramCredentialsResult.java
[GitHub] [kafka] cmccabe commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
cmccabe commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r475695750 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/DescribeUserScramCredentialsResult.java
[GitHub] [kafka] cmccabe commented on pull request #9194: KAFKA-10384: Separate converters from generated messages
cmccabe commented on pull request #9194: URL: https://github.com/apache/kafka/pull/9194#issuecomment-679187796 > Ok, just to make sure I understand [...] Now for a given schema, we will generate SomeMessageData as well as SomeMessageJsonConverter Right, you got it. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-7740) Kafka Admin Client should be able to manage user/client configurations for users and clients
[ https://issues.apache.org/jira/browse/KAFKA-7740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17183338#comment-17183338 ] Brian Byrne commented on KAFKA-7740: Hello - the resolve functionality is still planned for a future release; however, it got hung up on an interaction with the `ClientQuotaCallback`. A KIP to address that issue will be required. I can update the original KIP to indicate the resolve functionality is not yet available. > Kafka Admin Client should be able to manage user/client configurations for > users and clients > > > Key: KAFKA-7740 > URL: https://issues.apache.org/jira/browse/KAFKA-7740 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 1.1.0, 1.1.1, 2.0.0, 2.1.0 > Environment: linux >Reporter: Yaodong Yang >Assignee: Brian Byrne >Priority: Major > Labels: features > Fix For: 2.6.0 > > > Right now, Kafka Admin Client only allows users to change the configuration of > brokers and topics. There are some use cases where users want to set or update > quota configurations for users and clients through Kafka Admin Client. > Without this new capability, users have to manually talk to zookeeper for > this, which will pose other challenges for customers. > Considering we already have the framework for the more complex broker > and topic configuration changes, it seems straightforward to add the support > for the alterConfig and describeConfig for users and clients as well. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-10426) Deadlock in KafkaConfigBackingStore
[ https://issues.apache.org/jira/browse/KAFKA-10426?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Goltseva Taisiia reassigned KAFKA-10426: Assignee: Goltseva Taisiia > Deadlock in KafkaConfigBackingStore > --- > > Key: KAFKA-10426 > URL: https://issues.apache.org/jira/browse/KAFKA-10426 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.4.1, 2.6.0 >Reporter: Goltseva Taisiia >Assignee: Goltseva Taisiia >Priority: Critical > > Hi, guys! > We faced the following deadlock: > > {code:java} > KafkaBasedLog Work Thread - _streaming_service_config > priority:5 - threadId:0x7f18ec22c000 - nativeId:0x950 - nativeId > (decimal):2384 - state:BLOCKED > stackTrace: > java.lang.Thread.State: BLOCKED (on object monitor) > at > com.company.streaming.platform.kafka.DistributedHerder$ConfigUpdateListener.onSessionKeyUpdate(DistributedHerder.java:1586) > - waiting to lock <0xe6136808> (a > com.company.streaming.platform.kafka.DistributedHerder) > at > org.apache.kafka.connect.storage.KafkaConfigBackingStore$ConsumeCallback.onCompletion(KafkaConfigBackingStore.java:707) > - locked <0xd8c3be40> (a java.lang.Object) > at > org.apache.kafka.connect.storage.KafkaConfigBackingStore$ConsumeCallback.onCompletion(KafkaConfigBackingStore.java:481) > at org.apache.kafka.connect.util.KafkaBasedLog.poll(KafkaBasedLog.java:264) > at > org.apache.kafka.connect.util.KafkaBasedLog.access$500(KafkaBasedLog.java:71) > at > org.apache.kafka.connect.util.KafkaBasedLog$WorkThread.run(KafkaBasedLog.java:337) > CustomDistributedHerder-connect-1 > priority:5 - threadId:0x7f1a01e30800 - nativeId:0x93a - nativeId > (decimal):2362 - state:BLOCKED > stackTrace: > java.lang.Thread.State: BLOCKED (on object monitor) > at > org.apache.kafka.connect.storage.KafkaConfigBackingStore.snapshot(KafkaConfigBackingStore.java:285) > - waiting to lock <0xd8c3be40> (a java.lang.Object) > at > 
com.company.streaming.platform.kafka.DistributedHerder.updateConfigsWithIncrementalCooperative(DistributedHerder.java:514) > - locked <0xe6136808> (a > com.company.streaming.platform.kafka.DistributedHerder) > at > com.company.streaming.platform.kafka.DistributedHerder.tick(DistributedHerder.java:402) > at > com.company.streaming.platform.kafka.DistributedHerder.run(DistributedHerder.java:286) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748){code} > DistributedHerder went to updateConfigsWithIncrementalCooperative() > synchronized method and called configBackingStore.snapshot() which take a > lock on internal object in KafkaConfigBackingStore class. > > Meanwhile KafkaConfigBackingStore in ConsumeCallback inside synchronized > block on internal object got SESSION_KEY record and called > updateListener.onSessionKeyUpdate() which take a lock on DistributedHerder. > > As I can see the problem is here: > [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L737] > > As I understand this call should be performed outside synchronized block: > {code:java} > if (started) > > updateListener.onSessionKeyUpdate(KafkaConfigBackingStore.this.sessionKey);{code} > > I'm going to make a PR. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-8733) Offline partitions occur when leader's disk is slow in reads while responding to follower fetch requests.
[ https://issues.apache.org/jira/browse/KAFKA-8733?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17183304#comment-17183304 ] Flavien Raynaud commented on KAFKA-8733: Has there been any update regarding this issue/the associated KIP? I can see that the thread on the mailing list has been empty for the past 6 months. It has happened again recently when one broker encountered a disk failure, causing a bunch of offline partitions. Happy to help in any way we can 😄 > Offline partitions occur when leader's disk is slow in reads while responding > to follower fetch requests. > - > > Key: KAFKA-8733 > URL: https://issues.apache.org/jira/browse/KAFKA-8733 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 1.1.2, 2.4.0 >Reporter: Satish Duggana >Assignee: Satish Duggana >Priority: Critical > Attachments: weighted-io-time-2.png, wio-time.png > > > We found offline partitions issue multiple times on some of the hosts in our > clusters. After going through the broker logs and hosts’s disk stats, it > looks like this issue occurs whenever the read/write operations take more > time on that disk. In a particular case where read time is more than the > replica.lag.time.max.ms, follower replicas will be out of sync as their > earlier fetch requests are stuck while reading the local log and their fetch > status is not yet updated as mentioned in the below code of `ReplicaManager`. > If there is an issue in reading the data from the log for a duration more > than replica.lag.time.max.ms then all the replicas will be out of sync and > partition becomes offline if min.isr.replicas > 1 and unclean.leader.election > is false. 
> > {code:java} > def readFromLog(): Seq[(TopicPartition, LogReadResult)] = { > val result = readFromLocalLog( // this call took more than > `replica.lag.time.max.ms` > replicaId = replicaId, > fetchOnlyFromLeader = fetchOnlyFromLeader, > readOnlyCommitted = fetchOnlyCommitted, > fetchMaxBytes = fetchMaxBytes, > hardMaxBytesLimit = hardMaxBytesLimit, > readPartitionInfo = fetchInfos, > quota = quota, > isolationLevel = isolationLevel) > if (isFromFollower) updateFollowerLogReadResults(replicaId, result). // > fetch time gets updated here, but mayBeShrinkIsr should have been already > called and the replica is removed from isr > else result > } > val logReadResults = readFromLog() > {code} > Attached the graphs of disk weighted io time stats when this issue occurred. > I will raise [KIP-501|https://s.apache.org/jhbpn] describing options on how > to handle this scenario. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
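The race in the quoted snippet can be illustrated with a toy timing model. This is a sketch with hypothetical names and values, not the broker's actual `Partition`/`ReplicaManager` code: a follower stays in the ISR only while its last successful fetch is recent enough, so a read that stalls longer than `replica.lag.time.max.ms` lets the shrink check fire before the follower's fetch state is refreshed.

```java
public class IsrLagDemo {
    // Stand-in for replica.lag.time.max.ms (hypothetical value).
    static final long REPLICA_LAG_TIME_MAX_MS = 10_000;

    // A follower is considered caught up if its last successful fetch
    // completed within replica.lag.time.max.ms of "now".
    static boolean isCaughtUp(long nowMs, long lastCaughtUpTimeMs) {
        return nowMs - lastCaughtUpTimeMs <= REPLICA_LAG_TIME_MAX_MS;
    }

    public static void main(String[] args) {
        long lastCaughtUp = 0; // follower's state before the slow read
        // The follower's fetch arrived early, but the local log read stalls
        // for ~15s on a slow disk, so lastCaughtUpTimeMs is never refreshed.
        // Meanwhile the periodic ISR-shrink check runs at t=12s:
        System.out.println(isCaughtUp(12_000, lastCaughtUp)); // lagging -> shrink
        // When the stalled read finally completes and the fetch state is
        // updated, the replica has already been dropped from the ISR:
        System.out.println(isCaughtUp(16_000, 16_000)); // caught up, but too late
    }
}
```

With enough such stalled followers, the ISR can shrink below min ISR and the partition goes offline, which is the scenario the issue describes.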
[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r475626573 ## File path: config/log4j.properties ## @@ -61,8 +61,8 @@ log4j.appender.authorizerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n log4j.logger.org.apache.zookeeper=INFO # Change the two lines below to adjust the general broker logging level (output to server.log and stdout) -log4j.logger.kafka=INFO -log4j.logger.org.apache.kafka=INFO +log4j.logger.kafka=DEBUG Review comment: Need to revert this stuff, didn't mean to commit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] xakassi opened a new pull request #9211: KAFKA-10426: Deadlock on session key update.
xakassi opened a new pull request #9211: URL: https://github.com/apache/kafka/pull/9211 DistributedHerder enters the synchronized method updateConfigsWithIncrementalCooperative() and calls configBackingStore.snapshot(), which takes a lock on an internal object in the KafkaConfigBackingStore class. Meanwhile, KafkaConfigBackingStore's ConsumeCallback, inside a synchronized block on that same internal object, gets a SESSION_KEY record and calls updateListener.onSessionKeyUpdate(), which takes a lock on the DistributedHerder. So we have a deadlock. To avoid this, the updateListener should be notified of the new session key outside the synchronized block, as is already done, for example, for updateListener.onTaskConfigUpdate(updatedTasks). ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
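The shape of the fix can be sketched as follows. This is a minimal, self-contained illustration with hypothetical names, not the actual KafkaConfigBackingStore code (the real callback handles many record types, and the listener is the herder): state is mutated while holding the store's internal lock, but the listener is only invoked after the monitor is released, so it may safely take other locks.

```java
import java.util.function.Consumer;

public class CallbackOutsideLockDemo {
    private final Object lock = new Object(); // stand-in for the store's internal monitor
    private final Consumer<String> listener;  // stand-in for updateListener
    private String sessionKey;

    CallbackOutsideLockDemo(Consumer<String> listener) {
        this.listener = listener;
    }

    // Deadlock-prone shape (what the issue describes): the listener, which
    // grabs the herder's monitor, would be invoked while still holding `lock`.
    // Fixed shape: mutate state under the lock, capture what the listener
    // needs, then fire the listener only after the monitor is released.
    void onSessionKeyRecord(String newKey) {
        String updated;
        synchronized (lock) {
            sessionKey = newKey;
            updated = sessionKey;
        }
        // Outside the synchronized block: taking another lock here can no
        // longer produce a lock-order inversion with threads that hold
        // that other lock while waiting for `lock`.
        listener.accept(updated);
    }

    public static void main(String[] args) {
        StringBuilder seen = new StringBuilder();
        CallbackOutsideLockDemo store = new CallbackOutsideLockDemo(seen::append);
        store.onSessionKeyRecord("key-1");
        System.out.println(seen);
    }
}
```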
[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r475632719 ## File path: core/src/main/scala/kafka/cluster/Partition.scala ## @@ -1257,4 +1364,4 @@ class Partition(val topicPartition: TopicPartition, } partitionString.toString } -} +} Review comment: newline This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r475625111 ## File path: clients/src/main/java/org/apache/kafka/common/protocol/Errors.java ## @@ -325,7 +326,8 @@ UNSTABLE_OFFSET_COMMIT(88, "There are unstable offsets that need to be cleared.", UnstableOffsetCommitException::new), THROTTLING_QUOTA_EXCEEDED(89, "The throttling quota has been exceeded.", ThrottlingQuotaExceededException::new), PRODUCER_FENCED(90, "There is a newer producer with the same transactionalId " + -"which fences the current one.", ProducerFencedException::new); +"which fences the current one.", ProducerFencedException::new), +INVALID_UPDATE_VERSION(91, "The given ISR version was out-of-date.", InvalidUpdateVersionException::new); Review comment: This error message should be less specific This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r475630695 ## File path: core/src/main/scala/kafka/cluster/Partition.scala ## @@ -806,8 +840,9 @@ class Partition(val topicPartition: TopicPartition, // avoid unnecessary collection generation var newHighWatermark = leaderLog.logEndOffsetMetadata remoteReplicasMap.values.foreach { replica => +// Note here we are using effectiveInSyncReplicaIds, see explanation above Review comment: Fix comment to refer to correct variable This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r475629593 ## File path: core/src/main/scala/kafka/cluster/Partition.scala ## @@ -635,7 +666,7 @@ class Partition(val topicPartition: TopicPartition, // check if we need to expand ISR to include this replica // if it is not in the ISR yet Review comment: Expand on this comment to discuss the maximal ISR This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications
mumrah commented on a change in pull request #9100: URL: https://github.com/apache/kafka/pull/9100#discussion_r475627290 ## File path: core/src/main/scala/kafka/cluster/Partition.scala ## @@ -141,7 +142,8 @@ object Partition extends KafkaMetricsGroup { stateStore = zkIsrBackingStore, delayedOperations = delayedOperations, metadataCache = replicaManager.metadataCache, - logManager = replicaManager.logManager) + logManager = replicaManager.logManager, + alterIsrChannelManager = replicaManager.alterIsrManager) Review comment: Rename to alterIsrManager This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10133) Cannot compress messages in destination cluster with MM2
[ https://issues.apache.org/jira/browse/KAFKA-10133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17183300#comment-17183300 ] Steve Jacobs commented on KAFKA-10133: -- That is how I was confirming compression as well. The example you gave is the same as what I said above. It sets the property on the kafka connect worker. Not in the sourceconnector/checkpointconnector etc. > Cannot compress messages in destination cluster with MM2 > > > Key: KAFKA-10133 > URL: https://issues.apache.org/jira/browse/KAFKA-10133 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 2.4.0, 2.5.0, 2.4.1 > Environment: kafka 2.5.0 deployed via the strimzi operator 0.18 >Reporter: Steve Jacobs >Priority: Minor > > When configuring mirrormaker2 using kafka connect, it is not possible to > configure things such that messages are compressed in the destination > cluster. Dump Log shows that batching is occuring, but no compression. If > this is possible, then this is a documentation bug, because I can find no > documentation on how to do this. > baseOffset: 4208 lastOffset: 4492 count: 285 baseSequence: -1 lastSequence: > -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 2 isTransactional: > false isControl: false position: 239371 CreateTime: 1591745894859 size: 16362 > magic: 2 compresscodec: NONE crc: 1811507259 isvalid: true > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
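For reference, one place worth checking when chasing this is Kafka Connect's per-connector client overrides (KIP-458, Kafka 2.3+). The following is a sketch under two assumptions: that MM2 is running as connectors on a distributed Connect cluster, and that this override actually reaches the MirrorSourceConnector's producer in the version in use — which is exactly what this ticket calls into question:

```
# Worker config: allow connectors to override producer/consumer properties
connector.client.config.override.policy=All

# Connector (e.g. MirrorSourceConnector) config: override the producer
# that writes to the destination cluster
producer.override.compression.type=lz4
```

Whether the resulting batches are compressed can then be confirmed with `kafka-dump-log` on the destination, as in the description (`compresscodec` should no longer be `NONE`).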
[jira] [Resolved] (KAFKA-10414) Upgrade api-util dependency - CVE-2018-1337
[ https://issues.apache.org/jira/browse/KAFKA-10414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Urban resolved KAFKA-10414. -- Resolution: Not A Problem api-util is only a test dependency, not an issue. > Upgrade api-util dependency - CVE-2018-1337 > --- > > Key: KAFKA-10414 > URL: https://issues.apache.org/jira/browse/KAFKA-10414 > Project: Kafka > Issue Type: Bug >Reporter: Daniel Urban >Assignee: Daniel Urban >Priority: Major > > There is a dependency on org.apache.directory.api:api-util:1.0.0, which is > involved in CVE-2018-1337. The issue is fixed in api-util 1.0.2 and later. > This is a transitive dependency through the apacheds libs. > -Can be fixed by upgrading to at least version 2.0.0.AM25- > Since api-all is also a dependency, and there is a class collision between > api-all and newer versions of api-util, it is better to just upgrade api-util > to 1.0.2 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10362) When resuming Streams active task with EOS, the checkpoint file should be deleted
[ https://issues.apache.org/jira/browse/KAFKA-10362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17183104#comment-17183104 ] Ilia Pasynkov commented on KAFKA-10362: --- [~high.lee] Hello, yes I'm working on this task) > When resuming Streams active task with EOS, the checkpoint file should be > deleted > - > > Key: KAFKA-10362 > URL: https://issues.apache.org/jira/browse/KAFKA-10362 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.0 >Reporter: Guozhang Wang >Priority: Major > Labels: newbie++ > > Today when we suspend a task we commit and along with the commit we always > write checkpoint file even if we are eosEnabled (since the state is already > SUSPENDED). But the suspended task may later be resumed and in that case the > checkpoint file should be deleted since it should only be written when it is > cleanly closed. > With our latest rebalance protocol in KIP-429, resume would not be called > since all suspended tasks would be closed, but with the old eager protocol it > may still be called — I think that may be the reason we did not get it often. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10362) When resuming Streams active task with EOS, the checkpoint file should be deleted
[ https://issues.apache.org/jira/browse/KAFKA-10362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17183075#comment-17183075 ] highluck commented on KAFKA-10362: -- [~ipasynkov] Are you working on it? If not, can I do PR? > When resuming Streams active task with EOS, the checkpoint file should be > deleted > - > > Key: KAFKA-10362 > URL: https://issues.apache.org/jira/browse/KAFKA-10362 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.0 >Reporter: Guozhang Wang >Priority: Major > Labels: newbie++ > > Today when we suspend a task we commit and along with the commit we always > write checkpoint file even if we are eosEnabled (since the state is already > SUSPENDED). But the suspended task may later be resumed and in that case the > checkpoint file should be deleted since it should only be written when it is > cleanly closed. > With our latest rebalance protocol in KIP-429, resume would not be called > since all suspended tasks would be closed, but with the old eager protocol it > may still be called — I think that may be the reason we did not get it often. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10426) Deadlock in KafkaConfigBackingStore
Goltseva Taisiia created KAFKA-10426: Summary: Deadlock in KafkaConfigBackingStore Key: KAFKA-10426 URL: https://issues.apache.org/jira/browse/KAFKA-10426 Project: Kafka Issue Type: Bug Components: KafkaConnect Affects Versions: 2.6.0, 2.4.1 Reporter: Goltseva Taisiia Hi, guys! We faced the following deadlock: {code:java} KafkaBasedLog Work Thread - _streaming_service_config priority:5 - threadId:0x7f18ec22c000 - nativeId:0x950 - nativeId (decimal):2384 - state:BLOCKED stackTrace: java.lang.Thread.State: BLOCKED (on object monitor) at com.company.streaming.platform.kafka.DistributedHerder$ConfigUpdateListener.onSessionKeyUpdate(DistributedHerder.java:1586) - waiting to lock <0xe6136808> (a com.company.streaming.platform.kafka.DistributedHerder) at org.apache.kafka.connect.storage.KafkaConfigBackingStore$ConsumeCallback.onCompletion(KafkaConfigBackingStore.java:707) - locked <0xd8c3be40> (a java.lang.Object) at org.apache.kafka.connect.storage.KafkaConfigBackingStore$ConsumeCallback.onCompletion(KafkaConfigBackingStore.java:481) at org.apache.kafka.connect.util.KafkaBasedLog.poll(KafkaBasedLog.java:264) at org.apache.kafka.connect.util.KafkaBasedLog.access$500(KafkaBasedLog.java:71) at org.apache.kafka.connect.util.KafkaBasedLog$WorkThread.run(KafkaBasedLog.java:337) CustomDistributedHerder-connect-1 priority:5 - threadId:0x7f1a01e30800 - nativeId:0x93a - nativeId (decimal):2362 - state:BLOCKED stackTrace: java.lang.Thread.State: BLOCKED (on object monitor) at org.apache.kafka.connect.storage.KafkaConfigBackingStore.snapshot(KafkaConfigBackingStore.java:285) - waiting to lock <0xd8c3be40> (a java.lang.Object) at com.company.streaming.platform.kafka.DistributedHerder.updateConfigsWithIncrementalCooperative(DistributedHerder.java:514) - locked <0xe6136808> (a com.company.streaming.platform.kafka.DistributedHerder) at com.company.streaming.platform.kafka.DistributedHerder.tick(DistributedHerder.java:402) at 
com.company.streaming.platform.kafka.DistributedHerder.run(DistributedHerder.java:286) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748){code} DistributedHerder entered the synchronized method updateConfigsWithIncrementalCooperative() and called configBackingStore.snapshot(), which takes a lock on an internal object in the KafkaConfigBackingStore class. Meanwhile, KafkaConfigBackingStore's ConsumeCallback, inside a synchronized block on that internal object, got a SESSION_KEY record and called updateListener.onSessionKeyUpdate(), which takes a lock on the DistributedHerder. As I can see, the problem is here: [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L737] As I understand it, this call should be performed outside the synchronized block: {code:java} if (started) updateListener.onSessionKeyUpdate(KafkaConfigBackingStore.this.sessionKey);{code} I'm going to make a PR. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10402) Upgrade python version in system tests
[ https://issues.apache.org/jira/browse/KAFKA-10402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17183038#comment-17183038 ] Nikolay Izhikov commented on KAFKA-10402: - Fails: {noformat} test_id: kafkatest.tests.streams.streams_eos_test.StreamsEosTest.test_failure_and_recovery.processing_guarantee=exactly_once_beta status: FAIL run time: 9 minutes 59.410 seconds Never saw output 'StateChange: REBALANCING -> RUNNING' on ducker@ducker09 Traceback (most recent call last): File "/usr/local/lib/python3.7/dist-packages/ducktape/tests/runner_client.py", line 134, in run data = self.run_test() File "/usr/local/lib/python3.7/dist-packages/ducktape/tests/runner_client.py", line 192, in run_test return self.test_context.function(self.test) File "/usr/local/lib/python3.7/dist-packages/ducktape/mark/_mark.py", line 429, in wrapper return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) File "/opt/kafka-dev/tests/kafkatest/tests/streams/streams_eos_test.py", line 91, in test_failure_and_recovery StreamsEosTestVerifyRunnerService(self.test_context, self.kafka)) File "/opt/kafka-dev/tests/kafkatest/tests/streams/streams_eos_test.py", line 110, in run_failure_and_recovery self.add_streams(processor1) File "/opt/kafka-dev/tests/kafkatest/tests/streams/streams_eos_test.py", line 133, in add_streams self.wait_for_startup(monitor, processor) File "/opt/kafka-dev/tests/kafkatest/tests/streams/streams_eos_test.py", line 168, in wait_for_startup self.wait_for(monitor, processor, "StateChange: REBALANCING -> RUNNING") File "/opt/kafka-dev/tests/kafkatest/tests/streams/streams_eos_test.py", line 174, in wait_for err_msg=("Never saw output '%s' on " % output) + str(processor.node.account)) File "/usr/local/lib/python3.7/dist-packages/ducktape/cluster/remoteaccount.py", line 708, in wait_until allow_fail=True) == 0, **kwargs) File "/usr/local/lib/python3.7/dist-packages/ducktape/utils/util.py", line 41, in wait_until raise 
TimeoutError(err_msg() if callable(err_msg) else err_msg) ducktape.errors.TimeoutError: Never saw output 'StateChange: REBALANCING -> RUNNING' on ducker@ducker09 test_id: kafkatest.tests.streams.streams_eos_test.StreamsEosTest.test_failure_and_recovery_complex.processing_guarantee=exactly_once status: FAIL run time: 10 minutes 15.336 seconds Never saw output 'StateChange: REBALANCING -> RUNNING' on ducker@ducker05 Traceback (most recent call last): File "/usr/local/lib/python3.7/dist-packages/ducktape/tests/runner_client.py", line 134, in run data = self.run_test() File "/usr/local/lib/python3.7/dist-packages/ducktape/tests/runner_client.py", line 192, in run_test return self.test_context.function(self.test) File "/usr/local/lib/python3.7/dist-packages/ducktape/mark/_mark.py", line 429, in wrapper return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) File "/opt/kafka-dev/tests/kafkatest/tests/streams/streams_eos_test.py", line 100, in test_failure_and_recovery_complex StreamsComplexEosTestVerifyRunnerService(self.test_context, self.kafka)) File "/opt/kafka-dev/tests/kafkatest/tests/streams/streams_eos_test.py", line 110, in run_failure_and_recovery self.add_streams(processor1) File "/opt/kafka-dev/tests/kafkatest/tests/streams/streams_eos_test.py", line 133, in add_streams self.wait_for_startup(monitor, processor) File "/opt/kafka-dev/tests/kafkatest/tests/streams/streams_eos_test.py", line 168, in wait_for_startup self.wait_for(monitor, processor, "StateChange: REBALANCING -> RUNNING") File "/opt/kafka-dev/tests/kafkatest/tests/streams/streams_eos_test.py", line 174, in wait_for err_msg=("Never saw output '%s' on " % output) + str(processor.node.account)) File "/usr/local/lib/python3.7/dist-packages/ducktape/cluster/remoteaccount.py", line 708, in wait_until allow_fail=True) == 0, **kwargs) File "/usr/local/lib/python3.7/dist-packages/ducktape/utils/util.py", line 41, in wait_until raise TimeoutError(err_msg() if callable(err_msg) else err_msg) 
ducktape.errors.TimeoutError: Never saw output 'StateChange: REBALANCING -> RUNNING' on ducker@ducker05 test_id: kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_bounce.clean=False.connect_protocol=compatible status: FAIL run time: 6 minutes 8.566 seconds Found validation errors: Not enough messages were processed: source:0 sink:0 Not eno
[jira] [Comment Edited] (KAFKA-10402) Upgrade python version in system tests
[ https://issues.apache.org/jira/browse/KAFKA-10402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17183036#comment-17183036 ] Nikolay Izhikov edited comment on KAFKA-10402 at 8/24/20, 8:05 AM: --- Tests results: Full report in [^report.txt] {noformat} SESSION REPORT (ALL TESTS) ducktape version: 0.8.0 session_id: 2020-08-23--002 run time: 1010 minutes 46.483 seconds tests run:684 passed: 505 failed: 9 ignored: 170 {noformat} was (Author: nizhikov): Tests results: {noformat} SESSION REPORT (ALL TESTS) ducktape version: 0.8.0 session_id: 2020-08-23--002 run time: 1010 minutes 46.483 seconds tests run:684 passed: 505 failed: 9 ignored: 170 {noformat} > Upgrade python version in system tests > -- > > Key: KAFKA-10402 > URL: https://issues.apache.org/jira/browse/KAFKA-10402 > Project: Kafka > Issue Type: Improvement >Reporter: Nikolay Izhikov >Assignee: Nikolay Izhikov >Priority: Major > Attachments: report.txt > > > Currently, system tests using python 2 which is outdated and not supported. > Since all dependency of system tests including ducktape supporting python 3 > we can migrate system tests to python3. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10402) Upgrade python version in system tests
[ https://issues.apache.org/jira/browse/KAFKA-10402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17183036#comment-17183036 ] Nikolay Izhikov commented on KAFKA-10402: - Tests results: {noformat} SESSION REPORT (ALL TESTS) ducktape version: 0.8.0 session_id: 2020-08-23--002 run time: 1010 minutes 46.483 seconds tests run:684 passed: 505 failed: 9 ignored: 170 {noformat} > Upgrade python version in system tests > -- > > Key: KAFKA-10402 > URL: https://issues.apache.org/jira/browse/KAFKA-10402 > Project: Kafka > Issue Type: Improvement >Reporter: Nikolay Izhikov >Assignee: Nikolay Izhikov >Priority: Major > Attachments: report.txt > > > Currently, system tests using python 2 which is outdated and not supported. > Since all dependency of system tests including ducktape supporting python 3 > we can migrate system tests to python3. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10402) Upgrade python version in system tests
[ https://issues.apache.org/jira/browse/KAFKA-10402?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nikolay Izhikov updated KAFKA-10402: Attachment: report.txt > Upgrade python version in system tests > -- > > Key: KAFKA-10402 > URL: https://issues.apache.org/jira/browse/KAFKA-10402 > Project: Kafka > Issue Type: Improvement >Reporter: Nikolay Izhikov >Assignee: Nikolay Izhikov >Priority: Major > Attachments: report.txt > > > Currently, system tests using python 2 which is outdated and not supported. > Since all dependency of system tests including ducktape supporting python 3 > we can migrate system tests to python3. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10425) Documentation switches to a random page on clicking the left navigation bar hide/expand button
[ https://issues.apache.org/jira/browse/KAFKA-10425?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17183034#comment-17183034 ] Luke Chen commented on KAFKA-10425:
---
I'm having the same issue.
> Documentation switches to a random page on clicking the left navigation bar hide/expand button
> --
>
> Key: KAFKA-10425
> URL: https://issues.apache.org/jira/browse/KAFKA-10425
> Project: Kafka
> Issue Type: Improvement
> Components: docs
> Affects Versions: 2.6.0, 2.5.1
> Reporter: Sanjay Ravikumar
> Priority: Minor
> Labels: documentation
>
> The Kafka documentation includes a button to hide or expand the left navigation bar. On clicking that button, the documentation jumps to a different page. For example, while on [https://kafka.apache.org/documentation.html#hwandos|https://kafka.apache.org/25/documentation.html#hwandos], clicking the hide button switches to a page further down in the documentation. Similarly, on a page with the left navigation bar hidden, clicking the button switches to a page further up. This is likely caused by the page resizing when the button is clicked. The issue is present in both 2.6.0 and 2.5.1.
[GitHub] [kafka] showuon commented on a change in pull request #9121: KAFKA-10351: add tests for IOExceptions for GlobalStateManagerImpl/OffsetCheckpoint
showuon commented on a change in pull request #9121: URL: https://github.com/apache/kafka/pull/9121#discussion_r475393747
## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java ##
@@ -180,6 +183,27 @@ public void shouldReadCheckpointOffsets() throws IOException {
         assertEquals(expected, offsets);
     }

+    @Test
+    public void shouldLogWarningMessageWhenIOExceptionInCheckPoint() throws IOException {
+        final Map offsets = Collections.singletonMap(t1, 25L);
+        stateManager.initialize();
+        stateManager.updateChangelogOffsets(offsets);
+
+        final File file = new File(stateDirectory.globalStateDir(), StateManagerUtil.CHECKPOINT_FILE_NAME + ".tmp");
Review comment: It's because we write the data to the .tmp file first and then swap it to the CHECKPOINT_FILE. For the swap we use `Files.move` with the `ATOMIC_MOVE` option, which will try to replace the target file if it exists, so I cannot trigger an `IOException` this way. I added a comment on this line to explain the reason. Thank you. ref: https://docs.oracle.com/javase/8/docs/api/java/nio/file/Files.html#move-java.nio.file.Path-java.nio.file.Path-java.nio.file.CopyOption...-
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
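The write-to-tmp-then-swap pattern discussed in the review can be sketched as a minimal standalone demo (this is not the Kafka Streams code; file names here are illustrative). Note the Javadoc says that whether `ATOMIC_MOVE` replaces an existing target is implementation-specific, though on POSIX file systems it maps to rename(2) and does replace it, which is why pre-creating the target file does not produce an `IOException`:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;

public class CheckpointSwapDemo {
    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("checkpoint-demo");
        Path tmp = dir.resolve("checkpoint.tmp");
        Path target = dir.resolve("checkpoint");

        // Step 1: write the new offsets to the temporary file first,
        // so a crash mid-write never corrupts the real checkpoint.
        Files.write(tmp, "t1 0 25\n".getBytes());

        // Simulate a pre-existing checkpoint file at the target path.
        Files.write(target, "stale\n".getBytes());

        // Step 2: atomic swap. On POSIX this replaces the existing target
        // rather than failing, so no IOException is raised here.
        Files.move(tmp, target, ATOMIC_MOVE);

        System.out.println(Files.exists(tmp));
        System.out.println(new String(Files.readAllBytes(target)).trim());
    }
}
```

After the swap the .tmp file is gone and the target holds the new contents, matching the behavior the review comment describes.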
[GitHub] [kafka] showuon edited a comment on pull request #9121: KAFKA-10351: add tests for IOExceptions for GlobalStateManagerImpl/OffsetCheckpoint
showuon edited a comment on pull request #9121: URL: https://github.com/apache/kafka/pull/9121#issuecomment-678959902
@mjsax , I've updated in this commit: https://github.com/apache/kafka/pull/9121/commits/12d3826a87a5b21033e2f81c6a486353e79d8591. Thanks.