[GitHub] [kafka] omkreddy commented on pull request #9200: MINOR: mirror integration tests should not call System.exit
omkreddy commented on pull request #9200: URL: https://github.com/apache/kafka/pull/9200#issuecomment-675864751 ok to test This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lbradstreet commented on a change in pull request #9200: MINOR: mirror integration tests should not call System.exit
lbradstreet commented on a change in pull request #9200:
URL: https://github.com/apache/kafka/pull/9200#discussion_r472723313

## File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java ##
@@ -185,10 +192,9 @@ public void close() {
         deleteAllTopics(backup.kafka());
         primary.stop();
         backup.stop();
+        assertFalse(exited.get());

Review comment:
Ah, fancy GitHub stuff. I pushed up the fix independently.
[GitHub] [kafka] lbradstreet commented on a change in pull request #9200: MINOR: mirror integration tests should not call System.exit
lbradstreet commented on a change in pull request #9200:
URL: https://github.com/apache/kafka/pull/9200#discussion_r472723414

## File path: core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala ##
@@ -29,14 +29,31 @@ import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.TimeoutException
 import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer}
+import org.apache.kafka.common.utils.Exit
+import org.junit.After
 import org.junit.Test
 import org.junit.Assert._
+import org.junit.Before

 class MirrorMakerIntegrationTest extends KafkaServerTestHarness {

   override def generateConfigs: Seq[KafkaConfig] =
     TestUtils.createBrokerConfigs(1, zkConnect).map(KafkaConfig.fromProps(_, new Properties()))

+  val exited = new AtomicBoolean(false)
+
+  @Before
+  override def setUp(): Unit = {
+    Exit.setExitProcedure((_, _) => exited.set(true))
+    super.setUp()
+  }
+
+  @After
+  override def tearDown(): Unit = {
+    super.tearDown()
+    assertFalse(exited.get())

Review comment:
Done
[GitHub] [kafka] cmccabe commented on pull request #9197: Revert KAFKA-9309: Add the ability to translate Message to JSON
cmccabe commented on pull request #9197: URL: https://github.com/apache/kafka/pull/9197#issuecomment-675859396 retest this please
[GitHub] [kafka] showuon commented on pull request #9149: KAFKA-10340: improve the logging to help user know what is going on
showuon commented on pull request #9149: URL: https://github.com/apache/kafka/pull/9149#issuecomment-675858452 @kkonstantine , could you help review this PR to improve logging? Thanks.
[jira] [Commented] (KAFKA-8135) Kafka Producer deadlocked on flush call with intermittent broker unavailability
[ https://issues.apache.org/jira/browse/KAFKA-8135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17180258#comment-17180258 ]

Ismael Juma commented on KAFKA-8135:
------------------------------------

[~rana.deb23] I suggest filing a new issue and please provide broker/client versions as well as a stacktrace from the producer when it's stuck.

> Kafka Producer deadlocked on flush call with intermittent broker unavailability
> -------------------------------------------------------------------------------
>
>                 Key: KAFKA-8135
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8135
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>    Affects Versions: 2.1.0
>            Reporter: Guozhang Wang
>            Assignee: Rajini Sivaram
>            Priority: Major
>
> In KIP-91 we added the config {{delivery.timeout.ms}} to replace {{retries}},
> and the value is default to 2 minutes. We've observed that when it was set to
> MAX_VALUE (e.g. in Kafka Streams, when EOS is turned on), at some times the
> {{broker.flush}} call would be blocked during the time when its destination
> brokers are undergoing some unavailability:
> {code}
> java.lang.Thread.State: WAITING (parking)
>     at jdk.internal.misc.Unsafe.park(java.base@10.0.2/Native Method)
>     - parking to wait for <0x0006aeb21a00> (a java.util.concurrent.CountDownLatch$Sync)
>     at java.util.concurrent.locks.LockSupport.park(java.base@10.0.2/Unknown Source)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(java.base@10.0.2/Unknown Source)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(java.base@10.0.2/Unknown Source)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base@10.0.2/Unknown Source)
>     at java.util.concurrent.CountDownLatch.await(java.base@10.0.2/Unknown Source)
>     at org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:76)
>     at org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:693)
>     at org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:1066)
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:259)
>     at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:520)
>     at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:470)
>     at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:458)
>     at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:286)
>     at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:412)
>     at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1057)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:911)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
> {code}
> And even after the broker went back to normal, producers would still be
> blocked. One suspicion is that when broker's not able to handle the request
> in time, the responses are dropped somehow inside the Sender, and hence
> whoever waiting on this response would be blocked forever.
> We've observed such scenarios when 1) broker's transiently failed for a
> while, 2) network partitioned transiently, and 3) broker's bad config like
> ACL caused it to not be able to handle requests for a while.

--
This message was sent by Atlassian Jira (v8.3.4#803005)
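The stack trace in the quoted issue shows the thread parked in `CountDownLatch.await()` with no timeout, which is why a completion signal that never arrives leaves the caller blocked indefinitely. The following is only a minimal sketch of that mechanism using the JDK's timed `await`; `BoundedAwait` and `awaitCompletion` are hypothetical names for illustration, and this is not Kafka producer code or a proposed fix for the issue.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Illustrative only; this is not Kafka producer code. */
final class BoundedAwait {
    /**
     * Waits for the latch for at most timeoutMs milliseconds.
     * Returns true if the latch reached zero, false on timeout or interruption.
     */
    static boolean awaitCompletion(CountDownLatch done, long timeoutMs) {
        try {
            // Unlike await() with no arguments, this returns even if
            // countDown() is never called (for example, a lost response).
            return done.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }
}
```

A caller that gets `false` back at least has the chance to log a thread dump or fail the operation, which is the kind of evidence the comment above asks for.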
[GitHub] [kafka] cmccabe commented on pull request #9197: Revert KAFKA-9309: Add the ability to translate Message to JSON
cmccabe commented on pull request #9197:
URL: https://github.com/apache/kafka/pull/9197#issuecomment-675851293

There seems to be a Jenkins issue here:
```
21:54:39 Fetching upstream changes from git://github.com/apache/kafka.git
21:54:39  > git fetch --tags --progress -- git://github.com/apache/kafka.git +refs/heads/*:refs/remotes/origin/* +refs/pull/*:refs/remotes/origin/pr/*
21:55:17 FATAL: java.io.IOException: Unexpected termination of the channel
21:55:17 java.io.EOFException
21:55:17 	at java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2681)
21:55:17 	at java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:3156)
21:55:17 	at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:862)
21:55:17 	at java.io.ObjectInputStream.<init>(ObjectInputStream.java:358)
```
[GitHub] [kafka] cmccabe commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
cmccabe commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r472693268

## File path: core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala ##
@@ -486,7 +486,9 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
   }

   @Test
-  def shouldNotAlterNonQuotaClientConfigUsingBootstrapServer(): Unit = {
+  def shouldNotAlterNonQuotaNonScramUserOrClientConfigUsingBootstrapServer(): Unit = {
+    // when using --bootstrap-server, it should be illegal to alter anything that is not a quota and not a SCRAM credential
+    // for both user and client entities

Review comment:
Yeah, I think we should leave it alone for now, for the reasons you state. But thanks for the context.
[GitHub] [kafka] cmccabe commented on pull request #9197: Revert KAFKA-9309: Add the ability to translate Message to JSON
cmccabe commented on pull request #9197: URL: https://github.com/apache/kafka/pull/9197#issuecomment-675850067 retest this please
[GitHub] [kafka] cmccabe commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
cmccabe commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r472688926

## File path: clients/src/main/java/org/apache/kafka/clients/admin/DescribeUserScramCredentialsResult.java ##
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * The result of the {@link Admin#describeUserScramCredentials()} call.
+ *
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class DescribeUserScramCredentialsResult {
+    private final KafkaFuture> future;

Review comment:
I thought about this more and I think I see a good way out of this difficulty. We should just have an accessor method like `userInfo(String name)` that returns a `KafkaFuture`. We can dynamically create this future if needed. Then we can have a single RPC and a single API which is just `describe`
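The accessor idea in the review comment above (one RPC, with per-user futures created on demand) could look roughly like the sketch below. It is an assumption-laden illustration, not the API that was merged: `DescribeResultSketch` is a hypothetical class, the JDK's `CompletableFuture` stands in for `KafkaFuture`, and the value type `V` is a placeholder for whatever per-user description the real result carries.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** Hypothetical result type; names are illustrative, not the Kafka Admin API. */
final class DescribeResultSketch<V> {
    // The single future backed by the one describe RPC.
    private final CompletableFuture<Map<String, V>> all;

    DescribeResultSketch(CompletableFuture<Map<String, V>> all) {
        this.all = all;
    }

    /** Future for a single user, derived on demand from the one underlying response. */
    CompletableFuture<V> userInfo(String name) {
        return all.thenApply(map -> {
            V info = map.get(name);
            if (info == null) {
                // Fails only this user's future; other users are unaffected.
                throw new IllegalArgumentException("No description for user " + name);
            }
            return info;
        });
    }
}
```

The design point is that a missing or failed user surfaces as an exceptional future for that user only, which matches the preference stated in the thread for reporting errors through futures.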
[jira] [Resolved] (KAFKA-9708) Connector does not prefer to use packaged classes during configuration
[ https://issues.apache.org/jira/browse/KAFKA-9708?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Konstantine Karantasis resolved KAFKA-9708.
------------------------------------------
    Resolution: Fixed

> Connector does not prefer to use packaged classes during configuration
> ----------------------------------------------------------------------
>
>                 Key: KAFKA-9708
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9708
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>            Reporter: Greg Harris
>            Assignee: Greg Harris
>            Priority: Major
>             Fix For: 2.6.0
>
> In connector tasks, classes loaded during configuration are preferentially
> loaded from the PluginClassLoader since KAFKA-8819 was implemented. This same
> prioritization is not currently respected in the connector itself, where the
> delegating classloader is used as the context classloader. This leads to the
> possibility for different versions of converters to be loaded, or different
> versions of dependencies to be found when executing code in the connector vs
> task.
> Worker::startConnector should be changed to follow the startTask / KAFKA-8819
> prioritization scheme, by activating the PluginClassLoader earlier.
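The issue above turns on which class loader is the thread's context class loader while connector configuration code runs. A generic way to activate a specific loader for a scoped piece of work and then restore the previous one is sketched below. `ClassLoaderScope` and `withContextLoader` are hypothetical names, and this is not how Connect's `Worker` is written; it only illustrates the swap-and-restore pattern the description alludes to.

```java
import java.util.function.Supplier;

/** Hypothetical helper; illustrates scoped context class loader activation. */
final class ClassLoaderScope {
    /**
     * Runs the action with the given loader installed as the current thread's
     * context class loader, then restores whatever loader was there before.
     */
    static <T> T withContextLoader(ClassLoader loader, Supplier<T> action) {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        current.setContextClassLoader(loader);
        try {
            return action.get();
        } finally {
            // Restore even if the action throws, so later code is unaffected.
            current.setContextClassLoader(previous);
        }
    }
}
```

Code that resolves classes through the context class loader inside `action` would then see the plugin's packaged classes first, assuming the supplied loader is one that prefers them.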
[jira] [Commented] (KAFKA-9708) Connector does not prefer to use packaged classes during configuration
[ https://issues.apache.org/jira/browse/KAFKA-9708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17180249#comment-17180249 ]

Konstantine Karantasis commented on KAFKA-9708:
-----------------------------------------------

The PR was closed with a note that this was fixed by:
[https://github.com/apache/kafka/pull/8069]
and as part of https://issues.apache.org/jira/browse/KAFKA-9374

Closing this issue as fixed.

> Connector does not prefer to use packaged classes during configuration
> ----------------------------------------------------------------------
>
>                 Key: KAFKA-9708
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9708
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>            Reporter: Greg Harris
>            Assignee: Greg Harris
>            Priority: Major
>
> In connector tasks, classes loaded during configuration are preferentially
> loaded from the PluginClassLoader since KAFKA-8819 was implemented. This same
> prioritization is not currently respected in the connector itself, where the
> delegating classloader is used as the context classloader. This leads to the
> possibility for different versions of converters to be loaded, or different
> versions of dependencies to be found when executing code in the connector vs
> task.
> Worker::startConnector should be changed to follow the startTask / KAFKA-8819
> prioritization scheme, by activating the PluginClassLoader earlier.
[jira] [Updated] (KAFKA-9708) Connector does not prefer to use packaged classes during configuration
[ https://issues.apache.org/jira/browse/KAFKA-9708?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Konstantine Karantasis updated KAFKA-9708:
------------------------------------------
    Fix Version/s: 2.6.0

> Connector does not prefer to use packaged classes during configuration
> ----------------------------------------------------------------------
>
>                 Key: KAFKA-9708
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9708
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>            Reporter: Greg Harris
>            Assignee: Greg Harris
>            Priority: Major
>             Fix For: 2.6.0
>
> In connector tasks, classes loaded during configuration are preferentially
> loaded from the PluginClassLoader since KAFKA-8819 was implemented. This same
> prioritization is not currently respected in the connector itself, where the
> delegating classloader is used as the context classloader. This leads to the
> possibility for different versions of converters to be loaded, or different
> versions of dependencies to be found when executing code in the connector vs
> task.
> Worker::startConnector should be changed to follow the startTask / KAFKA-8819
> prioritization scheme, by activating the PluginClassLoader earlier.
[GitHub] [kafka] rondagostino commented on a change in pull request #9200: MINOR: mirror integration tests should not call System.exit
rondagostino commented on a change in pull request #9200:
URL: https://github.com/apache/kafka/pull/9200#discussion_r472636924

## File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java ##
@@ -185,10 +192,9 @@ public void close() {
         deleteAllTopics(backup.kafka());
         primary.stop();
         backup.stop();
+        assertFalse(exited.get());

Review comment:
```suggestion
        assertFalse(exited.get());
        Exit.resetExitProcedure();
```

## File path: core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala ##
@@ -29,14 +29,31 @@ import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.TimeoutException
 import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer}
+import org.apache.kafka.common.utils.Exit
+import org.junit.After
 import org.junit.Test
 import org.junit.Assert._
+import org.junit.Before

 class MirrorMakerIntegrationTest extends KafkaServerTestHarness {

   override def generateConfigs: Seq[KafkaConfig] =
     TestUtils.createBrokerConfigs(1, zkConnect).map(KafkaConfig.fromProps(_, new Properties()))

+  val exited = new AtomicBoolean(false)
+
+  @Before
+  override def setUp(): Unit = {
+    Exit.setExitProcedure((_, _) => exited.set(true))
+    super.setUp()
+  }
+
+  @After
+  override def tearDown(): Unit = {
+    super.tearDown()
+    assertFalse(exited.get())

Review comment:
```suggestion
    assertFalse(exited.get())
    Exit.resetExitProcedure()
```
[GitHub] [kafka] showuon commented on pull request #9062: KAFKA-8098: fix the flaky test by disabling the auto commit to avoid member rejoining
showuon commented on pull request #9062: URL: https://github.com/apache/kafka/pull/9062#issuecomment-675828534 @abbccdda @omkreddy , could you review this PR? Thanks.
[GitHub] [kafka] showuon commented on pull request #9121: KAFKA-10351: add tests for IOExceptions for GlobalStateManagerImpl/OffsetCheckpoint
showuon commented on pull request #9121: URL: https://github.com/apache/kafka/pull/9121#issuecomment-675828368 @mjsax , could you review this small PR? Thanks.
[GitHub] [kafka] LMnet commented on pull request #8955: KAFKA-10020: Create a new version of a scala Serdes without name clash (KIP-616)
LMnet commented on pull request #8955: URL: https://github.com/apache/kafka/pull/8955#issuecomment-675817076 Voting for the KIP finished successfully, and this pull request can be merged now.
[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
rondagostino commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r472575326 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/DescribeUserScramCredentialsResult.java ## @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Map; +import java.util.Objects; + +/** + * The result of the {@link Admin#describeUserScramCredentials()} call. + * + * The API of this class is evolving, see {@link Admin} for details. + */ +@InterfaceStability.Evolving +public class DescribeUserScramCredentialsResult { +private final KafkaFuture> future; Review comment: I can see reasonable arguments for all the possibilities, so if you feel strongly about one of them then I would be fine with it. For example, `list()` and `describe()` even though they return the same thing (now -- `describe()` could return more later potentially). Just let me know. This is an automated message from the Apache Git Service. 
[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
rondagostino commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r472570823 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/DescribeUserScramCredentialsResult.java ## @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Map; +import java.util.Objects; + +/** + * The result of the {@link Admin#describeUserScramCredentials()} call. + * + * The API of this class is evolving, see {@link Admin} for details. + */ +@InterfaceStability.Evolving +public class DescribeUserScramCredentialsResult { +private final KafkaFuture> future; Review comment: > a list RPC to show everything, and a describe RPC to show only some things. Do you mean a list RPC that takes no arguments and returns every credential defined for every user and a describe RPC that takes 1 or more users and returns every credential defined for those specified users, and they both return the same information for each credential? 
Or do you mean a list RPC and a describe RPC that return different sets of information (as is done with list vs. describe topics)? I think you mean the former (two RPCs, each returning the same thing), but I want to be certain I understand.

> There are a few reasons why. One is that even though we currently only make one RPC, in the future we might make more than one. In that case we would want multiple futures.

I don't understand what this is referring to. By "we currently only make one RPC" to what are you referring?

> I also feel like in AdminClient, errors should be handled with futures pretty much all the time

Agreed. Will convert to using futures always, whenever we arrive at the final decision on what the RPCs need to look like. I'm wondering if we convert to returning futures everywhere, can we stick with the one describe RPC? For example, could the admin client return a `Future>>`? Would that work, and if so, would that be a reasonable way to proceed?
[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
rondagostino commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r472574569 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/DescribeUserScramCredentialsResult.java ## @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Map; +import java.util.Objects; + +/** + * The result of the {@link Admin#describeUserScramCredentials()} call. + * + * The API of this class is evolving, see {@link Admin} for details. + */ +@InterfaceStability.Evolving +public class DescribeUserScramCredentialsResult { +private final KafkaFuture> future; Review comment: If we do decide to go with 2 separate APIs, then I might be concerned about using `list()` vs `describe()` if they return the same set of information (i.e. mechanism and iterations). Perhaps using two separate names gives us room to expand `describe()` to return more information later on, though. 
But if not, and they will always return the same information, then maybe `describeAll()` and `describe()` (or `listAll()` and `list()`) might be better?
[GitHub] [kafka] mjsax merged pull request #9198: HOTFIX: use Exit.exit instead of System.exit
mjsax merged pull request #9198: URL: https://github.com/apache/kafka/pull/9198
[GitHub] [kafka] lbradstreet opened a new pull request #9200: MINOR: mirror integration tests should not call System.exit
lbradstreet opened a new pull request #9200: URL: https://github.com/apache/kafka/pull/9200 406635bcc9f2a4c439d198ea0549170de331323c switched from System.exit to Exit.exit; however, the integration tests did not override the exit procedure. This can cause invalid test runs with errors like this: ``` [2020-08-18T15:52:01.142Z] * What went wrong: [2020-08-18T15:52:01.142Z] Execution failed for task ':connect:mirror:integrationTest'. [2020-08-18T15:52:01.142Z] > Process 'Gradle Test Executor 192' finished with non-zero exit value 1 [2020-08-18T15:52:01.142Z] This problem might be caused by incorrect test process configuration. [2020-08-18T15:52:01.142Z] Please refer to the test execution section in the User Manual at https://docs.gradle.org/6.6/userguide/java_testing.html#sec:test_execution ```
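The pattern described in this pull request (record an exit attempt during a test instead of letting it kill the test JVM) can be sketched roughly as follows. This is a self-contained illustration only: `SketchExit` is a hypothetical, reduced stand-in for Kafka's `org.apache.kafka.common.utils.Exit`, and `ExitGuardSketch` is an invented helper, not code from the PR.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for org.apache.kafka.common.utils.Exit,
// reduced to the parts this sketch needs.
final class SketchExit {
    interface Procedure {
        void execute(int statusCode, String message);
    }

    // By default an exit request really terminates the JVM.
    private static final Procedure DEFAULT = (code, msg) -> System.exit(code);
    private static volatile Procedure procedure = DEFAULT;

    static void setExitProcedure(Procedure p) { procedure = p; }
    static void resetExitProcedure() { procedure = DEFAULT; }
    static void exit(int statusCode, String message) { procedure.execute(statusCode, message); }
}

// Hypothetical helper: runs a test body with exit calls recorded rather than executed.
final class ExitGuardSketch {
    // Returns true if the body tried to exit while it was running.
    static boolean exitedWhileRunning(Runnable body) {
        AtomicBoolean exited = new AtomicBoolean(false);
        SketchExit.setExitProcedure((code, msg) -> exited.set(true));
        try {
            body.run();
        } finally {
            // Restore the default so later code is not affected by the override.
            SketchExit.resetExitProcedure();
        }
        return exited.get();
    }
}
```

In a test harness, the override would typically be installed in the setup method and the recorded flag asserted to be false in teardown, which is the shape of the change quoted in the review comments on this PR.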
[GitHub] [kafka] vvcephei commented on a change in pull request #9137: KAFKA-9929: Support reverse iterator on KeyValueStore
vvcephei commented on a change in pull request #9137: URL: https://github.com/apache/kafka/pull/9137#discussion_r472531237 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBPrefixIterator.java ## @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.state.internals; - -import org.apache.kafka.common.utils.Bytes; -import org.apache.kafka.streams.state.KeyValueIterator; -import org.rocksdb.RocksIterator; - -import java.util.Set; - -class RocksDBPrefixIterator extends RocksDbIterator { Review comment: Was this class unused or something? 
## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java ## @@ -249,7 +248,23 @@ public void putAll(final List> entries) { validateStoreOpen(); final KeyValueIterator storeIterator = wrapped().range(from, to); final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = context.cache().range(cacheName, from, to); -return new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator, storeIterator); +return new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator, storeIterator, false); +} + +@Override +public KeyValueIterator reverseRange(final Bytes from, +final Bytes to) { +if (from.compareTo(to) > 0) { +LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " ++ "This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + +"Note that the built-in numerical serdes do not follow this for negative numbers"); Review comment: This warning seems to miss the most likely scenario, that the user just passed the arguments in the wrong order.
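The guard under review (return an empty result when `from > to`, otherwise iterate the range in reverse) can be illustrated with a plain `TreeMap` in place of the real store. This is a minimal sketch under that assumption: `ReverseRangeSketch`, its string keys, and the wording of the warning are hypothetical, and the warning text simply folds in the reviewer's point about argument order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.NavigableMap;

// Hypothetical illustration; a TreeMap stands in for the underlying key-value store.
final class ReverseRangeSketch {
    // Returns the keys in [from, to] in descending order, or an empty list for an invalid range.
    static List<String> reverseRange(NavigableMap<String, String> store, String from, String to) {
        if (from.compareTo(to) > 0) {
            // Mention the most likely cause first: the arguments were passed in the wrong order.
            System.err.println("Returning empty result for invalid key range: from > to. "
                + "Check that the arguments were not passed in the wrong order.");
            return Collections.emptyList();
        }
        // subMap is inclusive on both ends here; descendingMap reverses the iteration order.
        return new ArrayList<>(store.subMap(from, true, to, true).descendingMap().keySet());
    }
}
```

Without the guard, `subMap` on a `TreeMap` would throw `IllegalArgumentException` for `from > to`, so the check also decides between failing loudly and returning an empty result.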
[GitHub] [kafka] cmccabe commented on pull request #9198: HOTFIX: use Exit.exit instead of System.exit
cmccabe commented on pull request #9198: URL: https://github.com/apache/kafka/pull/9198#issuecomment-675755685
[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
rondagostino commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r472530253 ## File path: core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala ## @@ -486,7 +486,9 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging { } @Test - def shouldNotAlterNonQuotaClientConfigUsingBootstrapServer(): Unit = { + def shouldNotAlterNonQuotaNonScramUserOrClientConfigUsingBootstrapServer(): Unit = { +// when using --bootstrap-server, it should be illegal to alter anything that is not a quota and not a SCRAM credential +// for both user and client entities Review comment: @cmccabe Good question, actually. There is already a check to make sure a non-existent config cannot be **deleted** via `--zookeeper`: `shouldNotUpdateConfigIfNonExistingConfigIsDeletedUsingZookeper()`. This test passes, of course. However, there is no check to make sure an unrecognized config cannot be **added**, and in fact if I add that test it fails; the code will gladly go ahead and add anything we wish (and it will gladly go ahead and delete it if we wish as well -- the above test is only checking that something that doesn't exist can't be deleted). The next question, of course, is whether we should "fix" this or not. What do you think? To fix it we would need the full set of allowed configs at the User, Client, Topic, and Broker levels and then insert code to check accordingly. Since the ZooKeeper update path is going away due to KIP-500, I'm wondering if we can just leave it alone?
[GitHub] [kafka] cmccabe commented on pull request #9199: KAFKA-10418: alter topic configs via kafka-topics error text
cmccabe commented on pull request #9199: URL: https://github.com/apache/kafka/pull/9199#issuecomment-675734800 LGTM
[GitHub] [kafka] cmccabe commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
cmccabe commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r472507947 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/DescribeUserScramCredentialsResult.java ## @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Map; +import java.util.Objects; + +/** + * The result of the {@link Admin#describeUserScramCredentials()} call. + * + * The API of this class is evolving, see {@link Admin} for details. + */ +@InterfaceStability.Evolving +public class DescribeUserScramCredentialsResult { +private final KafkaFuture> future; Review comment: Thinking about this more, I would prefer having both a list RPC to show everything, and a describe RPC to show only some things. There are a few reasons why. One is that even though we currently only make one RPC, in the future we might make more than one. In that case we would want multiple futures. Another is that the general pattern in Kafka is that list RPCs show everything, but describe RPCs show only some things. 
It's true that there are some places where we violate this pattern, but I still think it's worth trying to follow where we can. Maybe this should be documented better so that when we add new RPCs, people aren't confused about whether to use "list" or "describe." I also feel like in AdminClient, errors should be handled with futures pretty much all the time, unless there is a really strong reason not to. This allows people to use an async style of programming. In contrast, mixing in some errors that aren't futures, but need to be checked explicitly, is very likely to confuse people.
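One way to picture the "errors via futures" idea from this comment is a result object that exposes one future per user plus an aggregate future, so that a failed lookup surfaces when the future is consumed rather than through a separate error field. The sketch below is an assumption-laden illustration, not the API that was eventually merged: `PerUserFuturesSketch` is a hypothetical class, `CompletableFuture` stands in for `KafkaFuture`, and credentials are plain strings.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;

// Hypothetical result type: every outcome, success or failure, is delivered through a future.
final class PerUserFuturesSketch {
    private final Map<String, CompletableFuture<List<String>>> futures;

    PerUserFuturesSketch(Map<String, CompletableFuture<List<String>>> futures) {
        this.futures = new HashMap<>(futures);
    }

    // Future for a single user; asking for a user that was not requested also fails via the future.
    CompletableFuture<List<String>> credentials(String user) {
        CompletableFuture<List<String>> f = futures.get(user);
        if (f == null) {
            CompletableFuture<List<String>> failed = new CompletableFuture<>();
            failed.completeExceptionally(new NoSuchElementException("user not requested: " + user));
            return failed;
        }
        return f;
    }

    // Aggregate future; completes exceptionally if any per-user lookup failed.
    CompletableFuture<Map<String, List<String>>> all() {
        CompletableFuture<Void> done =
            CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[0]));
        return done.thenApply(v -> {
            Map<String, List<String>> out = new HashMap<>();
            // Safe to join here: allOf only completes normally once every future has succeeded.
            futures.forEach((user, f) -> out.put(user, f.join()));
            return out;
        });
    }
}
```

With this shape a caller can chain work onto a single user's future or wait for everything at once, and in neither case is there a separate error value that could be forgotten.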
[GitHub] [kafka] mjsax commented on pull request #9067: MINOR: Streams integration tests should not call exit
mjsax commented on pull request #9067: URL: https://github.com/apache/kafka/pull/9067#issuecomment-675733090 Seems I made a mistake cherry-picking to `2.6` -- I remember that there was a conflict; guess I resolved it incorrectly. Did a HOTFIX PR: #9198
[GitHub] [kafka] rondagostino commented on pull request #9199: KAFKA-10418: alter topic configs via kafka-topics error text
rondagostino commented on pull request #9199: URL: https://github.com/apache/kafka/pull/9199#issuecomment-675731352 @cmccabe Can you take a look?
[GitHub] [kafka] rondagostino opened a new pull request #9199: KAFKA-10418: alter topic configs via kafka-topics error text
rondagostino opened a new pull request #9199: URL: https://github.com/apache/kafka/pull/9199 Changing a topic config with the `kafka-topics` command while connecting to Kafka via `--bootstrap-server` (rather than connecting to ZooKeeper via `--zookeeper`) is not supported. The desired functionality is available elsewhere, though: it is possible to change a topic config while connecting to Kafka rather than ZooKeeper via the `kafka-configs` command instead. However, neither the `kafka-topics` error message received nor the `kafka-topics` help information itself indicates this other possibility. For example: ``` $ kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic test --config flush.messages=12345 Option combination "[bootstrap-server],[config]" can't be used with option "[alter]" ``` ``` $ kafka-topics.sh ... --config A topic configuration override for the topic being created or altered...It is supported only in combination with --create if --bootstrap-server option is used. ``` Rather than simply saying that what you want to do isn't available, it would be better to say also that you can do it with the `kafka-configs` command.
[GitHub] [kafka] mjsax opened a new pull request #9198: HOTFIX: use Exit.exit instead of System.exit
mjsax opened a new pull request #9198: URL: https://github.com/apache/kafka/pull/9198 Follow up to #9067 -- seems I made an error resolving the unclean cherry-pick from `trunk` to `2.6`. Call for review @vvcephei @ijuma
[GitHub] [kafka] cmccabe commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
cmccabe commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r472501147 ## File path: core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala ## @@ -486,7 +486,9 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging { } @Test - def shouldNotAlterNonQuotaClientConfigUsingBootstrapServer(): Unit = { + def shouldNotAlterNonQuotaNonScramUserOrClientConfigUsingBootstrapServer(): Unit = { +// when using --bootstrap-server, it should be illegal to alter anything that is not a quota and not a SCRAM credential +// for both user and client entities Review comment: Sorry, this might be a silly question, but how are these constraints different when using --zookeeper? Should we test that as well?
[GitHub] [kafka] cmccabe commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
cmccabe commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r472498809 ## File path: clients/src/test/java/org/apache/kafka/common/protocol/ProtoUtilsTest.java ## @@ -26,11 +26,18 @@ public void testDelayedAllocationSchemaDetection() throws Exception { //verifies that schemas known to retain a reference to the underlying byte buffer are correctly detected. for (ApiKeys key : ApiKeys.values()) { -if (key == ApiKeys.PRODUCE || key == ApiKeys.JOIN_GROUP || key == ApiKeys.SYNC_GROUP || key == ApiKeys.SASL_AUTHENTICATE -|| key == ApiKeys.EXPIRE_DELEGATION_TOKEN || key == ApiKeys.RENEW_DELEGATION_TOKEN) { -assertTrue(key + " should require delayed allocation", key.requiresDelayedAllocation); -} else { -assertFalse(key + " should not require delayed allocation", key.requiresDelayedAllocation); +switch (key) { +case PRODUCE: +case JOIN_GROUP: +case SYNC_GROUP: +case SASL_AUTHENTICATE: +case EXPIRE_DELEGATION_TOKEN: +case RENEW_DELEGATION_TOKEN: +case ALTER_USER_SCRAM_CREDENTIALS: +assertTrue(key + " should require delayed allocation", key.requiresDelayedAllocation); +break; +default: +assertFalse(key + " should not require delayed allocation", key.requiresDelayedAllocation); Review comment: I know it's not strictly necessary, but it would be nice to have a "break" after the default clause too
[GitHub] [kafka] cmccabe commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
cmccabe commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r472496932 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/UserScramCredentialsDescription.java ## @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.errors.ApiException; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Optional; + +/** + * Representation of all SASL/SCRAM credentials associated with a user that can be retrieved, or an exception indicating + * why credentials could not be retrieved. 
+ * + * @see https://cwiki.apache.org/confluence/display/KAFKA/KIP-554%3A+Add+Broker-side+SCRAM+Config+API;>KIP-554: Add Broker-side SCRAM Config API + */ +public class UserScramCredentialsDescription { +private final String name; +private final Optional exception; +private final List credentialInfos; + +/** + * Constructor for when SASL/SCRAM credentials associated with a user could not be retrieved + * + * @param name the required user name + * @param exception the required exception indicating why the credentials for the user could not be retrieved + */ +public UserScramCredentialsDescription(String name, ApiException exception) { +this(name, Optional.of(Objects.requireNonNull(exception)), Collections.emptyList()); +} + +/** + * Constructor for when SASL/SCRAM credentials associated with a user are successfully retrieved + * + * @param name the required user name + * @param credentialInfos the required SASL/SCRAM credential representations for the user + */ +public UserScramCredentialsDescription(String name, List credentialInfos) { +this(name, Optional.empty(), Objects.requireNonNull(credentialInfos)); +} + +private UserScramCredentialsDescription(String name, Optional exception, List credentialInfos) { +this.name = Objects.requireNonNull(name); +this.exception = Objects.requireNonNull(exception); +this.credentialInfos = Collections.unmodifiableList(new ArrayList<>(credentialInfos)); +} + +/** + * + * @return the user name + */ +public String name() { +return name; +} + +/** + * + * @return the exception, if any, that prevented the user's SASL/SCRAM credentials from being retrieved + */ +public Optional exception() { +return exception; +} + +/** + * + * @return the always non-null/unmodifiable list of SASL/SCRAM credential representations for the user + * (empty if {@link #exception} defines an exception) + */ +public List credentialInfos() { Review comment: It would be good to throw the exception here if there is one, so that it wasn't possible to ignore the 
problem
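The suggestion in this review comment (have the accessor throw the stored exception so the failure cannot be silently ignored) might look roughly like the sketch below. `CredentialsDescriptionSketch` is a hypothetical, simplified stand-in for `UserScramCredentialsDescription`: credential infos are plain strings and the exception is typed as `RuntimeException` to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

// Hypothetical, simplified stand-in for the class under review.
final class CredentialsDescriptionSketch {
    private final String name;
    private final Optional<RuntimeException> exception;
    private final List<String> credentialInfos;

    // Used when the credentials could not be retrieved.
    CredentialsDescriptionSketch(String name, RuntimeException exception) {
        this(name, Optional.of(Objects.requireNonNull(exception)), Collections.emptyList());
    }

    // Used when the credentials were retrieved successfully.
    CredentialsDescriptionSketch(String name, List<String> credentialInfos) {
        this(name, Optional.empty(), Objects.requireNonNull(credentialInfos));
    }

    private CredentialsDescriptionSketch(String name, Optional<RuntimeException> exception,
                                         List<String> credentialInfos) {
        this.name = Objects.requireNonNull(name);
        this.exception = Objects.requireNonNull(exception);
        this.credentialInfos = Collections.unmodifiableList(new ArrayList<>(credentialInfos));
    }

    String name() {
        return name;
    }

    Optional<RuntimeException> exception() {
        return exception;
    }

    // Throws the stored exception, if any, instead of returning an empty list.
    List<String> credentialInfos() {
        if (exception.isPresent()) {
            throw exception.get();
        }
        return credentialInfos;
    }
}
```

The trade-off is that callers who only read `credentialInfos()` now see the failure immediately, while callers who want to inspect the error without a try/catch can still use `exception()`.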
[GitHub] [kafka] cmccabe commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
cmccabe commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r472494891 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java ## @@ -1214,6 +1215,64 @@ default AlterClientQuotasResult alterClientQuotas(Collection entries, AlterClientQuotasOptions options); +/** + * Describe all SASL/SCRAM credentials. + * + * This is a convenience method for {@link #describeUserScramCredentials(List, DescribeUserScramCredentialsOptions)} + * + * @return The DescribeUserScramCredentialsResult. + */ +default DescribeUserScramCredentialsResult describeUserScramCredentials() { +return describeUserScramCredentials(null, new DescribeUserScramCredentialsOptions()); +} + +/** + * Describe SASL/SCRAM credentials for the given users. + * + * This is a convenience method for {@link #describeUserScramCredentials(List, DescribeUserScramCredentialsOptions)} + * + * @param users the users for which credentials are to be described; all users' credentials are described if null + * or empty. A user explicitly specified here that does not have a SCRAM credential will not appear + * in the results. Review comment: @rondagostino : It still says "A user explicitly specified here that does not have a SCRAM credential will not appear in the results". I thought we agreed that such a user would get an error code?
[GitHub] [kafka] mimaison commented on pull request #8050: MINOR: Include call name in TimeoutException
mimaison commented on pull request #8050: URL: https://github.com/apache/kafka/pull/8050#issuecomment-675714697 @omkreddy @hachikuji Can you take a look? Thanks
[GitHub] [kafka] cmccabe commented on pull request #9194: KAFKA-10384: Separate converters from generated messages
cmccabe commented on pull request #9194: URL: https://github.com/apache/kafka/pull/9194#issuecomment-675714307 JDK8 test failure looks like a Jenkins issue. ``` 13:29:01 Fetching upstream changes from git://github.com/apache/kafka.git 13:29:01 > git fetch --tags --progress -- git://github.com/apache/kafka.git +refs/heads/*:refs/remotes/origin/* +refs/pull/*:refs/remotes/origin/pr/* 13:29:17 FATAL: java.io.IOException: Unexpected termination of the channel 13:29:17 java.io.EOFException 13:29:17 at java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2681) 13:29:17 at java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:3156) ```
[GitHub] [kafka] mimaison commented on pull request #9131: KAFKA-10367: Allow running the Streams demo app with a config file
mimaison commented on pull request #9131: URL: https://github.com/apache/kafka/pull/9131#issuecomment-675713485 @mjsax Can you take another look? Thanks
[GitHub] [kafka] cmccabe commented on pull request #9197: Revert KAFKA-9309: Add the ability to translate Message to JSON
cmccabe commented on pull request #9197: URL: https://github.com/apache/kafka/pull/9197#issuecomment-675711065 Looks like there is a checkstyle issue upstream: ``` 12:59:42 > Task :streams:upgrade-system-tests-22:checkstyleTest FAILED 12:59:42 [ant:checkstyle] [ERROR] /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.13@2/streams/upgrade-system-tests-22/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java:43: Line matches the illegal pattern ''System.exit': Should not directly call System.exit, but Exit.exit instead.'. [dontUseSystemExit] ```
[GitHub] [kafka] cmccabe merged pull request #9144: KAFKA-9705: (part-1) add redirection fields in the request header
cmccabe merged pull request #9144: URL: https://github.com/apache/kafka/pull/9144
[GitHub] [kafka] cmccabe commented on a change in pull request #9144: KAFKA-9705: (part-1) add redirection fields in the request header
cmccabe commented on a change in pull request #9144: URL: https://github.com/apache/kafka/pull/9144#discussion_r472477998 ## File path: clients/src/main/java/org/apache/kafka/clients/KafkaClient.java ## @@ -189,16 +189,18 @@ ClientRequest newClientRequest(String nodeId, AbstractRequest.Builder request * cancelling the request. The request may get cancelled sooner if the socket disconnects * for any reason including if another pending request to the same node timed out first. * @param callback the callback to invoke when we get a response + * @param initialPrincipalName the initial client principal name, when building a forward request + * @param initialClientId the initial client id, when building a forward request */ ClientRequest newClientRequest(String nodeId, AbstractRequest.Builder requestBuilder, long createdTimeMs, boolean expectResponse, int requestTimeoutMs, + String initialPrincipalName, Review comment: Fair enough
[GitHub] [kafka] cmccabe commented on pull request #9144: KAFKA-9705: (part-1) add redirection fields in the request header
cmccabe commented on pull request #9144: URL: https://github.com/apache/kafka/pull/9144#issuecomment-675703162 LGTM
[GitHub] [kafka] mimaison commented on pull request #8295: KAFKA-9627: Replace ListOffset request/response with automated protocol
mimaison commented on pull request #8295: URL: https://github.com/apache/kafka/pull/8295#issuecomment-675697535 @abbccdda Thanks for the reviews, can you take another look?
[jira] [Updated] (KAFKA-10418) Unclear error/docs when altering topic configs via kafka-topics with --bootstrap-server
[ https://issues.apache.org/jira/browse/KAFKA-10418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ron Dagostino updated KAFKA-10418: -- Summary: Unclear error/docs when altering topic configs via kafka-topics with --bootstrap-server (was: Unclear deprecation of altering topic configs via kafka-topics with --bootstrap-server) > Unclear error/docs when altering topic configs via kafka-topics with > --bootstrap-server > --- > > Key: KAFKA-10418 > URL: https://issues.apache.org/jira/browse/KAFKA-10418 > Project: Kafka > Issue Type: Improvement >Reporter: Ron Dagostino >Assignee: Ron Dagostino >Priority: Minor > Fix For: 2.7.0 > > > Changing a topic config with the kafka-topics command while connecting to > Kafka via --bootstrap-server (rather than connecting to ZooKeeper via > --zookeeper) has been deprecated. The desired functionality is available > elsewhere: it is possible to change a topic config while connecting to Kafka > rather than ZooKeeper via the kafka-configs command instead. However, > neither the kafka-topics error message received nor the kafka-topics help > information itself indicates this other possibility. For example: > {{$ kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic test > --config flush.messages=12345 > Option combination "[bootstrap-server],[config]" can't be used with option > "[alter]" > }} > {{$ kafka-topics.sh > ... > --config A topic configuration override for > the topic being created or altered...It is supported only in combination with > -- create if --bootstrap-server option is used. > }} > Rather than simply saying that what you want to do isn't available, it would > be better to say also that you can do it with the kafka-configs command. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10418) Unclear error/docs when altering topic configs via kafka-topics with --bootstrap-server
[ https://issues.apache.org/jira/browse/KAFKA-10418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ron Dagostino updated KAFKA-10418:
--
Description:
Changing a topic config with the kafka-topics command while connecting to Kafka via --bootstrap-server (rather than connecting to ZooKeeper via --zookeeper) is not supported. The desired functionality is available elsewhere, though: it is possible to change a topic config while connecting to Kafka rather than ZooKeeper via the kafka-configs command instead. However, neither the kafka-topics error message received nor the kafka-topics help information itself indicates this other possibility. For example:
{{$ kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic test --config flush.messages=12345
Option combination "[bootstrap-server],[config]" can't be used with option "[alter]"
}}
{{$ kafka-topics.sh
...
--config A topic configuration override for the topic being created or altered...It is supported only in combination with --create if --bootstrap-server option is used.
}}
Rather than simply saying that what you want to do isn't available, it would be better to say also that you can do it with the kafka-configs command.

was:
Changing a topic config with the kafka-topics command while connecting to Kafka via --bootstrap-server (rather than connecting to ZooKeeper via --zookeeper) has been deprecated. The desired functionality is available elsewhere: it is possible to change a topic config while connecting to Kafka rather than ZooKeeper via the kafka-configs command instead. However, neither the kafka-topics error message received nor the kafka-topics help information itself indicates this other possibility. For example:
{{$ kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic test --config flush.messages=12345
Option combination "[bootstrap-server],[config]" can't be used with option "[alter]"
}}
{{$ kafka-topics.sh
...
--config A topic configuration override for the topic being created or altered...It is supported only in combination with --create if --bootstrap-server option is used.
}}
Rather than simply saying that what you want to do isn't available, it would be better to say also that you can do it with the kafka-configs command.

> Unclear error/docs when altering topic configs via kafka-topics with --bootstrap-server
> ---
>
> Key: KAFKA-10418
> URL: https://issues.apache.org/jira/browse/KAFKA-10418
> Project: Kafka
> Issue Type: Improvement
> Reporter: Ron Dagostino
> Assignee: Ron Dagostino
> Priority: Minor
> Fix For: 2.7.0
>
> Changing a topic config with the kafka-topics command while connecting to Kafka via --bootstrap-server (rather than connecting to ZooKeeper via --zookeeper) is not supported. The desired functionality is available elsewhere, though: it is possible to change a topic config while connecting to Kafka rather than ZooKeeper via the kafka-configs command instead. However, neither the kafka-topics error message received nor the kafka-topics help information itself indicates this other possibility. For example:
> {{$ kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic test --config flush.messages=12345
> Option combination "[bootstrap-server],[config]" can't be used with option "[alter]"
> }}
> {{$ kafka-topics.sh
> ...
> --config A topic configuration override for the topic being created or altered...It is supported only in combination with --create if --bootstrap-server option is used.
> }}
> Rather than simply saying that what you want to do isn't available, it would be better to say also that you can do it with the kafka-configs command.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
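The improvement requested in KAFKA-10418 can be pictured with a small sketch. This is not the kafka-topics implementation: `TopicCommandHint`, `check`, and the option names as plain strings are hypothetical, and the only facts taken from the issue are the existing error text and the statement that kafka-configs can do the job. The hint uses the `kafka-configs.sh` flags `--entity-type`, `--entity-name`, `--alter`, and `--add-config`.

```java
import java.util.Set;

// Hypothetical helper, not part of Kafka: builds the error text for an unsupported option mix.
final class TopicCommandHint {
    // Pointer to the tool that does support the operation.
    static final String CONFIGS_HINT =
        "Use kafka-configs.sh --bootstrap-server <host:port> --entity-type topics "
            + "--entity-name <topic> --alter --add-config <name=value> instead.";

    // Returns null when the combination is allowed, otherwise a message that names the alternative.
    static String check(Set<String> options) {
        boolean alteringConfig = options.contains("alter") && options.contains("config");
        if (alteringConfig && options.contains("bootstrap-server")) {
            return "Option combination \"[bootstrap-server],[config]\" can't be used with option \"[alter]\". "
                + CONFIGS_HINT;
        }
        return null;
    }
}
```

With this shape the user sees both what failed and where to go next, which is the point of the issue; how the real tool wires such a message into its option parser is left open here.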
[jira] [Updated] (KAFKA-10418) Incomplete error/docs when altering topic configs via kafka-topics with --bootstrap-server
[ https://issues.apache.org/jira/browse/KAFKA-10418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ron Dagostino updated KAFKA-10418:
--
Summary: Incomplete error/docs when altering topic configs via kafka-topics with --bootstrap-server (was: Unclear error/docs when altering topic configs via kafka-topics with --bootstrap-server)

> Incomplete error/docs when altering topic configs via kafka-topics with --bootstrap-server
> --
>
> Key: KAFKA-10418
> URL: https://issues.apache.org/jira/browse/KAFKA-10418
> Project: Kafka
> Issue Type: Improvement
> Reporter: Ron Dagostino
> Assignee: Ron Dagostino
> Priority: Minor
> Fix For: 2.7.0
>
> Changing a topic config with the kafka-topics command while connecting to Kafka via --bootstrap-server (rather than connecting to ZooKeeper via --zookeeper) is not supported. The desired functionality is available elsewhere, though: it is possible to change a topic config while connecting to Kafka rather than ZooKeeper via the kafka-configs command instead. However, neither the kafka-topics error message received nor the kafka-topics help information itself indicates this other possibility. For example:
> {{$ kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic test --config flush.messages=12345
> Option combination "[bootstrap-server],[config]" can't be used with option "[alter]"
> }}
> {{$ kafka-topics.sh
> ...
> --config A topic configuration override for the topic being created or altered...It is supported only in combination with --create if --bootstrap-server option is used.
> }}
> Rather than simply saying that what you want to do isn't available, it would be better to say also that you can do it with the kafka-configs command.
[jira] [Created] (KAFKA-10419) KAFKA BROKER Shuts down when a topic is deleted manually from command line on Windows 10 operating System.
Ajay Kapoor created KAFKA-10419:
---
Summary: KAFKA BROKER Shuts down when a topic is deleted manually from command line on Windows 10 operating System.
Key: KAFKA-10419
URL: https://issues.apache.org/jira/browse/KAFKA-10419
Project: Kafka
Issue Type: Bug
Components: KafkaConnect
Affects Versions: 2.6.0
Environment: Windows 10 Operating System
Reporter: Ajay Kapoor

KAFKA VERSION: KAFKA_2.13-2.6.0

Delete of topic on Windows causes kafka broker shutdown:

[2020-08-18 15:18:22,858] INFO [ReplicaAlterLogDirsManager on broker 0] Removed fetcher for partitions Set(quickstart-events-0) (kafka.server.ReplicaAlterLogDirsManager)
[2020-08-18 15:18:22,899] ERROR Error while renaming dir for quickstart-events-0 in log dir C:\tmp\kafka-logs (kafka.server.LogDirFailureChannel)
java.nio.file.AccessDeniedException: C:\tmp\kafka-logs\quickstart-events-0 -> C:\tmp\kafka-logs\quickstart-events-0.d767af7933ae4fe087c212994ef02e90-delete
    at java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:89)
    at java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103)
    at java.base/sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:395)
    at java.base/sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:292)
    at java.base/java.nio.file.Files.move(Files.java:1425)
    at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:913)
    at kafka.log.Log.$anonfun$renameDir$2(Log.scala:981)
    at kafka.log.Log.renameDir(Log.scala:2340)
    at kafka.log.LogManager.asyncDelete(LogManager.scala:935)
    at kafka.cluster.Partition.$anonfun$delete$1(Partition.scala:470)
    at kafka.cluster.Partition.delete(Partition.scala:461)
    at kafka.server.ReplicaManager.stopReplica(ReplicaManager.scala:344)
    at kafka.server.ReplicaManager.$anonfun$stopReplicas$9(ReplicaManager.scala:448)
    at scala.collection.mutable.HashMap$Node.foreach(HashMap.scala:587)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:475)
    at kafka.server.ReplicaManager.stopReplicas(ReplicaManager.scala:445)
    at kafka.server.KafkaApis.handleStopReplicaRequest(KafkaApis.scala:252)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:137)
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:70)
    at java.base/java.lang.Thread.run(Thread.java:830)
    Suppressed: java.nio.file.AccessDeniedException: C:\tmp\kafka-logs\quickstart-events-0 -> C:\tmp\kafka-logs\quickstart-events-0.d767af7933ae4fe087c212994ef02e90-delete
        at java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:89)
        at java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103)
        at java.base/sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:309)
        at java.base/sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:292)
        at java.base/java.nio.file.Files.move(Files.java:1425)
        at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:910)
        ... 14 more
[2020-08-18 15:18:22,914] WARN [ReplicaManager broker=0] Stopping serving replicas in dir C:\tmp\kafka-logs (kafka.server.ReplicaManager)
[2020-08-18 15:18:22,930] WARN [ReplicaManager broker=0] Broker 0 stopped fetcher for partitions and stopped moving logs for partitions because they are in the failed log directory C:\tmp\kafka-logs. (kafka.server.ReplicaManager)
[2020-08-18 15:18:22,932] WARN Stopping serving logs in dir C:\tmp\kafka-logs (kafka.log.LogManager)
[2020-08-18 15:18:22,946] ERROR Shutdown broker because all log dirs in C:\tmp\kafka-logs have failed (kafka.log.LogManager)

How to reproduce:
1. Start Zookeeper on Windows
>bin\windows\zookeeper-server-start.bat config\zookeeper.properties
2. Start Kafka Broker on Windows
>bin\windows\kafka-server-start.bat config\server.properties
3. Create a Topic on Kafka Broker
>bin\windows\kafka-topics.bat --create --topic quickstart-events
>--bootstrap-server localhost:9092
4. Delete the Kafka Topic created above.
>bin\windows\kafka-topics.bat --delete --topic quickstart-events
>--bootstrap-server localhost:9092
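The trace above shows two `Files.move` attempts from `Utils.atomicMoveWithFallback`, with the first failure attached to the second as a suppressed exception. A minimal sketch of that try-atomic-then-fall-back pattern, using only the JDK, might look like the following. The class name `MoveWithFallback` is hypothetical and this is not Kafka's source; it only illustrates why the log contains an outer exception plus a `Suppressed:` one when both attempts are denied.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical sketch of an atomic rename with a non-atomic fallback.
final class MoveWithFallback {
    static void move(Path source, Path target) throws IOException {
        try {
            // First attempt: atomic rename.
            Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException atomicFailure) {
            try {
                // Second attempt: plain move that may replace an existing target.
                Files.move(source, target, StandardCopyOption.REPLACE_EXISTING);
            } catch (IOException fallbackFailure) {
                // Keep the first failure visible in the stack trace of the second.
                fallbackFailure.addSuppressed(atomicFailure);
                throw fallbackFailure;
            }
        }
    }
}
```

If both attempts raise `AccessDeniedException`, as in the report, the caller sees the fallback failure with the atomic failure listed under `Suppressed:`.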
[jira] [Created] (KAFKA-10418) Unclear deprecation of altering topic configs via kafka-topics with --bootstrap-server
Ron Dagostino created KAFKA-10418:
-
Summary: Unclear deprecation of altering topic configs via kafka-topics with --bootstrap-server
Key: KAFKA-10418
URL: https://issues.apache.org/jira/browse/KAFKA-10418
Project: Kafka
Issue Type: Improvement
Reporter: Ron Dagostino
Assignee: Ron Dagostino
Fix For: 2.7.0

Changing a topic config with the kafka-topics command while connecting to Kafka via --bootstrap-server (rather than connecting to ZooKeeper via --zookeeper) has been deprecated. The desired functionality is available elsewhere: it is possible to change a topic config while connecting to Kafka rather than ZooKeeper via the kafka-configs command instead. However, neither the kafka-topics error message received nor the kafka-topics help information itself indicates this other possibility. For example:
{{$ kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic test --config flush.messages=12345
Option combination "[bootstrap-server],[config]" can't be used with option "[alter]"
}}
{{$ kafka-topics.sh
...
--config A topic configuration override for the topic being created or altered...It is supported only in combination with --create if --bootstrap-server option is used.
}}
Rather than simply saying that what you want to do isn't available, it would be better to say also that you can do it with the kafka-configs command.
[GitHub] [kafka] ableegoldman commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
ableegoldman commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r472441802 ## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImplTest.java ## @@ -0,0 +1,438 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Grouped; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Named; +import org.apache.kafka.streams.kstream.SlidingWindows; +import org.apache.kafka.streams.kstream.TimeWindowedKStream; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.apache.kafka.streams.state.WindowBytesStoreSupplier; +import org.apache.kafka.streams.state.WindowStore; +import org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.test.MockAggregator; +import org.apache.kafka.test.MockInitializer; +import org.apache.kafka.test.MockProcessorSupplier; +import org.apache.kafka.test.MockReducer; +import org.apache.kafka.test.StreamsTestUtils; +import org.junit.Before; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +import static java.time.Duration.ofMillis; +import static java.time.Instant.ofEpochMilli; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertThrows; + +public class SlidingWindowedKStreamImplTest { + +private static final String TOPIC = "input"; +private final StreamsBuilder builder = new StreamsBuilder(); +private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String()); +private TimeWindowedKStream windowedStream; + +@Before 
+public void before() { +final KStream stream = builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String())); +windowedStream = stream. +groupByKey(Grouped.with(Serdes.String(), Serdes.String())) + .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(100L), ofMillis(1000L))); +} + +@Test +public void shouldCountSlidingWindows() { +final MockProcessorSupplier, Long> supplier = new MockProcessorSupplier<>(); +windowedStream +.count() +.toStream() +.process(supplier); + +try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) { +processData(driver); +} +assertThat( +supplier.theCapturedProcessor().lastValueAndTimestampPerKey +.get(new Windowed<>("1", new TimeWindow(0L, 100L))), +equalTo(ValueAndTimestamp.make(1L, 100L))); +assertThat( +supplier.theCapturedProcessor().lastValueAndTimestampPerKey +.get(new Windowed<>("1", new TimeWindow(101L, 201L))), +equalTo(ValueAndTimestamp.make(1L, 150L))); +assertThat( +supplier.theCapturedProcessor().lastValueAndTimestampPerKey +.get(new Windowed<>("1", new TimeWindow(50L, 150L))), +equalTo(ValueAndTimestamp.make(2L, 150L))); +assertThat( +supplier.theCapturedProcessor().lastValueAndTimestampPerKey +.get(new Windowed<>("1", new TimeWindow(400L, 500L))), +equalTo(ValueAndTimestamp.make(1L, 500L))); +assertThat( +supplier.theCapturedProcessor().lastValueAndTimestampPerKey +.get(new Windowed<>("2", new TimeWindow(100L, 200L))), +equalTo(ValueAndTimestamp.make(2L, 200L))); +assertThat( +
[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
lct45 commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r472440045 ## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImplTest.java ## @@ -0,0 +1,438 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Grouped; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Named; +import org.apache.kafka.streams.kstream.SlidingWindows; +import org.apache.kafka.streams.kstream.TimeWindowedKStream; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.apache.kafka.streams.state.WindowBytesStoreSupplier; +import org.apache.kafka.streams.state.WindowStore; +import org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.test.MockAggregator; +import org.apache.kafka.test.MockInitializer; +import org.apache.kafka.test.MockProcessorSupplier; +import org.apache.kafka.test.MockReducer; +import org.apache.kafka.test.StreamsTestUtils; +import org.junit.Before; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +import static java.time.Duration.ofMillis; +import static java.time.Instant.ofEpochMilli; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertThrows; + +public class SlidingWindowedKStreamImplTest { + +private static final String TOPIC = "input"; +private final StreamsBuilder builder = new StreamsBuilder(); +private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String()); +private TimeWindowedKStream windowedStream; + +@Before 
+public void before() { +final KStream stream = builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String())); +windowedStream = stream. +groupByKey(Grouped.with(Serdes.String(), Serdes.String())) + .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(100L), ofMillis(1000L))); +} + +@Test +public void shouldCountSlidingWindows() { +final MockProcessorSupplier, Long> supplier = new MockProcessorSupplier<>(); +windowedStream +.count() +.toStream() +.process(supplier); + +try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) { +processData(driver); +} +assertThat( +supplier.theCapturedProcessor().lastValueAndTimestampPerKey +.get(new Windowed<>("1", new TimeWindow(0L, 100L))), +equalTo(ValueAndTimestamp.make(1L, 100L))); +assertThat( +supplier.theCapturedProcessor().lastValueAndTimestampPerKey +.get(new Windowed<>("1", new TimeWindow(101L, 201L))), +equalTo(ValueAndTimestamp.make(1L, 150L))); +assertThat( +supplier.theCapturedProcessor().lastValueAndTimestampPerKey +.get(new Windowed<>("1", new TimeWindow(50L, 150L))), +equalTo(ValueAndTimestamp.make(2L, 150L))); +assertThat( +supplier.theCapturedProcessor().lastValueAndTimestampPerKey +.get(new Windowed<>("1", new TimeWindow(400L, 500L))), +equalTo(ValueAndTimestamp.make(1L, 500L))); +assertThat( +supplier.theCapturedProcessor().lastValueAndTimestampPerKey +.get(new Windowed<>("2", new TimeWindow(100L, 200L))), +equalTo(ValueAndTimestamp.make(2L, 200L))); +assertThat( +
[GitHub] [kafka] cmccabe opened a new pull request #9197: Revert KAFKA-9309: Add the ability to translate Message to JSON
cmccabe opened a new pull request #9197: URL: https://github.com/apache/kafka/pull/9197 This reverts commit bf6dffe93bbe0fe33ad076ebccebb840d66b936d
[jira] [Commented] (KAFKA-10415) Provide an officially supported Node.js client
[ https://issues.apache.org/jira/browse/KAFKA-10415?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17180036#comment-17180036 ]
Matthew T. Adams commented on KAFKA-10415:
--
[~ijuma] I understand. Should I have filed an issue with Confluent instead, then? If so, can you point me to an issue tracker for Confluent where I could file this?

> Provide an officially supported Node.js client
> --
>
> Key: KAFKA-10415
> URL: https://issues.apache.org/jira/browse/KAFKA-10415
> Project: Kafka
> Issue Type: New Feature
> Components: clients
> Reporter: Matthew T. Adams
> Priority: Major
>
> Please provide an official Node.js client for Kafka at feature parity with all of the other officially supported & provided Kafka clients.
> It is extremely confusing when it comes to trying to use Kafka in the Node.js ecosystem. There are many clients, some look legitimate ([http://kafka.js.org]), but some are woefully out of date (many listed at [https://cwiki.apache.org/confluence/display/KAFKA/Clients#Clients-Node.js]), and others have confusing relationships among them ([https://github.com/nodefluent/node-sinek] & [https://github.com/nodefluent/kafka-streams]). Most of them are publicly asking for help. This leaves teams having to waste time trying to figure out which client has the Kafka features they need (mostly talking about streaming here), and which client has high quality and will be around in the future.
> If the client came directly from this project, those decisions would be made and we could get on about our work.
> JavaScript is one of the most popular languages on the planet, and the Node.js user base is huge – big enough that a Node.js client provided directly by the Kafka team is justified. The list at [https://cwiki.apache.org/confluence/display/KAFKA/Clients#Clients-Node.js] doesn't even mention what is perhaps the most confidence-inducing Node.js client thanks to its documentation, [https://kafka.js.org]. The list at [https://docs.confluent.io/current/clients/index.html#ak-clients] includes an officially-supported Go language client; Go's community is dwarfed by that of Node.js.
[GitHub] [kafka] abbccdda commented on pull request #8846: KAFKA-9800: [KIP-580] Client Exponential Backoff Implementation
abbccdda commented on pull request #8846: URL: https://github.com/apache/kafka/pull/8846#issuecomment-675675970

Some compilation errors:
```
11:41:23 /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk15-scala2.13/clients/src/main/java/org/apache/kafka/clients/Metadata.java:474: error: cannot assign a value to final variable refreshBackoffMs
11:41:23 this.refreshBackoffMs = this.refreshBackoff.backoff(this.attempts);
11:41:23 ^
11:41:23 /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk15-scala2.13/clients/src/main/java/org/apache/kafka/clients/Metadata.java:481: error: cannot assign a value to final variable refreshBackoffMs
11:41:23 this.refreshBackoffMs = this.refreshBackoff.baseBackoff();
11:41:23
```
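The compiler errors above come from reassigning a field declared `final`. A field that is recomputed on every retry has to be mutable. The sketch below illustrates that with a deterministic exponential backoff; `BackoffSketch` and `MetadataSketch` are hypothetical stand-ins, not the classes in the pull request, and jitter is deliberately left out to keep the numbers predictable.

```java
// Hypothetical backoff helper: base * 2^attempts, capped at a maximum, no jitter.
final class BackoffSketch {
    private final long baseMs;
    private final long maxMs;

    BackoffSketch(long baseMs, long maxMs) {
        this.baseMs = baseMs;
        this.maxMs = maxMs;
    }

    long baseBackoff() {
        return baseMs;
    }

    long backoff(int attempts) {
        return (long) Math.min(baseMs * Math.pow(2, attempts), maxMs);
    }
}

// Hypothetical holder of the current backoff value.
final class MetadataSketch {
    private final BackoffSketch refreshBackoff; // never reassigned, so final is fine
    private long refreshBackoffMs;              // reassigned on every outcome, so NOT final
    private int attempts;

    MetadataSketch(BackoffSketch refreshBackoff) {
        this.refreshBackoff = refreshBackoff;
        this.refreshBackoffMs = refreshBackoff.baseBackoff();
    }

    void onFailure() {
        this.refreshBackoffMs = this.refreshBackoff.backoff(this.attempts++);
    }

    void onSuccess() {
        this.attempts = 0;
        this.refreshBackoffMs = this.refreshBackoff.baseBackoff();
    }

    long refreshBackoffMs() {
        return refreshBackoffMs;
    }
}
```

Declaring `refreshBackoffMs` as `final` in this sketch would reproduce the same "cannot assign a value to final variable" error at both assignments.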
[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
lct45 commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r472436400 ## File path: checkstyle/suppressions.xml ## @@ -167,6 +167,9 @@ + Review comment: We don't!
[GitHub] [kafka] guozhangwang commented on a change in pull request #9177: KAFKA-9924: Add RocksDB metric num-entries-active-mem-table
guozhangwang commented on a change in pull request #9177: URL: https://github.com/apache/kafka/pull/9177#discussion_r472429141 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetrics.java ## @@ -33,7 +35,7 @@ import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addSumMetricToSensor; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addValueMetricToSensor; -public class RocksDBMetrics { +public class RocksDBMetrics { Review comment: nit: extra space. ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java ## @@ -415,9 +416,40 @@ public final Sensor storeLevelSensor(final String threadId, } } -public final void removeAllStoreLevelSensors(final String threadId, - final String taskId, - final String storeName) { +public void addStoreLevelMutableMetric(final String threadId, + final String taskId, + final String metricsScope, + final String storeName, + final String name, + final String description, + final RecordingLevel recordingLevel, + final Gauge valueProvider) { +final MetricName metricName = metrics.metricName( +name, +STATE_STORE_LEVEL_GROUP, +description, +storeLevelTagMap(threadId, taskId, metricsScope, storeName) +); +if (metrics.metric(metricName) == null) { +final MetricConfig metricConfig = new MetricConfig().recordLevel(recordingLevel); +final String key = storeSensorPrefix(threadId, taskId, storeName); +synchronized (storeLevelMetrics) { +metrics.addMetric(metricName, metricConfig, valueProvider); +storeLevelMetrics.computeIfAbsent(key, ignored -> new LinkedList<>()).push(metricName); +} +} +} + +public final void removeAllStoreLevelSensorsAndMetrics(final String threadId, + final String taskId, + final String storeName) { +removeAllStoreLevelMetrics(threadId, taskId, storeName); +removeAllStoreLevelSensors(threadId, taskId, storeName); +} Review comment: This may be a bit paranoid, but 
when adding them, the order seems to be `initSensors` first and then `initGauges`, while when removing we call `removeAllStoreLevelMetrics` first and then `removeAllStoreLevelSensors`. I know that today there should be no threads trying to init / removeAll concurrently, but just to be safe maybe we can make the call ordering sensors first and then gauges?
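The ordering suggestion can be sketched with a small registry that keeps sensors and gauges per store and removes them in the same kind order in which they were registered (sensors first, then gauges). `StoreLevelRegistry` and its method names are hypothetical; only the `computeIfAbsent(...).push(...)` idiom is taken from the diff above.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

// Hypothetical registry, keyed by a per-store prefix, tracking sensor and gauge names.
final class StoreLevelRegistry {
    private final Map<String, Deque<String>> sensors = new HashMap<>();
    private final Map<String, Deque<String>> gauges = new HashMap<>();

    synchronized void addSensor(String storeKey, String name) {
        sensors.computeIfAbsent(storeKey, ignored -> new LinkedList<>()).push(name);
    }

    synchronized void addGauge(String storeKey, String name) {
        gauges.computeIfAbsent(storeKey, ignored -> new LinkedList<>()).push(name);
    }

    // Removes sensors first and gauges second, mirroring the registration order of the two kinds.
    // Returns the names in the order they were removed.
    synchronized List<String> removeAll(String storeKey) {
        List<String> removed = new ArrayList<>();
        drain(sensors.remove(storeKey), removed);
        drain(gauges.remove(storeKey), removed);
        return removed;
    }

    private static void drain(Deque<String> names, List<String> out) {
        if (names == null) {
            return;
        }
        while (!names.isEmpty()) {
            out.add(names.pop()); // most recently pushed name comes out first
        }
    }
}
```

Doing both registration and removal under one lock, in one fixed order, means a future concurrent caller cannot observe gauges without their sensors or the reverse.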
[jira] [Updated] (KAFKA-10327) Make flush after some count of putted records in SinkTask
[ https://issues.apache.org/jira/browse/KAFKA-10327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Konstantine Karantasis updated KAFKA-10327:
---
Labels: kip-required needs-kip (was: )

> Make flush after some count of putted records in SinkTask
> -
>
> Key: KAFKA-10327
> URL: https://issues.apache.org/jira/browse/KAFKA-10327
> Project: Kafka
> Issue Type: Improvement
> Components: KafkaConnect
> Affects Versions: 2.5.0
> Reporter: Pavel Kuznetsov
> Priority: Major
> Labels: kip-required, needs-kip
>
> In the current version of Kafka Connect, all records accumulated by the SinkTask.put method are flushed to the target system on a time basis: data is flushed and offsets are committed every offset.flush.interval.ms (default 60000 ms).
> But you can't control the number of messages you receive from Kafka between two flushes. This may cause out-of-memory errors, because the in-memory buffer may grow large.
> I suggest adding out-of-the-box support for count-based flushing to Kafka Connect. It requires a new configuration parameter (offset.flush.count, for example).
> The number of records sent to SinkTask.put should be counted, and if this amount is greater than offset.flush.count's value, SinkTask.flush is called and offsets are committed.
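The count-based flush proposed in KAFKA-10327 can be sketched without any Connect classes. `CountingSinkBuffer` is hypothetical, records are plain strings, and the threshold stands in for the suggested `offset.flush.count` setting, which does not exist in Kafka Connect today. One assumption to note: this sketch flushes as soon as the buffered count reaches the threshold, whereas the issue text says "greater than".

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical buffer that flushes after a fixed number of records instead of on a timer.
final class CountingSinkBuffer {
    private final int flushCount;               // stand-in for the proposed offset.flush.count
    private final List<String> buffer = new ArrayList<>();
    private int flushes;

    CountingSinkBuffer(int flushCount) {
        if (flushCount <= 0) {
            throw new IllegalArgumentException("flushCount must be positive");
        }
        this.flushCount = flushCount;
    }

    // Buffers each record and flushes whenever the threshold is reached.
    void put(List<String> records) {
        for (String record : records) {
            buffer.add(record);
            if (buffer.size() >= flushCount) {
                flush();
            }
        }
    }

    // A real sink would write to the target system and commit offsets here.
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        buffer.clear();
        flushes++;
    }

    int flushes() {
        return flushes;
    }

    int buffered() {
        return buffer.size();
    }
}
```

The bound on `buffer.size()` is what addresses the out-of-memory concern: the buffer never holds more than `flushCount` records, regardless of how long the time-based interval is.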
[GitHub] [kafka] guozhangwang commented on pull request #9177: KAFKA-9924: Add RocksDB metric num-entries-active-mem-table
guozhangwang commented on pull request #9177: URL: https://github.com/apache/kafka/pull/9177#issuecomment-675651901 test this please
[GitHub] [kafka] abbccdda commented on pull request #8846: KAFKA-9800: [KIP-580] Client Exponential Backoff Implementation
abbccdda commented on pull request #8846: URL: https://github.com/apache/kafka/pull/8846#issuecomment-675647225 retest this please
[jira] [Commented] (KAFKA-10415) Provide an officially supported Node.js client
[ https://issues.apache.org/jira/browse/KAFKA-10415?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17179995#comment-17179995 ] Ismael Juma commented on KAFKA-10415: - Thanks for the JIRA. One clarification, Apache Kafka doesn't have an officially supported Go language client. The link you referenced is from Confluent, not Apache Kafka
[jira] [Comment Edited] (KAFKA-10415) Provide an officially supported Node.js client
[ https://issues.apache.org/jira/browse/KAFKA-10415?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17179995#comment-17179995 ] Ismael Juma edited comment on KAFKA-10415 at 8/18/20, 6:14 PM: --- Thanks for the JIRA. One clarification, Apache Kafka doesn't have an officially supported Go language client. The link you referenced is from Confluent, not Apache Kafka. Apache Kafka ships a Java client only. was (Author: ijuma): Thanks for the JIRA. One clarification, Apache Kafka doesn't have an officially supported Go language client. The link you referenced is from Confluent, not Apache Kafka > Provide an officially supported Node.js client > -- > > Key: KAFKA-10415 > URL: https://issues.apache.org/jira/browse/KAFKA-10415 > Project: Kafka > Issue Type: New Feature > Components: clients >Reporter: Matthew T. Adams >Priority: Major > > Please provide an official Node.js client for Kafka at feature parity with > all of the other officially supported & provided Kafka clients. > It is extremely confusing when it comes to trying to use Kafka in the Node.js > ecosystem. There are many clients, some look legitimate > ([http://kafka.js.org),|http://kafka.js.org%29%2C/] but some are woefully out > of date (many listed at > [https://cwiki.apache.org/confluence/display/KAFKA/Clients#Clients-Node.js]), > and others have confusing relationships among them > ([https://github.com/nodefluent/node-sinek] & > [https://github.com/nodefluent/kafka-streams]). Most of them are publicly > asking for help. This leaves teams having to waste time trying to figure out > which client has the Kafka features they need (mostly talking about streaming > here), and which client has high quality and will be around in the future. > If the client came directly from this project, those decisions would be made > and we could get on about our work. 
> JavaScript is one of the most popular languages on the planet, and the > Node.js user base is huge – big enough that a Node.js client provided > directly by the Kafka team is justified. The list at > [https://cwiki.apache.org/confluence/display/KAFKA/Clients#Clients-Node.js] > doesn't even mention what is perhaps the most confidence-inducing Node.js > client thanks to its documentation, > [https://kafka.js.org.|https://kafka.js.org./] The list at > [https://docs.confluent.io/current/clients/index.html#ak-clients] includes an > officially-supported Go language client; Go's community is dwarfed by that of > Node.js. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] guozhangwang commented on a change in pull request #9191: [WIP] KAFKA-10355: PoC
guozhangwang commented on a change in pull request #9191: URL: https://github.com/apache/kafka/pull/9191#discussion_r472368768 ## File path: streams/src/main/java/org/apache/kafka/streams/errors/MissingSourceTopicException.java ## @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.errors; + +public class MissingSourceTopicException extends StreamsException { Review comment: I agree with @cadonna here, that all exceptions from inside Streams should be inheriting from `StreamsException`, and that may include 1) environmental issues like timeout --- note that after @mjsax KIP we would not throw TimeoutException (which is a KafkaException not StreamsException) but a new exception inherited from StreamsException, and IOException which are also wrapped as StateStoreException inherited from StreamsException, and 2) user bug in `process` etc which get caught by streams code. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
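The convention described in this review comment (every exception leaving Streams code shares one base type, with environmental failures wrapped rather than leaked) can be sketched in plain Java. This is only an illustration under stated assumptions: `SketchStreamsException`, `SketchStateStoreException`, and `ExceptionWrappingSketch` are hypothetical stand-ins and not the real classes in `org.apache.kafka.streams.errors`.

```java
import java.io.IOException;

// Hypothetical, simplified stand-in for the common base exception type.
class SketchStreamsException extends RuntimeException {
    SketchStreamsException(String message, Throwable cause) {
        super(message, cause);
    }
}

// Hypothetical subclass for environmental failures such as state store I/O errors.
class SketchStateStoreException extends SketchStreamsException {
    SketchStateStoreException(String message, Throwable cause) {
        super(message, cause);
    }
}

class ExceptionWrappingSketch {
    // An action that may fail with a checked I/O error.
    interface IoAction {
        void run() throws IOException;
    }

    // Runs the action and rethrows any IOException as a subclass of the
    // common base type, keeping the original error as the cause.
    static void runWrapped(IoAction action) {
        try {
            action.run();
        } catch (IOException e) {
            throw new SketchStateStoreException("state store I/O failed", e);
        }
    }
}
```

With a single base type, a caller can install one handler for everything thrown from inside the library and still inspect `getCause()` for the underlying error.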
[GitHub] [kafka] vvcephei commented on a change in pull request #9148: KAFKA-10379: Implement the KIP-478 StreamBuilder#addGlobalStore()
vvcephei commented on a change in pull request #9148: URL: https://github.com/apache/kafka/pull/9148#discussion_r472368613 ## File path: streams/src/test/java/org/apache/kafka/test/MockApiProcessorSupplier.java ## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.test; + +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorSupplier; +import org.apache.kafka.streams.processor.PunctuationType; + +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +public class MockApiProcessorSupplier implements ProcessorSupplier { Review comment: This one isn't so straightforward. Although the supplied processors can be adapted, `theCapturedProcessor()` and `capturedProcessors(expectedNumberOfProcessors)` return `MockProcessor` specifically, so we'd need a whole new adapter to convert a MockApiProcessor into a MockProcessor. I'd rather leave it alone for now. These delegating classes will go away in a couple more PRs anyway. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 closed pull request #7492: Kafka 7499 production exception handler serialization exceptions
wcarlson5 closed pull request #7492: URL: https://github.com/apache/kafka/pull/7492 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #9148: KAFKA-10379: Implement the KIP-478 StreamBuilder#addGlobalStore()
vvcephei commented on a change in pull request #9148: URL: https://github.com/apache/kafka/pull/9148#discussion_r472348482 ## File path: streams/src/test/java/org/apache/kafka/test/MockApiProcessor.java ## @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.test; + +import org.apache.kafka.streams.KeyValueTimestamp; +import org.apache.kafka.streams.processor.Cancellable; +import org.apache.kafka.streams.processor.PunctuationType; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.state.ValueAndTimestamp; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + +public class MockApiProcessor implements Processor { Review comment: Ah, yeah, this is a good idea. I'll have to migrate all the field references to method references so they can be delegated, but I wanted to do that anyway. This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-10395) TopologyTestDriver does not work with dynamic topic routing
[ https://issues.apache.org/jira/browse/KAFKA-10395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sophie Blee-Goldman resolved KAFKA-10395. - Fix Version/s: 2.7.0 Resolution: Fixed > TopologyTestDriver does not work with dynamic topic routing > --- > > Key: KAFKA-10395 > URL: https://issues.apache.org/jira/browse/KAFKA-10395 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Sophie Blee-Goldman >Assignee: Sophie Blee-Goldman >Priority: Major > Labels: test-framework > Fix For: 2.7.0 > > > The TopologyTestDriver#read(topic) methods all call #getRecordsQueue which > checks > > {code:java} > final Queue> outputRecords = > outputRecordsByTopic.get(topicName); > if (outputRecords == null) { > if (!processorTopology.sinkTopics().contains(topicName)) { > throw new IllegalArgumentException("Unknown topic: " + topicName); > } > } > {code} > The outputRecordsByTopic map keeps track of all topics that are actually > produced to, but obviously doesn't capture any topics that haven't yet > received output. The `processorTopology#sinkTopics` is supposed to account > for that by checking to make sure the topic is actually registered in the > topology, and throw an exception if not in case the user supplied the wrong > topic name to read from. > Unfortunately the TopicNameExtractor allows for dynamic routing of records to > any topic, so the topology isn't aware of all the possible output topics. If > trying to read from one of these topics that happens to not have received any > output yet, the test will throw the above misleading IllegalArgumentException. > We could just relax this check, but warning users who may actually have > accidentally passed in the wrong topic to read from seems quite useful. A > better solution would be to require registering all possible output topics to > the TTD up front. 
This would obviously require a KIP, but it would be a very > small one and shouldn't be too much trouble > -- This message was sent by Atlassian Jira (v8.3.4#803005)
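The difference between the strict check quoted in the ticket and a relaxed one can be sketched with plain collections. This is a minimal model, not the `TopologyTestDriver` code: the class `OutputTopicLookupSketch`, its fields, and the choice to record a warning instead of throwing are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

// Hypothetical model of the output-topic lookup discussed in the ticket.
class OutputTopicLookupSketch {
    final Map<String, Queue<String>> outputRecordsByTopic = new HashMap<>();
    final Set<String> sinkTopics;
    final List<String> warnings = new ArrayList<>();

    OutputTopicLookupSketch(Set<String> sinkTopics) {
        this.sinkTopics = sinkTopics;
    }

    // Strict variant: a topic with no output yet must be registered in the
    // topology, otherwise the lookup fails. This rejects dynamically routed topics.
    Queue<String> strictLookup(String topicName) {
        Queue<String> records = outputRecordsByTopic.get(topicName);
        if (records == null && !sinkTopics.contains(topicName)) {
            throw new IllegalArgumentException("Unknown topic: " + topicName);
        }
        return records;
    }

    // Relaxed variant (an assumption about what "relaxing" could mean):
    // record a warning for unknown topics but let the caller proceed.
    Queue<String> relaxedLookup(String topicName) {
        Queue<String> records = outputRecordsByTopic.get(topicName);
        if (records == null && !sinkTopics.contains(topicName)) {
            warnings.add("Unrecognized topic: " + topicName);
        }
        return records;
    }
}
```

The relaxed variant keeps a hint for users who mistyped a topic name while no longer failing tests that read from a dynamically routed topic before it has received output.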
[GitHub] [kafka] vvcephei commented on a change in pull request #9148: KAFKA-10379: Implement the KIP-478 StreamBuilder#addGlobalStore()
vvcephei commented on a change in pull request #9148: URL: https://github.com/apache/kafka/pull/9148#discussion_r472327085 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java ## @@ -667,7 +674,7 @@ public void validateCopartition() { private void validateGlobalStoreArguments(final String sourceName, final String topic, final String processorName, - final ProcessorSupplier stateUpdateSupplier, + final ProcessorSupplier stateUpdateSupplier, Review comment: Sure; all we do is verify it's not null, but it doesn't hurt. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-8606) Provide a method to fetch committed offsets for a collection of TopicPartition
[ https://issues.apache.org/jira/browse/KAFKA-8606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17179703#comment-17179703 ] Mark Roberts commented on KAFKA-8606: - I'm reasonably sure that this ticket can be closed as of 2.4 > Provide a method to fetch committed offsets for a collection of TopicPartition > -- > > Key: KAFKA-8606 > URL: https://issues.apache.org/jira/browse/KAFKA-8606 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Affects Versions: 2.3.0, 2.2.1 >Reporter: ov7a >Priority: Major > > Currently KafkaConsumer has methods for fetching beginning offsets, end offsets > and offsets for times, all of them accepting a collection of TopicPartition. > There is a method to fetch the committed offset for a single TopicPartition, but > there is no public API to fetch committed offsets for a collection of > TopicPartition. So, if one wants to fetch all committed offsets for a topic, a > request per partition is created. > Note that ConsumerCoordinator.fetchCommittedOffsets, which is called internally > in the "committed" method, does accept a collection of TopicPartition. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] rondagostino commented on a change in pull request #9142: MINOR: Fix delete_topic for system tests
rondagostino commented on a change in pull request #9142: URL: https://github.com/apache/kafka/pull/9142#discussion_r472279480 ## File path: tests/kafkatest/services/kafka/kafka.py ## @@ -503,7 +503,7 @@ def create_topic(self, topic_cfg, node=None, use_zk_to_create_topic=True): self.logger.info("Running topic creation command...\n%s" % cmd) node.account.ssh(cmd) -def delete_topic(self, topic, node=None): +def delete_topic(self, topic, node=None, use_zk_to_delete_topic=False): Review comment: @rajinisivaram Yes, merging to just trunk seems fine to me. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna commented on a change in pull request #9177: KAFKA-9924: Add RocksDB metric num-entries-active-mem-table
cadonna commented on a change in pull request #9177: URL: https://github.com/apache/kafka/pull/9177#discussion_r472276615 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java ## @@ -415,9 +416,40 @@ public final Sensor storeLevelSensor(final String threadId, } } -public final void removeAllStoreLevelSensors(final String threadId, - final String taskId, - final String storeName) { +public void addStoreLevelMutableMetric(final String threadId, + final String taskId, + final String metricsScope, + final String storeName, + final String name, + final String description, + final RecordingLevel recordingLevel, + final Gauge valueProvider) { +final MetricName metricName = metrics.metricName( +name, +STATE_STORE_LEVEL_GROUP, +description, +storeLevelTagMap(threadId, taskId, metricsScope, storeName) +); +if (metrics.metric(metricName) == null) { +final MetricConfig metricConfig = new MetricConfig().recordLevel(recordingLevel); +final String key = storeSensorPrefix(threadId, taskId, storeName); +synchronized (storeLevelMetrics) { +metrics.addMetric(metricName, metricConfig, valueProvider); +storeLevelMetrics.computeIfAbsent(key, ignored -> new LinkedList<>()).push(metricName); +} +} +} + +public final void removeAllStoreLevelSensorsAndMetrics(final String threadId, + final String taskId, + final String storeName) { +removeAllStoreLevelMetrics(threadId, taskId, storeName); +removeAllStoreLevelSensors(threadId, taskId, storeName); +} Review comment: Yes, we need to synchronize. At least, we have to ensure that lines 411 and 438 are thread-safe. Then, if we do not want to have duplicates in `storeLevelSensors` we should ensure to have a lock between line 409 to 411. 
Between line 434 and 439, we need to ensure that the removal of all store level metrics completed otherwise it could happen that we find a store level metric that would prevent the addition of a metric but then the earlier found metric would be removed during the remainder of the removal process. Similar is true for the store level sensors. It is true that we always remove all of both collections together, but we do not add metric names and sensor names to both collections together. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
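The argument in this comment (the existence check and the add must not interleave with a removal in progress) can be sketched with a single lock guarding both operations. This is a minimal sketch under stated assumptions: `StoreMetricRegistrySketch` and its members are hypothetical and do not reproduce `StreamsMetricsImpl`.

```java
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;

// Hypothetical registry: one lock covers check-then-add and remove-all.
class StoreMetricRegistrySketch {
    private final Object lock = new Object();
    private final Set<String> registered = new HashSet<>();
    private final Map<String, Deque<String>> namesByStore = new HashMap<>();

    // Adds the metric only if it is not registered yet. The check and the
    // bookkeeping happen under the same lock, so a concurrent removeAll
    // cannot run between them.
    boolean addIfAbsent(String storeKey, String metricName) {
        synchronized (lock) {
            if (!registered.add(metricName)) {
                return false;
            }
            namesByStore.computeIfAbsent(storeKey, ignored -> new LinkedList<>()).push(metricName);
            return true;
        }
    }

    // Removes every metric of one store as a single atomic step and
    // returns how many were removed.
    int removeAll(String storeKey) {
        synchronized (lock) {
            Deque<String> names = namesByStore.remove(storeKey);
            if (names == null) {
                return 0;
            }
            registered.removeAll(names);
            return names.size();
        }
    }
}
```

Because removal completes entirely inside the lock, an add can never observe a metric that is about to disappear and wrongly skip its own registration.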
[jira] [Updated] (KAFKA-10417) suppress() with cogroup() throws ClassCastException
[ https://issues.apache.org/jira/browse/KAFKA-10417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wardha Perinkada Kattu updated KAFKA-10417: --- Description: Streams operation - `cogroup()`, `aggregate()` followed by `suppress()` throws `ClassCastException` Works fine without the `suppress()` Code block tested - {code:java} val stream1 = requestStreams.merge(successStreams).merge(errorStreams) .groupByKey(Grouped.with(Serdes.String(), serdesConfig.notificationSerde())) val streams2 = confirmationStreams .groupByKey(Grouped.with(Serdes.String(), serdesConfig.confirmationsSerde())) val cogrouped = stream1.cogroup(notificationAggregator).cogroup(streams2, confirmationsAggregator) .windowedBy(TimeWindows.of(Duration.ofMinutes(notificationStreamsConfig.joinWindowMinutes.toLong())).grace(Duration.ofMinutes(notificationStreamsConfig.graceDurationMinutes.toLong( .aggregate({ null }, Materialized.`as`>("time-windowed-aggregated-stream-store") .withValueSerde(serdesConfig.notificationMetricSerde())) .suppress(Suppressed.untilWindowCloses(unbounded())) .toStream() {code} Exception thrown is: {code:java} Caused by: java.lang.ClassCastException: class org.apache.kafka.streams.kstream.internals.PassThrough cannot be cast to class org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier (org.apache.kafka.streams.kstream.internals.PassThrough and org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier are in unnamed module of loader 'app') {code} [https://stackoverflow.com/questions/63459685/kgroupedstream-with-cogroup-aggregate-suppress] was: Streams operation - `cogroup()`, `aggregate()` followed by `suppress()` throws `ClassCastException` Works fine without the `suppress()` {code:java} val stream1 = requestStreams.merge(successStreams).merge(errorStreams) .groupByKey(Grouped.with(Serdes.String(), serdesConfig.notificationSerde())) val streams2 = confirmationStreams .groupByKey(Grouped.with(Serdes.String(), 
serdesConfig.confirmationsSerde())) val cogrouped = stream1.cogroup(notificationAggregator).cogroup(streams2, confirmationsAggregator) .windowedBy(TimeWindows.of(Duration.ofMinutes(notificationStreamsConfig.joinWindowMinutes.toLong())).grace(Duration.ofMinutes(notificationStreamsConfig.graceDurationMinutes.toLong( .aggregate({ null }, Materialized.`as`>("time-windowed-aggregated-stream-store") .withValueSerde(serdesConfig.notificationMetricSerde())) .suppress(Suppressed.untilWindowCloses(unbounded())) .toStream() {code} Exception thrown is: {code:java} Caused by: java.lang.ClassCastException: class org.apache.kafka.streams.kstream.internals.PassThrough cannot be cast to class org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier (org.apache.kafka.streams.kstream.internals.PassThrough and org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier are in unnamed module of loader 'app') {code} [https://stackoverflow.com/questions/63459685/kgroupedstream-with-cogroup-aggregate-suppress] > suppress() with cogroup() throws ClassCastException > --- > > Key: KAFKA-10417 > URL: https://issues.apache.org/jira/browse/KAFKA-10417 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.0 >Reporter: Wardha Perinkada Kattu >Priority: Blocker > Labels: kafka-streams > > Streams operation - `cogroup()`, `aggregate()` followed by `suppress()` > throws `ClassCastException` > Works fine without the `suppress()` > Code block tested - > {code:java} > val stream1 = requestStreams.merge(successStreams).merge(errorStreams) > .groupByKey(Grouped.with(Serdes.String(), > serdesConfig.notificationSerde())) > val streams2 = confirmationStreams > .groupByKey(Grouped.with(Serdes.String(), > serdesConfig.confirmationsSerde())) > val cogrouped = > stream1.cogroup(notificationAggregator).cogroup(streams2, > confirmationsAggregator) > > 
.windowedBy(TimeWindows.of(Duration.ofMinutes(notificationStreamsConfig.joinWindowMinutes.toLong())).grace(Duration.ofMinutes(notificationStreamsConfig.graceDurationMinutes.toLong( > .aggregate({ null }, Materialized.`as` NotificationMetric, WindowStore ByteArray>>("time-windowed-aggregated-stream-store") > > .withValueSerde(serdesConfig.notificationMetricSerde())) > .suppress(Suppressed.untilWindowCloses(unbounded())) > .toStream() > {code} > Exception thrown
[jira] [Created] (KAFKA-10417) suppress() with cogroup() throws ClassCastException
Wardha Perinkada Kattu created KAFKA-10417: -- Summary: suppress() with cogroup() throws ClassCastException Key: KAFKA-10417 URL: https://issues.apache.org/jira/browse/KAFKA-10417 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 2.6.0 Reporter: Wardha Perinkada Kattu Streams operation - `cogroup()`, `aggregate()` followed by `suppress()` throws `ClassCastException` Works fine without the `suppress()` {code:java} val stream1 = requestStreams.merge(successStreams).merge(errorStreams) .groupByKey(Grouped.with(Serdes.String(), serdesConfig.notificationSerde())) val streams2 = confirmationStreams .groupByKey(Grouped.with(Serdes.String(), serdesConfig.confirmationsSerde())) val cogrouped = stream1.cogroup(notificationAggregator).cogroup(streams2, confirmationsAggregator) .windowedBy(TimeWindows.of(Duration.ofMinutes(notificationStreamsConfig.joinWindowMinutes.toLong())).grace(Duration.ofMinutes(notificationStreamsConfig.graceDurationMinutes.toLong( .aggregate({ null }, Materialized.`as`>("time-windowed-aggregated-stream-store") .withValueSerde(serdesConfig.notificationMetricSerde())) .suppress(Suppressed.untilWindowCloses(unbounded())) .toStream() {code} Exception thrown is: {code:java} Caused by: java.lang.ClassCastException: class org.apache.kafka.streams.kstream.internals.PassThrough cannot be cast to class org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier (org.apache.kafka.streams.kstream.internals.PassThrough and org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier are in unnamed module of loader 'app') {code} [https://stackoverflow.com/questions/63459685/kgroupedstream-with-cogroup-aggregate-suppress] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10370) WorkerSinkTask: IllegalStateException cased by consumer.seek(tp, offsets) when (tp, offsets) are supplied by WorkerSinkTaskContext
[ https://issues.apache.org/jira/browse/KAFKA-10370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17179669#comment-17179669 ] Ning Zhang commented on KAFKA-10370: Hey [~ryannedolan] I updated the PR with the following code in `onPartitionsAssigned`. From my initial testing, it works well with `MirrorSinkTask`. Since the partition assignment is now driven by the Consumer (as we use `consumer.subscribe()`), the `offsets` passed into `context.offsets(offsets)` are all associated with the consumer group, rather than letting `MirrorSinkTask` do the consuming task assignment. I appreciate your thoughts above and look forward to more feedback! https://github.com/apache/kafka/pull/9145/files#diff-9d27e74bcdc892150367aed9a4cf499eR617-R698 > WorkerSinkTask: IllegalStateException cased by consumer.seek(tp, offsets) > when (tp, offsets) are supplied by WorkerSinkTaskContext > -- > > Key: KAFKA-10370 > URL: https://issues.apache.org/jira/browse/KAFKA-10370 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect >Affects Versions: 2.5.0 >Reporter: Ning Zhang >Assignee: Ning Zhang >Priority: Major > Fix For: 2.7.0 > > > In > [WorkerSinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java], > when we want the consumer to consume from certain offsets, rather than from > the last committed offset, > [WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java#L63-L66] > provides a way to supply the offsets externally (e.g. from an implementation of > SinkTask) to rewind the consumer. 
> In the [poll() > method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312], > it first call > [rewind()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L615-L633] > to (1) read the offsets from WorkerSinkTaskContext, if the offsets are not > empty, (2) consumer.seek(tp, offset) to rewind the consumer. > As a part of [WorkerSinkTask > initialization|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L290-L307], > when the [SinkTask > starts|https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java#L83-L88], > we can supply the specific offsets by +"context.offset(supplied_offsets);+" > in start() method, so that when the consumer does the first poll, it should > rewind to the specific offsets in rewind() method. 
However in practice, we > saw the following IllegalStateException when running consumer.seek(tp, > offsets); > {code:java} > [2020-08-07 23:53:55,752] INFO WorkerSinkTask{id=MirrorSinkConnector-0} > Rewind test-1 to offset 3 > (org.apache.kafka.connect.runtime.WorkerSinkTask:648) > [2020-08-07 23:53:55,752] INFO [Consumer > clientId=connector-consumer-MirrorSinkConnector-0, > groupId=connect-MirrorSinkConnector] Seeking to offset 3 for partition test-1 > (org.apache.kafka.clients.consumer.KafkaConsumer:1592) > [2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task > threw an uncaught and unrecoverable exception > (org.apache.kafka.connect.runtime.WorkerTask:187) > java.lang.IllegalStateException: No current assignment for partition test-1 > at > org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:368) > at > org.apache.kafka.clients.consumer.internals.SubscriptionState.seekUnvalidated(SubscriptionState.java:385) > at > org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1597) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.rewind(WorkerSinkTask.java:649) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198) > at > org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185) > at > org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at >
[GitHub] [kafka] vvcephei edited a comment on pull request #9177: KAFKA-9924: Add RocksDB metric num-entries-active-mem-table
vvcephei edited a comment on pull request #9177: URL: https://github.com/apache/kafka/pull/9177#issuecomment-675522188 Looks like Jenkins shut down during the run last time or something. Retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on pull request #9177: KAFKA-9924: Add RocksDB metric num-entries-active-mem-table
vvcephei commented on pull request #9177: URL: https://github.com/apache/kafka/pull/9177#issuecomment-675522188 Retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on pull request #9177: KAFKA-9924: Add RocksDB metric num-entries-active-mem-table
vvcephei commented on pull request #9177: URL: https://github.com/apache/kafka/pull/9177#issuecomment-675521788 Thanks for the update, @cadonna . Just one reply above. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #9177: KAFKA-9924: Add RocksDB metric num-entries-active-mem-table
vvcephei commented on a change in pull request #9177: URL: https://github.com/apache/kafka/pull/9177#discussion_r472250612 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java ## @@ -415,9 +416,40 @@ public final Sensor storeLevelSensor(final String threadId, } } -public final void removeAllStoreLevelSensors(final String threadId, - final String taskId, - final String storeName) { +public void addStoreLevelMutableMetric(final String threadId, + final String taskId, + final String metricsScope, + final String storeName, + final String name, + final String description, + final RecordingLevel recordingLevel, + final Gauge valueProvider) { +final MetricName metricName = metrics.metricName( +name, +STATE_STORE_LEVEL_GROUP, +description, +storeLevelTagMap(threadId, taskId, metricsScope, storeName) +); +if (metrics.metric(metricName) == null) { +final MetricConfig metricConfig = new MetricConfig().recordLevel(recordingLevel); +final String key = storeSensorPrefix(threadId, taskId, storeName); +synchronized (storeLevelMetrics) { +metrics.addMetric(metricName, metricConfig, valueProvider); +storeLevelMetrics.computeIfAbsent(key, ignored -> new LinkedList<>()).push(metricName); +} +} +} + +public final void removeAllStoreLevelSensorsAndMetrics(final String threadId, + final String taskId, + final String storeName) { +removeAllStoreLevelMetrics(threadId, taskId, storeName); +removeAllStoreLevelSensors(threadId, taskId, storeName); +} Review comment: It just seems oddly granular to synchronize them individually, since we always remove all of both collections together. If it doesn't matter, then do we need to synchronize at all? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei merged pull request #9174: KAFKA-10395: relax output topic check in TTD to work with dynamic routing
vvcephei merged pull request #9174: URL: https://github.com/apache/kafka/pull/9174
[GitHub] [kafka] vvcephei commented on a change in pull request #9174: KAFKA-10395: relax output topic check in TTD to work with dynamic routing
vvcephei commented on a change in pull request #9174: URL: https://github.com/apache/kafka/pull/9174#discussion_r472242890 ## File path: streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java ## @@ -805,10 +805,10 @@ public void advanceWallClockTime(final Duration advance) { private Queue> getRecordsQueue(final String topicName) { final Queue> outputRecords = outputRecordsByTopic.get(topicName); -if (outputRecords == null) { -if (!processorTopology.sinkTopics().contains(topicName)) { -throw new IllegalArgumentException("Unknown topic: " + topicName); -} +if (outputRecords == null && !processorTopology.sinkTopics().contains(topicName)) { Review comment: Ok, let's just keep it in our back pocket for now.
[jira] [Created] (KAFKA-10416) Provide an officially supported Deno client
Matthew T. Adams created KAFKA-10416: Summary: Provide an officially supported Deno client Key: KAFKA-10416 URL: https://issues.apache.org/jira/browse/KAFKA-10416 Project: Kafka Issue Type: New Feature Components: clients Reporter: Matthew T. Adams This is a similar request to https://issues.apache.org/jira/browse/KAFKA-10415 for a JavaScript client for Kafka, only packaged for https://deno.land. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10415) Provide an officially supported Node.js client
Matthew T. Adams created KAFKA-10415: Summary: Provide an officially supported Node.js client Key: KAFKA-10415 URL: https://issues.apache.org/jira/browse/KAFKA-10415 Project: Kafka Issue Type: New Feature Components: clients Reporter: Matthew T. Adams Please provide an official Node.js client for Kafka at feature parity with all of the other officially supported & provided Kafka clients. It is extremely confusing to try to use Kafka in the Node.js ecosystem. There are many clients: some look legitimate (http://kafka.js.org), some are woefully out of date (many listed at https://cwiki.apache.org/confluence/display/KAFKA/Clients#Clients-Node.js), and others have confusing relationships among them (https://github.com/nodefluent/node-sinek & https://github.com/nodefluent/kafka-streams). Most of them are publicly asking for help. This leaves teams wasting time trying to figure out which client has the Kafka features they need (mostly talking about streaming here), and which client is of high quality and will be around in the future. If the client came directly from this project, those decisions would be made and we could get on with our work. JavaScript is one of the most popular languages on the planet, and the Node.js user base is huge – big enough that a Node.js client provided directly by the Kafka team is justified. The list at https://cwiki.apache.org/confluence/display/KAFKA/Clients#Clients-Node.js doesn't even mention what is perhaps the most confidence-inducing Node.js client thanks to its documentation, https://kafka.js.org. The list at https://docs.confluent.io/current/clients/index.html#ak-clients includes an officially-supported Go language client; Go's community is dwarfed by that of Node.js.
[jira] [Assigned] (KAFKA-10414) Upgrade api-util dependency - CVE-2018-1337
[ https://issues.apache.org/jira/browse/KAFKA-10414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Urban reassigned KAFKA-10414: Assignee: Daniel Urban > Upgrade api-util dependency - CVE-2018-1337 > --- > > Key: KAFKA-10414 > URL: https://issues.apache.org/jira/browse/KAFKA-10414 > Project: Kafka > Issue Type: Bug >Reporter: Daniel Urban >Assignee: Daniel Urban >Priority: Major > > There is a dependency on org.apache.directory.api:api-util:1.0.0, which is > involved in CVE-2018-1337. The issue is fixed in api-util:1.0.2<= > This is a transitive dependency through the apacheds libs. Can be fixed by > upgrading to at least version 2.0.0.AM25
[jira] [Created] (KAFKA-10414) Upgrade api-util dependency - CVE-2018-1337
Daniel Urban created KAFKA-10414: Summary: Upgrade api-util dependency - CVE-2018-1337 Key: KAFKA-10414 URL: https://issues.apache.org/jira/browse/KAFKA-10414 Project: Kafka Issue Type: Bug Reporter: Daniel Urban There is a dependency on org.apache.directory.api:api-util:1.0.0, which is involved in CVE-2018-1337. The issue is fixed in api-util 1.0.2 and later. This is a transitive dependency through the apacheds libs and can be fixed by upgrading those libs to at least version 2.0.0.AM25.
[GitHub] [kafka] cadonna commented on a change in pull request #9191: [WIP] KAFKA-10355: PoC
cadonna commented on a change in pull request #9191: URL: https://github.com/apache/kafka/pull/9191#discussion_r472228811 ## File path: streams/src/main/java/org/apache/kafka/streams/errors/MissingSourceTopicException.java ## @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.errors; + +public class MissingSourceTopicException extends StreamsException { Review comment: That is a good point. I thought all exceptions thrown from inside Streams (except for the `IllegalStateException`) should be `StreamsException`s. The `RecordDeserializer` throws a `StreamsException` when the deserialization exception handler -- which is user code -- throws any exception. Maybe @guozhangwang can help here.
[GitHub] [kafka] cadonna commented on a change in pull request #9191: [WIP] KAFKA-10355: PoC
cadonna commented on a change in pull request #9191: URL: https://github.com/apache/kafka/pull/9191#discussion_r472217989 ## File path: streams/src/main/java/org/apache/kafka/streams/errors/MissingSourceTopicException.java ## @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.errors; + +public class MissingSourceTopicException extends StreamsException { + +private final static long serialVersionUID = 1L; Review comment: That is a good question. I have to admit that I blindly copied it from another exception class. The field exists because exceptions implement the `Serializable` interface. It tells the JVM whether a serialized object can be deserialized into an object of the class that is available in the JVM. See https://stackoverflow.com/questions/7187302/what-is-serialversionuid-in-java-normally-in-exception-class
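To make the role of `serialVersionUID` concrete, here is a minimal sketch that is not taken from the PR. `DemoException` and `SerialVersionDemo` are hypothetical names; the sketch only shows that an exception can be round-tripped through standard Java serialization and that the declared version stamp is what the serialization machinery reports for the class.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;

// Hypothetical exception class, used only for illustration.
class DemoException extends RuntimeException {
    // Explicit version stamp; without it the JVM derives one from the class structure.
    private static final long serialVersionUID = 1L;

    DemoException(final String message) {
        super(message);
    }
}

class SerialVersionDemo {
    // Serializes the exception to bytes and reads it back.
    static DemoException roundTrip(final DemoException original) {
        try {
            final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (DemoException) in.readObject();
            }
        } catch (final IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // Returns the version stamp that serialization associates with DemoException.
    static long declaredVersion() {
        return ObjectStreamClass.lookup(DemoException.class).getSerialVersionUID();
    }
}
```

If the stamp in the stream differs from the one on the class loaded in the reading JVM, deserialization fails with an `InvalidClassException`, which is why the field is usually declared explicitly on serializable classes.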
[GitHub] [kafka] cadonna commented on a change in pull request #9177: KAFKA-9924: Add RocksDB metric num-entries-active-mem-table
cadonna commented on a change in pull request #9177: URL: https://github.com/apache/kafka/pull/9177#discussion_r472200360 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java ## @@ -415,9 +416,40 @@ public final Sensor storeLevelSensor(final String threadId, } } -public final void removeAllStoreLevelSensors(final String threadId, - final String taskId, - final String storeName) { +public void addStoreLevelMutableMetric(final String threadId, + final String taskId, + final String metricsScope, + final String storeName, + final String name, + final String description, + final RecordingLevel recordingLevel, + final Gauge valueProvider) { +final MetricName metricName = metrics.metricName( +name, +STATE_STORE_LEVEL_GROUP, +description, +storeLevelTagMap(threadId, taskId, metricsScope, storeName) +); +if (metrics.metric(metricName) == null) { +final MetricConfig metricConfig = new MetricConfig().recordLevel(recordingLevel); +final String key = storeSensorPrefix(threadId, taskId, storeName); +synchronized (storeLevelMetrics) { +metrics.addMetric(metricName, metricConfig, valueProvider); +storeLevelMetrics.computeIfAbsent(key, ignored -> new LinkedList<>()).push(metricName); +} +} +} + +public final void removeAllStoreLevelSensorsAndMetrics(final String threadId, + final String taskId, + final String storeName) { +removeAllStoreLevelMetrics(threadId, taskId, storeName); +removeAllStoreLevelSensors(threadId, taskId, storeName); +} Review comment: Do you have performance concerns due to the two monitors? Or what is the main reason for using a single monitor here? By using a single monitor here and in `addStoreLevelMutableMetric()` and `storeLevelSensor()`, we do not ensure that no metrics are added to the metrics map during removal of all metrics because each time `Sensor#add()` is called a metric is added without synchronizing on the monitor of `storeLevelSensors`. 
Single operations on the metrics map are synchronized (through `ConcurrentMap`), but not multiple operations. ## File path: streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java ## @@ -609,6 +611,37 @@ public void shouldVerifyThatMetricsGetMeasurementsFromRocksDB() { assertThat((double) bytesWrittenTotal.metricValue(), greaterThan(0d)); } +@Test +public void shouldVerifyThatMetricsRecordedFromPropertiesGetMeasurementsFromRocksDB() { +final TaskId taskId = new TaskId(0, 0); + +final Metrics metrics = new Metrics(new MetricConfig().recordLevel(RecordingLevel.INFO)); +final StreamsMetricsImpl streamsMetrics = +new StreamsMetricsImpl(metrics, "test-application", StreamsConfig.METRICS_LATEST, time); + +context = EasyMock.niceMock(InternalMockProcessorContext.class); +EasyMock.expect(context.metrics()).andStubReturn(streamsMetrics); +EasyMock.expect(context.taskId()).andStubReturn(taskId); +EasyMock.expect(context.appConfigs()) +.andStubReturn(new StreamsConfig(StreamsTestUtils.getStreamsConfig()).originals()); +EasyMock.expect(context.stateDir()).andStubReturn(dir); +EasyMock.replay(context); + +rocksDBStore.init(context, rocksDBStore); +final byte[] key = "hello".getBytes(); +final byte[] value = "world".getBytes(); +rocksDBStore.put(Bytes.wrap(key), value); + +final Metric numberOfEntriesActiveMemTable = metrics.metric(new MetricName( +"num-entries-active-mem-table", +StreamsMetricsImpl.STATE_STORE_LEVEL_GROUP, +"description is not verified", +streamsMetrics.storeLevelTagMap(Thread.currentThread().getName(), taskId.toString(), METRICS_SCOPE, DB_NAME) +)); +assertThat(numberOfEntriesActiveMemTable, notNullValue()); +assertThat((BigInteger) numberOfEntriesActiveMemTable.metricValue(), greaterThan(BigInteger.valueOf(0))); Review comment: Yes, but in this test I merely test whether the metric is updated. The correctness of the computation is verified in `RocksDBMetricsRecorderGaugesTest`. I will improve this test to
[GitHub] [kafka] rhauch commented on pull request #9176: Allow replace all for RegexRouter
rhauch commented on pull request #9176: URL: https://github.com/apache/kafka/pull/9176#issuecomment-675475664 @kkonstantine is correct that this changes the public API and therefore requires a KIP. Please also create a Jira issue, and change the title of this PR so that it starts with the issue (e.g., `KAFKA-12345: ` but with your issue number). This will auto-link this PR in the Jira issue.
[jira] [Commented] (KAFKA-8135) Kafka Producer deadlocked on flush call with intermittent broker unavailability
[ https://issues.apache.org/jira/browse/KAFKA-8135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17179616#comment-17179616 ] Ranadeep Deb commented on KAFKA-8135: - I have been experiencing a similar issue in 2.12-2.3.0. I have a multi-threaded application with each thread sending an individual message to the broker. There are instances where I have observed that the producer threads get stuck on the Producer.send().get() call. I was not sure what was causing this issue, but after landing on this thread I suspect that an intermittent network outage might be the reason. I am curious about how to solve this. Thanks > Kafka Producer deadlocked on flush call with intermittent broker > unavailability > --- > > Key: KAFKA-8135 > URL: https://issues.apache.org/jira/browse/KAFKA-8135 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 2.1.0 >Reporter: Guozhang Wang >Assignee: Rajini Sivaram >Priority: Major > > In KIP-91 we added the config {{delivery.timeout.ms}} to replace {{retries}}, > and the value is default to 2 minutes. We've observed that when it was set to > MAX_VALUE (e.g. 
in Kafka Streams, when EOS is turned on), at some times the > {{broker.flush}} call would be blocked during the time when its destination > brokers are undergoing some unavailability: > {code} > java.lang.Thread.State: WAITING (parking) > at jdk.internal.misc.Unsafe.park(java.base@10.0.2/Native Method) > - parking to wait for <0x0006aeb21a00> (a > java.util.concurrent.CountDownLatch$Sync) > at java.util.concurrent.locks.LockSupport.park(java.base@10.0.2/Unknown > Source) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(java.base@10.0.2/Unknown > Source) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(java.base@10.0.2/Unknown > Source) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base@10.0.2/Unknown > Source) > at java.util.concurrent.CountDownLatch.await(java.base@10.0.2/Unknown > Source) > at > org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:76) > at > org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:693) > at > org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:1066) > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:259) > at > org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:520) > at > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:470) > at > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:458) > at > org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:286) > at > org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:412) > at > org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1057) > at > 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:911) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774) > {code} > And even after the broker went back to normal, producers would still be > blocked. One suspicion is that when broker's not able to handle the request > in time, the responses are dropped somehow inside the Sender, and hence > whoever waiting on this response would be blocked forever. > We've observed such scenarios when 1) broker's transiently failed for a > while, 2) network partitioned transiently, and 3) broker's bad config like > ACL caused it to not be able to handle requests for a while.
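For callers stuck on `Producer.send().get()`, one defensive option is to bound the wait instead of blocking indefinitely. The sketch below does not address the root cause discussed in this issue and uses no Kafka classes: the `Future<String>` stands in for the `Future` returned by `send()`, and `BoundedWaitDemo` and `awaitOrFallback` are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BoundedWaitDemo {
    // Waits at most timeoutMs for the result; returns the fallback when the wait times out
    // or the waiting thread is interrupted.
    static String awaitOrFallback(final Future<String> pending, final long timeoutMs, final String fallback) {
        try {
            return pending.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (final TimeoutException e) {
            return fallback;
        } catch (final InterruptedException e) {
            // Restore the interrupt flag so callers further up can see it.
            Thread.currentThread().interrupt();
            return fallback;
        } catch (final ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(final String[] args) {
        // A future that never completes models a response that never arrives.
        final Future<String> never = new CompletableFuture<>();
        System.out.println(awaitOrFallback(never, 50L, "timed-out"));
    }
}
```

With a timeout the thread regains control and can log, retry, or fail the request; whether the record was actually delivered remains unknown in that case.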
[jira] [Created] (KAFKA-10413) rebalancing leads to unevenly balanced connectors
yazgoo created KAFKA-10413: -- Summary: rebalancing leads to unevenly balanced connectors Key: KAFKA-10413 URL: https://issues.apache.org/jira/browse/KAFKA-10413 Project: Kafka Issue Type: Bug Components: KafkaConnect Affects Versions: 2.5.1 Reporter: yazgoo Hi, With CP 5.5, running the Kafka Connect S3 sink on EC2 with autoscaling enabled, if a Connect instance disappears or a new one appears, we see unbalanced consumption, much like what is mentioned in this post: https://stackoverflow.com/questions/58644622/incremental-cooperative-rebalancing-leads-to-unevenly-balanced-connectors This usually leads to one Kafka Connect instance taking most of the load and consumption not being able to keep up. Currently, we're "fixing" this by deleting the connector and re-creating it, but this is far from ideal. Any suggestions on what we could do to mitigate this?
[GitHub] [kafka] cadonna commented on a change in pull request #9177: KAFKA-9924: Add RocksDB metric num-entries-active-mem-table
cadonna commented on a change in pull request #9177: URL: https://github.com/apache/kafka/pull/9177#discussion_r472100951 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java ## @@ -56,6 +62,9 @@ public void maybeCloseStatistics() { } } +private static final String ROCKSDB_PROPERTIES_PREFIX = "rocksdb."; +private static final ByteBuffer CONVERSION_BUFFER = ByteBuffer.allocate(Long.BYTES); Review comment: Good point! I missed that the gauge can be called by multiple metrics reporters concurrently.
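A shared static `ByteBuffer` is mutable state, so concurrent gauge calls can interleave their writes and reads. One way to avoid that is a buffer local to each call, sketched below. This assumes the buffer serves to turn a `long` into an unsigned `BigInteger`, which is a guess, since the diff shows only the buffer declaration; `LongConversionDemo` and `toUnsigned` are hypothetical names, and this is not necessarily how the PR resolved the comment.

```java
import java.math.BigInteger;
import java.nio.ByteBuffer;

class LongConversionDemo {
    // Converts a long to its unsigned value using a buffer that lives only for this call,
    // so concurrent callers share no mutable state.
    static BigInteger toUnsigned(final long value) {
        final ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
        buffer.putLong(value);
        // Signum 1 makes BigInteger interpret the eight bytes as a non-negative magnitude.
        return new BigInteger(1, buffer.array());
    }
}
```

The per-call allocation is eight bytes; a `ThreadLocal` buffer or synchronizing on the shared buffer would be alternatives if allocation pressure mattered.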
[jira] [Commented] (KAFKA-10357) Handle accidental deletion of repartition-topics as exceptional failure
[ https://issues.apache.org/jira/browse/KAFKA-10357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17179535#comment-17179535 ] Bruno Cadonna commented on KAFKA-10357: --- bq. Can we leverage the committed offsets somehow? It seems like if the repartition topics don't exist but the group has committed offsets for them, then they must have been deleted If I understand it correctly, when a topic is deleted its committed offsets are also deleted. That means the situation you describe is not possible. Please let me know if I missed anything. bq. It seems preferable to me to have streams be able to detect when its internal state is invalid As far as I understand, that is exactly what we try to do. We want to detect the deletion of a repartition topic and notify the user about it through an exception. How the users react to the exception is their business. Admittedly, we need to provide some additional functionality to better react to such situations. I am not against a manual initialization step, but I have two concerns: 1) a worse out-of-the-box experience, because manual steps are required before you can play around with Streams, and 2) exposure of internals like the repartition topics. To solve 1) we could introduce a config that tells Streams to assume that the internal topics (or just some of them) are pre-created and therefore not to set them up. To avoid the exposure of internal topics we could abstract the manual initialization to hide internals. However, what would then happen when a repartition topic is deleted? What should Streams do when it can assume that internal topics are pre-created and it does not find a repartition topic? Either it silently shuts down or it throws an exception to which users can react. I am in favor of the second. Instead of the manual initialization step, I would prefer a way to persist a flag that indicates that an automatic initialization was performed. 
If something unexpected happens the application could then be reset to a valid state with the application reset tool and the application reset tool would also reset the flag. But currently, I do not know where we could persist such a flag. Maybe somewhere on the brokers and let the flag be managed by the group coordinator? WDYT? > Handle accidental deletion of repartition-topics as exceptional failure > --- > > Key: KAFKA-10357 > URL: https://issues.apache.org/jira/browse/KAFKA-10357 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Assignee: Bruno Cadonna >Priority: Major > > Repartition topics are both written by Stream's producer and read by Stream's > consumer, so when they are accidentally deleted both clients may be notified. > But in practice the consumer would react to it much quicker than producer > since the latter has a delivery timeout expiration period (see > https://issues.apache.org/jira/browse/KAFKA-10356). When consumer reacts to > it, it will re-join the group since metadata changed and during the triggered > rebalance it would auto-recreate the topic silently and continue, causing > data lost silently. > One idea, is to only create all repartition topics *once* in the first > rebalance and not auto-create them any more in future rebalances, instead it > would be treated similar as INCOMPLETE_SOURCE_TOPIC_METADATA error code > (https://issues.apache.org/jira/browse/KAFKA-10355). > The challenge part would be, how to determine if it is the first-ever > rebalance, and there are several wild ideas I'd like to throw out here: > 1) change the thread state transition diagram so that STARTING state would > not transit to PARTITION_REVOKED but only to PARTITION_ASSIGNED, then in the > assign function we can check if the state is still in CREATED and not RUNNING. > 2) augment the subscriptionInfo to encode whether or not this is the first > time ever rebalance. 
[jira] [Created] (KAFKA-10412) Deprecate all setters of Headers
Chia-Ping Tsai created KAFKA-10412: -- Summary: Deprecate all setters of Headers Key: KAFKA-10412 URL: https://issues.apache.org/jira/browse/KAFKA-10412 Project: Kafka Issue Type: Improvement Reporter: Chia-Ping Tsai Assignee: Chia-Ping Tsai Headers is a part of the public APIs, but its usage is somewhat chaotic for users. # It has a lot of abstract setters, so users have to implement all of them. However, we never call user-defined setters in production. # It is not thread-safe, so we have to call "setReadOnly" to keep the data consistent. # "setReadOnly" is not a part of the public API, so users have no idea about the thread-safety of Headers. We can't improve Headers right now because of deprecation cycles. This KIP plans to deprecate all setters of Headers and give them a default implementation (java.lang.UnsupportedOperationException), so that we can clean up all setters and make Headers read-only in the next major release, and users are able to remove all useless implementations from their Headers in this patch.
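The proposal above can be illustrated with a small sketch. `DemoHeaders` is a hypothetical, heavily simplified interface and not the real `org.apache.kafka.common.header.Headers`; it only shows how a deprecated setter with a default body that throws `UnsupportedOperationException` frees implementers from overriding it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for a headers interface; not the real Kafka Headers API.
interface DemoHeaders {
    List<String> keys();

    // Deprecated setter with a default body, so implementers no longer have to override it.
    @Deprecated
    default DemoHeaders add(final String key) {
        throw new UnsupportedOperationException("add is deprecated and not supported");
    }
}

// A read-only implementation only has to provide the getter.
class ReadOnlyDemoHeaders implements DemoHeaders {
    private final List<String> keys;

    ReadOnlyDemoHeaders(final List<String> keys) {
        this.keys = new ArrayList<>(keys);
    }

    @Override
    public List<String> keys() {
        // Defensive copy, so callers cannot mutate the internal list.
        return new ArrayList<>(keys);
    }
}

class HeadersDemo {
    // Returns true when calling the deprecated setter is rejected.
    @SuppressWarnings("deprecation")
    static boolean addIsUnsupported(final DemoHeaders headers) {
        try {
            headers.add("trace-id");
            return false;
        } catch (final UnsupportedOperationException e) {
            return true;
        }
    }
}
```

Existing implementations that already override the setter keep compiling and keep their behavior, which is what makes a default method compatible with a deprecation cycle.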
[GitHub] [kafka] nizhikov opened a new pull request #9196: KAFKA-10402: Upgrade system tests to python3
nizhikov opened a new pull request #9196: URL: https://github.com/apache/kafka/pull/9196 Kafka system tests currently use Python 2, which is outdated and no longer supported. This PR upgrades them to Python 3. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] jeqo commented on pull request #9138: KAFKA-9929: Support backward iterator on WindowStore
jeqo commented on pull request #9138: URL: https://github.com/apache/kafka/pull/9138#issuecomment-675384593 @ableegoldman, this PR is ready for review
[GitHub] [kafka] stanislavkozlovski commented on pull request #6669: KAFKA-8238: Adding Number of messages/bytes read
stanislavkozlovski commented on pull request #6669: URL: https://github.com/apache/kafka/pull/6669#issuecomment-675358064 Yeah, that'd be awesome!
[GitHub] [kafka] Sasilekha commented on pull request #9195: KAFKA-10092 Remove unnecessary enum modifier in NioEchoServer
Sasilekha commented on pull request #9195: URL: https://github.com/apache/kafka/pull/9195#issuecomment-675358144 Hi @ notifygd Can you please review?
[GitHub] [kafka] Sasilekha opened a new pull request #9195: KAFKA-10092 Remove unnecessary enum modifier in NioEchoServer
Sasilekha opened a new pull request #9195: URL: https://github.com/apache/kafka/pull/9195 In NioEchoServer the enum has its constructor declared as private, which is redundant. We can remove this. public class NioEchoServer extends Thread { public enum MetricType { TOTAL, RATE, AVG, MAX; private final String metricNameSuffix; private MetricType() { metricNameSuffix = "-" + name().toLowerCase(Locale.ROOT); }}} Removed the MetricType constructor for the MetricType Enum
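The redundancy follows from the Java language rule that an enum constructor without an access modifier is implicitly private. A minimal sketch, using the hypothetical name `DemoMetricType` rather than the real nested `NioEchoServer.MetricType`, shows the same enum with the modifier dropped and checks the constructor's visibility through reflection.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.Modifier;
import java.util.Locale;

// Mirrors the shape of the enum quoted above, with the redundant `private` dropped.
enum DemoMetricType {
    TOTAL, RATE, AVG, MAX;

    private final String metricNameSuffix;

    // No modifier: enum constructors are implicitly private.
    DemoMetricType() {
        metricNameSuffix = "-" + name().toLowerCase(Locale.ROOT);
    }

    String metricNameSuffix() {
        return metricNameSuffix;
    }
}

class EnumConstructorDemo {
    // Returns true when every declared constructor of the enum is private.
    static boolean allConstructorsPrivate() {
        for (final Constructor<?> constructor : DemoMetricType.class.getDeclaredConstructors()) {
            if (!Modifier.isPrivate(constructor.getModifiers())) {
                return false;
            }
        }
        return true;
    }
}
```

Note that only the modifier is redundant here; the constructor body still initializes `metricNameSuffix`, so the sketch keeps the constructor itself.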
[GitHub] [kafka] chia7712 commented on pull request #9162: MINOR: refactor Log to get rid of "return" in nested anonymous function
chia7712 commented on pull request #9162: URL: https://github.com/apache/kafka/pull/9162#issuecomment-67533 retest this please