[GitHub] [kafka] omkreddy commented on pull request #9200: MINOR: mirror integration tests should not call System.exit

2020-08-18 Thread GitBox


omkreddy commented on pull request #9200:
URL: https://github.com/apache/kafka/pull/9200#issuecomment-675864751


   ok to test



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lbradstreet commented on a change in pull request #9200: MINOR: mirror integration tests should not call System.exit

2020-08-18 Thread GitBox


lbradstreet commented on a change in pull request #9200:
URL: https://github.com/apache/kafka/pull/9200#discussion_r472723313



##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -185,10 +192,9 @@ public void close() {
         deleteAllTopics(backup.kafka());
         primary.stop();
         backup.stop();
+        assertFalse(exited.get());

Review comment:
   Ah, fancy GitHub stuff. I pushed up the fix independently.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lbradstreet commented on a change in pull request #9200: MINOR: mirror integration tests should not call System.exit

2020-08-18 Thread GitBox


lbradstreet commented on a change in pull request #9200:
URL: https://github.com/apache/kafka/pull/9200#discussion_r472723414



##########
File path: core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala
##########
@@ -29,14 +29,31 @@ import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.TimeoutException
 import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer}
+import org.apache.kafka.common.utils.Exit
+import org.junit.After
 import org.junit.Test
 import org.junit.Assert._
+import org.junit.Before
 
 class MirrorMakerIntegrationTest extends KafkaServerTestHarness {
 
   override def generateConfigs: Seq[KafkaConfig] =
     TestUtils.createBrokerConfigs(1, zkConnect).map(KafkaConfig.fromProps(_, new Properties()))
 
+  val exited = new AtomicBoolean(false)
+
+  @Before
+  override def setUp(): Unit = {
+    Exit.setExitProcedure((_, _) => exited.set(true))
+    super.setUp()
+  }
+
+  @After
+  override def tearDown(): Unit = {
+    super.tearDown()
+    assertFalse(exited.get())

Review comment:
   Done





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on pull request #9197: Revert KAFKA-9309: Add the ability to translate Message to JSON

2020-08-18 Thread GitBox


cmccabe commented on pull request #9197:
URL: https://github.com/apache/kafka/pull/9197#issuecomment-675859396


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #9149: KAFKA-10340: improve the logging to help user know what is going on

2020-08-18 Thread GitBox


showuon commented on pull request #9149:
URL: https://github.com/apache/kafka/pull/9149#issuecomment-675858452


   @kkonstantine  , could you help review this PR to improve logging? Thanks.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-8135) Kafka Producer deadlocked on flush call with intermittent broker unavailability

2020-08-18 Thread Ismael Juma (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-8135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17180258#comment-17180258 ]

Ismael Juma commented on KAFKA-8135:


[~rana.deb23] I suggest filing a new issue and please provide broker/client 
versions as well as a stacktrace from the producer when it's stuck.

> Kafka Producer deadlocked on flush call with intermittent broker 
> unavailability
> ---
>
> Key: KAFKA-8135
> URL: https://issues.apache.org/jira/browse/KAFKA-8135
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 2.1.0
>Reporter: Guozhang Wang
>Assignee: Rajini Sivaram
>Priority: Major
>
> In KIP-91 we added the config {{delivery.timeout.ms}} to replace {{retries}}, 
> and the value defaults to 2 minutes. We've observed that when it was set to 
> MAX_VALUE (e.g. in Kafka Streams, when EOS is turned on), at times the 
> {{producer.flush}} call would be blocked during the time when its destination 
> brokers are undergoing some unavailability:
> {code}
> java.lang.Thread.State: WAITING (parking)
> at jdk.internal.misc.Unsafe.park(java.base@10.0.2/Native Method)
> - parking to wait for  <0x0006aeb21a00> (a 
> java.util.concurrent.CountDownLatch$Sync)
> at java.util.concurrent.locks.LockSupport.park(java.base@10.0.2/Unknown 
> Source)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(java.base@10.0.2/Unknown
>  Source)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(java.base@10.0.2/Unknown
>  Source)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base@10.0.2/Unknown
>  Source)
> at java.util.concurrent.CountDownLatch.await(java.base@10.0.2/Unknown 
> Source)
> at 
> org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:76)
> at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:693)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:1066)
> at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:259)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:520)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:470)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:458)
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:286)
> at 
> org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:412)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1057)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:911)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
> {code}
> And even after the broker went back to normal, producers would still be 
> blocked. One suspicion is that when the broker's not able to handle the request 
> in time, the responses are dropped somehow inside the Sender, and hence 
> whoever is waiting on this response would be blocked forever.
> We've observed such scenarios when 1) broker's transiently failed for a 
> while, 2) network partitioned transiently, and 3) broker's bad config like 
> ACL caused it to not be able to handle requests for a while.
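For readers hitting this, a minimal sketch of a producer configured with a finite delivery timeout (the broker address and values are illustrative), which bounds how long in-flight records, and therefore flush(), can pend:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class BoundedFlushExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        // Keep delivery.timeout.ms finite (the default is 2 minutes) so that
        // records eventually expire and flush() cannot wait forever on them.
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120_000);
        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            producer.flush(); // returns once all buffered records complete or expire
        }
    }
}
```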



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cmccabe commented on pull request #9197: Revert KAFKA-9309: Add the ability to translate Message to JSON

2020-08-18 Thread GitBox


cmccabe commented on pull request #9197:
URL: https://github.com/apache/kafka/pull/9197#issuecomment-675851293


   There seems to be a Jenkins issue here:
   ```
   21:54:39 Fetching upstream changes from git://github.com/apache/kafka.git
   21:54:39  > git fetch --tags --progress -- git://github.com/apache/kafka.git 
+refs/heads/*:refs/remotes/origin/* +refs/pull/*:refs/remotes/origin/pr/*
   21:55:17 FATAL: java.io.IOException: Unexpected termination of the channel
   21:55:17 java.io.EOFException
   21:55:17 at 
java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2681)
   21:55:17 at 
java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:3156)
   21:55:17 at 
java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:862)
   21:55:17 at java.io.ObjectInputStream.(ObjectInputStream.java:358)
   ```



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-18 Thread GitBox


cmccabe commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r472693268



##########
File path: core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
##########
@@ -486,7 +486,9 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
   }
 
   @Test
-  def shouldNotAlterNonQuotaClientConfigUsingBootstrapServer(): Unit = {
+  def shouldNotAlterNonQuotaNonScramUserOrClientConfigUsingBootstrapServer(): Unit = {
+    // when using --bootstrap-server, it should be illegal to alter anything that is not a quota and not a SCRAM credential
+    // for both user and client entities

Review comment:
   Yeah, I think we should leave it alone for now, for the reasons you 
state.  But thanks for the context.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on pull request #9197: Revert KAFKA-9309: Add the ability to translate Message to JSON

2020-08-18 Thread GitBox


cmccabe commented on pull request #9197:
URL: https://github.com/apache/kafka/pull/9197#issuecomment-675850067


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-18 Thread GitBox


cmccabe commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r472688926



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/DescribeUserScramCredentialsResult.java
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * The result of the {@link Admin#describeUserScramCredentials()} call.
+ *
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class DescribeUserScramCredentialsResult {
+    private final KafkaFuture<Map<String, UserScramCredentialsDescription>> future;

Review comment:
   I thought about this more and I think I see a good way out of this 
difficulty.  We should just have an accessor method like `userInfo(String name)` 
that returns a `KafkaFuture<UserScramCredentialsDescription>`.  We can 
dynamically create this future if needed.  Then we can have a single RPC and a 
single API which is just `describe`.
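A rough sketch of that accessor idea, assuming the result wraps a single `KafkaFuture<Map<String, UserScramCredentialsDescription>>`; the class and method names here are illustrative, not the final API:

```java
import java.util.Map;
import org.apache.kafka.clients.admin.UserScramCredentialsDescription;
import org.apache.kafka.common.KafkaFuture;

class DescribeResultSketch {
    private final KafkaFuture<Map<String, UserScramCredentialsDescription>> future;

    DescribeResultSketch(KafkaFuture<Map<String, UserScramCredentialsDescription>> future) {
        this.future = future;
    }

    // Dynamically derive a per-user future from the single shared future;
    // a missing user completes the derived future exceptionally.
    KafkaFuture<UserScramCredentialsDescription> userInfo(String name) {
        return future.thenApply(map -> {
            UserScramCredentialsDescription description = map.get(name);
            if (description == null) {
                throw new IllegalArgumentException("No SCRAM credentials described for user " + name);
            }
            return description;
        });
    }
}
```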





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-9708) Connector does not prefer to use packaged classes during configuration

2020-08-18 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9708?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis resolved KAFKA-9708.
---
Resolution: Fixed

> Connector does not prefer to use packaged classes during configuration
> --
>
> Key: KAFKA-9708
> URL: https://issues.apache.org/jira/browse/KAFKA-9708
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Greg Harris
>Assignee: Greg Harris
>Priority: Major
> Fix For: 2.6.0
>
>
> In connector tasks, classes loaded during configuration are preferentially 
> loaded from the PluginClassLoader since KAFKA-8819 was implemented. This same 
> prioritization is not currently respected in the connector itself, where the 
> delegating classloader is used as the context classloader. This leads to the 
> possibility for different versions of converters to be loaded, or different 
> versions of dependencies to be found when executing code in the connector vs 
> task.
> Worker::startConnector should be changed to follow the startTask / KAFKA-8819 
> prioritization scheme, by activating the PluginClassLoader earlier.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9708) Connector does not prefer to use packaged classes during configuration

2020-08-18 Thread Konstantine Karantasis (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-9708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17180249#comment-17180249 ]

Konstantine Karantasis commented on KAFKA-9708:
---

The PR was closed with a note that this was fixed by: 
[https://github.com/apache/kafka/pull/8069] and as part of 
https://issues.apache.org/jira/browse/KAFKA-9374

Closing this issue as fixed. 

> Connector does not prefer to use packaged classes during configuration
> --
>
> Key: KAFKA-9708
> URL: https://issues.apache.org/jira/browse/KAFKA-9708
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Greg Harris
>Assignee: Greg Harris
>Priority: Major
>
> In connector tasks, classes loaded during configuration are preferentially 
> loaded from the PluginClassLoader since KAFKA-8819 was implemented. This same 
> prioritization is not currently respected in the connector itself, where the 
> delegating classloader is used as the context classloader. This leads to the 
> possibility for different versions of converters to be loaded, or different 
> versions of dependencies to be found when executing code in the connector vs 
> task.
> Worker::startConnector should be changed to follow the startTask / KAFKA-8819 
> prioritization scheme, by activating the PluginClassLoader earlier.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9708) Connector does not prefer to use packaged classes during configuration

2020-08-18 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9708?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis updated KAFKA-9708:
--
Fix Version/s: 2.6.0

> Connector does not prefer to use packaged classes during configuration
> --
>
> Key: KAFKA-9708
> URL: https://issues.apache.org/jira/browse/KAFKA-9708
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Greg Harris
>Assignee: Greg Harris
>Priority: Major
> Fix For: 2.6.0
>
>
> In connector tasks, classes loaded during configuration are preferentially 
> loaded from the PluginClassLoader since KAFKA-8819 was implemented. This same 
> prioritization is not currently respected in the connector itself, where the 
> delegating classloader is used as the context classloader. This leads to the 
> possibility for different versions of converters to be loaded, or different 
> versions of dependencies to be found when executing code in the connector vs 
> task.
> Worker::startConnector should be changed to follow the startTask / KAFKA-8819 
> prioritization scheme, by activating the PluginClassLoader earlier.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] rondagostino commented on a change in pull request #9200: MINOR: mirror integration tests should not call System.exit

2020-08-18 Thread GitBox


rondagostino commented on a change in pull request #9200:
URL: https://github.com/apache/kafka/pull/9200#discussion_r472636924



##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -185,10 +192,9 @@ public void close() {
         deleteAllTopics(backup.kafka());
         primary.stop();
         backup.stop();
+        assertFalse(exited.get());

Review comment:
   ```suggestion
           assertFalse(exited.get());
           Exit.resetExitProcedure();
   ```

##########
File path: core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala
##########
@@ -29,14 +29,31 @@ import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.TimeoutException
 import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer}
+import org.apache.kafka.common.utils.Exit
+import org.junit.After
 import org.junit.Test
 import org.junit.Assert._
+import org.junit.Before
 
 class MirrorMakerIntegrationTest extends KafkaServerTestHarness {
 
   override def generateConfigs: Seq[KafkaConfig] =
     TestUtils.createBrokerConfigs(1, zkConnect).map(KafkaConfig.fromProps(_, new Properties()))
 
+  val exited = new AtomicBoolean(false)
+
+  @Before
+  override def setUp(): Unit = {
+    Exit.setExitProcedure((_, _) => exited.set(true))
+    super.setUp()
+  }
+
+  @After
+  override def tearDown(): Unit = {
+    super.tearDown()
+    assertFalse(exited.get())

Review comment:
   ```suggestion
       assertFalse(exited.get())
       Exit.resetExitProcedure()
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #9062: KAFKA-8098: fix the flaky test by disabling the auto commit to avoid member rejoining

2020-08-18 Thread GitBox


showuon commented on pull request #9062:
URL: https://github.com/apache/kafka/pull/9062#issuecomment-675828534


   @abbccdda @omkreddy , could you review this PR? Thanks.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #9121: KAFKA-10351: add tests for IOExceptions for GlobalStateManagerImpl/OffsetCheckpoint

2020-08-18 Thread GitBox


showuon commented on pull request #9121:
URL: https://github.com/apache/kafka/pull/9121#issuecomment-675828368


   @mjsax , could you review this small PR? Thanks.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] LMnet commented on pull request #8955: KAFKA-10020: Create a new version of a scala Serdes without name clash (KIP-616)

2020-08-18 Thread GitBox


LMnet commented on pull request #8955:
URL: https://github.com/apache/kafka/pull/8955#issuecomment-675817076


   Voting for the KIP has successfully finished, and this pull request can be 
merged now.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-18 Thread GitBox


rondagostino commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r472575326



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/DescribeUserScramCredentialsResult.java
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * The result of the {@link Admin#describeUserScramCredentials()} call.
+ *
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class DescribeUserScramCredentialsResult {
+    private final KafkaFuture<Map<String, UserScramCredentialsDescription>> future;

Review comment:
   I can see reasonable arguments for all the possibilities, so if you feel 
strongly about one of them then I would be fine with it.  For example, `list()` 
and `describe()` even though they return the same thing (now -- `describe()` 
could return more later potentially).  Just let me know.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-18 Thread GitBox


rondagostino commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r472570823



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/DescribeUserScramCredentialsResult.java
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * The result of the {@link Admin#describeUserScramCredentials()} call.
+ *
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class DescribeUserScramCredentialsResult {
+    private final KafkaFuture<Map<String, UserScramCredentialsDescription>> future;

Review comment:
   > a list RPC to show everything, and a describe RPC to show only some 
things.
   
   Do you mean a list RPC that takes no arguments and returns every credential 
defined for every user and a describe RPC that takes 1 or more users and 
returns every credential defined for those specified users, and they both 
return the same information for each credential?
   
   Or do you mean a list RPC and a describe RPC that return different sets of 
information (as is done with list vs. describe topics)?  I think you mean the 
former (two RPCs, each returning the same thing), but I want to be certain I 
understand.
   
   > There are a few reasons why. One is that even though we currently only 
make one RPC, in the future we might make more than one. In that case we would 
want multiple futures.
   
   I don't understand what this is referring to.  By "we currently only make 
one RPC" to what are you referring?
   
   > I also feel like in AdminClient, errors should be handled with futures 
pretty much all the time
   
   Agreed.  Will convert to using futures always, whenever we arrive at the 
final decision on what the RPCs need to look like.
   
   I'm wondering, if we convert to returning futures everywhere, can we stick 
with the one describe RPC?  For example, could the admin client return a 
`Future<Map<String, UserScramCredentialsDescription>>`?  Would that 
work, and if so, would that be a reasonable way to proceed?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-18 Thread GitBox


rondagostino commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r472574569



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/DescribeUserScramCredentialsResult.java
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * The result of the {@link Admin#describeUserScramCredentials()} call.
+ *
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class DescribeUserScramCredentialsResult {
+    private final KafkaFuture<Map<String, UserScramCredentialsDescription>> future;

Review comment:
   If we do decide to go with 2 separate APIs, then I might be concerned 
about using `list()` vs `describe()` if they return the same set of information 
(i.e. mechanism and iterations).  Perhaps using two separate names gives us 
room to expand `describe()` to return more information later on, though.  But 
if not, and they will always return the same information, then maybe 
`describeAll()` and `describe()` (or `listAll()` and `list()`) might be better?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax merged pull request #9198: HOTFIX: use Exit.exit instead of System.exit

2020-08-18 Thread GitBox


mjsax merged pull request #9198:
URL: https://github.com/apache/kafka/pull/9198


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lbradstreet opened a new pull request #9200: MINOR: mirror integration tests should not call System.exit

2020-08-18 Thread GitBox


lbradstreet opened a new pull request #9200:
URL: https://github.com/apache/kafka/pull/9200


   Commit 406635bcc9f2a4c439d198ea0549170de331323c switched System.exit to 
Exit.exit; however, the integration tests did not override the exit 
procedure.
   
   This can cause invalid test runs with errors like this:
   ```
   [2020-08-18T15:52:01.142Z] * What went wrong:
   [2020-08-18T15:52:01.142Z] Execution failed for task 
':connect:mirror:integrationTest'.
   [2020-08-18T15:52:01.142Z] > Process 'Gradle Test Executor 192' finished 
with non-zero exit value 1
   [2020-08-18T15:52:01.142Z]   This problem might be caused by incorrect test 
process configuration.
   [2020-08-18T15:52:01.142Z]   Please refer to the test execution section in 
the User Manual at 
https://docs.gradle.org/6.6/userguide/java_testing.html#sec:test_execution
   ```
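   For reference, a minimal sketch of the override pattern this PR applies (JUnit 4; the class and test names are invented for illustration):

```java
import static org.junit.Assert.assertFalse;

import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.common.utils.Exit;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class ExitOverrideExampleTest {
    private final AtomicBoolean exited = new AtomicBoolean(false);

    @Before
    public void setUp() {
        // Record exit attempts instead of terminating the Gradle test JVM.
        Exit.setExitProcedure((statusCode, message) -> exited.set(true));
    }

    @After
    public void tearDown() {
        // Fail the test if anything called Exit.exit, then restore the
        // default procedure so later tests in the same JVM are unaffected.
        assertFalse(exited.get());
        Exit.resetExitProcedure();
    }

    @Test
    public void codeUnderTestMustNotExit() {
        // ... exercise code that might call Exit.exit(...) on failure ...
    }
}
```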



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #9137: KAFKA-9929: Support reverse iterator on KeyValueStore

2020-08-18 Thread GitBox


vvcephei commented on a change in pull request #9137:
URL: https://github.com/apache/kafka/pull/9137#discussion_r472531237



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBPrefixIterator.java
##########
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.state.KeyValueIterator;
-import org.rocksdb.RocksIterator;
-
-import java.util.Set;
-
-class RocksDBPrefixIterator extends RocksDbIterator {

Review comment:
   Was this class unused or something?

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
##########
@@ -249,7 +248,23 @@ public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
         validateStoreOpen();
         final KeyValueIterator<Bytes, byte[]> storeIterator = wrapped().range(from, to);
         final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = context.cache().range(cacheName, from, to);
-        return new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator, storeIterator);
+        return new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator, storeIterator, false);
+    }
+
+    @Override
+    public KeyValueIterator<Bytes, byte[]> reverseRange(final Bytes from,
+                                                        final Bytes to) {
+        if (from.compareTo(to) > 0) {
+            LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. "
+                + "This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. "
+                + "Note that the built-in numerical serdes do not follow this for negative numbers");

Review comment:
   This warning seems to miss the most likely scenario, that the user just 
passed the arguments in the wrong order.
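A small sketch illustrating the serdes caveat in that warning (the topic name is arbitrary; this only demonstrates the byte ordering of the built-in integer serde):

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;

public class SerdeOrderingExample {
    public static void main(String[] args) {
        // The integer serializer emits big-endian two's complement, so the
        // serialized bytes of -1 (0xFFFFFFFF) compare lexicographically
        // *greater* than those of 1 (0x00000001).
        Bytes minusOne = Bytes.wrap(Serdes.Integer().serializer().serialize("topic", -1));
        Bytes one = Bytes.wrap(Serdes.Integer().serializer().serialize("topic", 1));
        System.out.println(minusOne.compareTo(one)); // prints a positive number
    }
}
```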





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on pull request #9198: HOTFIX: use Exit.exit instead of System.exit

2020-08-18 Thread GitBox


cmccabe commented on pull request #9198:
URL: https://github.com/apache/kafka/pull/9198#issuecomment-675755685







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-18 Thread GitBox


rondagostino commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r472530253



##########
File path: core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
##########
@@ -486,7 +486,9 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
   }
 
   @Test
-  def shouldNotAlterNonQuotaClientConfigUsingBootstrapServer(): Unit = {
+  def shouldNotAlterNonQuotaNonScramUserOrClientConfigUsingBootstrapServer(): Unit = {
+    // when using --bootstrap-server, it should be illegal to alter anything that is not a quota and not a SCRAM credential
+    // for both user and client entities

Review comment:
   @cmccabe Good question, actually.  There is already a check to make sure 
a non-existent config cannot be **deleted** via `--zookeeper`: 
`shouldNotUpdateConfigIfNonExistingConfigIsDeletedUsingZookeper()`.  This test 
passes, of course.
   
   However, there is no check to make sure an unrecognized config cannot be 
**added**, and in fact if I add that test it fails; the code will gladly go 
ahead and add anything we wish (and it will gladly go ahead and delete it if we 
wish as well -- the above test only checks that something that doesn't 
exist can't be deleted).
   
   The next question, of course, is whether we should "fix" this or not.  What 
do you think?  To fix it we would need the full set of allowed configs at the 
User, Client, Topic, and Broker levels and then insert code to check 
accordingly.  Since the ZooKeeper update path is going away due to KIP-500, I'm 
wondering if we can just leave it alone?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on pull request #9199: KAFKA-10418: alter topic configs via kafka-topics error text

2020-08-18 Thread GitBox


cmccabe commented on pull request #9199:
URL: https://github.com/apache/kafka/pull/9199#issuecomment-675734800


   LGTM



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-18 Thread GitBox


cmccabe commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r472507947



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/DescribeUserScramCredentialsResult.java
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * The result of the {@link Admin#describeUserScramCredentials()} call.
+ *
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class DescribeUserScramCredentialsResult {
+    private final KafkaFuture<Map<String, UserScramCredentialsDescription>> future;

Review comment:
   Thinking about this more, I would prefer having both a list RPC to show 
everything, and a describe RPC to show only some things.  There are a few 
reasons why.  One is that even though we currently only make one RPC, in the 
future we might make more than one.  In that case we would want multiple 
futures.
   
   Another is that the general pattern in Kafka is that list RPCs show 
everything, but describe RPCs show only some things.  It's true that there are 
some places where we violate this pattern, but I still think it's worth trying 
to follow where we can.  Maybe this should be documented better so that when we 
add new RPCs, people aren't confused about whether to use "list" or "describe."
   
   I also feel like in AdminClient, errors should be handled with futures 
pretty much all the time, unless there is a really strong reason not to.  This 
allows people to use an async style of programming.  In contrast, mixing in 
some errors that aren't futures, but need to be checked explicitly is very 
likely to confuse people.
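As an editorial sketch of that async style (assuming a hypothetical per-user future accessor like the one discussed above; nothing here is the final API):

```java
import org.apache.kafka.clients.admin.UserScramCredentialsDescription;
import org.apache.kafka.common.KafkaFuture;

public class AsyncStyleSketch {
    // 'userFuture' stands in for a future obtained from an admin result.
    static void printWhenDone(KafkaFuture<UserScramCredentialsDescription> userFuture) {
        userFuture.whenComplete((description, error) -> {
            if (error != null) {
                // Errors arrive through the future, not a separate checked path.
                System.err.println("describe failed: " + error);
            } else {
                System.out.println("described user: " + description.name());
            }
        });
    }
}
```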





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #9067: MINOR: Streams integration tests should not call exit

2020-08-18 Thread GitBox


mjsax commented on pull request #9067:
URL: https://github.com/apache/kafka/pull/9067#issuecomment-675733090


   Seems I made a mistake cherry-picking to `2.6` -- I remember that there was 
a conflict; guess I resolved it incorrectly. Did a HOTFIX PR: #9198



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino commented on pull request #9199: KAFKA-10418: alter topic configs via kafka-topics error text

2020-08-18 Thread GitBox


rondagostino commented on pull request #9199:
URL: https://github.com/apache/kafka/pull/9199#issuecomment-675731352


   @cmccabe Can you take a look?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino opened a new pull request #9199: KAFKA-10418: alter topic configs via kafka-topics error text

2020-08-18 Thread GitBox


rondagostino opened a new pull request #9199:
URL: https://github.com/apache/kafka/pull/9199


   Changing a topic config with the `kafka-topics` command while connecting to 
Kafka via `--bootstrap-server` (rather than connecting to ZooKeeper via 
`--zookeeper`) is not supported. The desired functionality is available 
elsewhere, though: it is possible to change a topic config while connecting to 
Kafka rather than ZooKeeper via the `kafka-configs` command instead. However, 
neither the `kafka-topics` error message received nor the `kafka-topics` help 
information itself indicates this other possibility. For example:
   
   ```
   $ kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic test 
--config flush.messages=12345
   Option combination "[bootstrap-server],[config]" can't be used with option 
"[alter]"
   ```
   
   ```
   $ kafka-topics.sh
   ...
   --config  A topic configuration override for the topic 
being created or altered...It is supported only in combination with --create if 
--bootstrap-server option is used.
   ```
   
   Rather than simply saying that what you want to do isn't available, it would 
be better to say also that you can do it with the `kafka-configs` command.
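   As an aside, the working `kafka-configs` equivalent of the failing command above would look something like this (the topic and config come from the example):
   
   ```
   $ kafka-configs.sh --bootstrap-server localhost:9092 --alter \
       --entity-type topics --entity-name test \
       --add-config flush.messages=12345
   ```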



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax opened a new pull request #9198: HOTFIX: use Exit.exit instead of System.exit

2020-08-18 Thread GitBox


mjsax opened a new pull request #9198:
URL: https://github.com/apache/kafka/pull/9198


   Follow up to #9067 -- seems I made an error resolving the unclean 
cherry-pick from `trunk` to `2.6`.
   
   Call for review @vvcephei @ijuma 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-18 Thread GitBox


cmccabe commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r472501147



##########
File path: core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
##########
@@ -486,7 +486,9 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
   }
 
   @Test
-  def shouldNotAlterNonQuotaClientConfigUsingBootstrapServer(): Unit = {
+  def shouldNotAlterNonQuotaNonScramUserOrClientConfigUsingBootstrapServer(): Unit = {
+    // when using --bootstrap-server, it should be illegal to alter anything that is not a quota and not a SCRAM credential
+    // for both user and client entities

Review comment:
   Sorry, this might be a silly question, but how are these constraints 
different when using --zookeeper?  Should we test that as well?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-18 Thread GitBox


cmccabe commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r472498809



##########
File path: clients/src/test/java/org/apache/kafka/common/protocol/ProtoUtilsTest.java
##########
@@ -26,11 +26,18 @@
     public void testDelayedAllocationSchemaDetection() throws Exception {
         //verifies that schemas known to retain a reference to the underlying byte buffer are correctly detected.
         for (ApiKeys key : ApiKeys.values()) {
-            if (key == ApiKeys.PRODUCE || key == ApiKeys.JOIN_GROUP || key == ApiKeys.SYNC_GROUP || key == ApiKeys.SASL_AUTHENTICATE
-                || key == ApiKeys.EXPIRE_DELEGATION_TOKEN || key == ApiKeys.RENEW_DELEGATION_TOKEN) {
-                assertTrue(key + " should require delayed allocation", key.requiresDelayedAllocation);
-            } else {
-                assertFalse(key + " should not require delayed allocation", key.requiresDelayedAllocation);
+            switch (key) {
+                case PRODUCE:
+                case JOIN_GROUP:
+                case SYNC_GROUP:
+                case SASL_AUTHENTICATE:
+                case EXPIRE_DELEGATION_TOKEN:
+                case RENEW_DELEGATION_TOKEN:
+                case ALTER_USER_SCRAM_CREDENTIALS:
+                    assertTrue(key + " should require delayed allocation", key.requiresDelayedAllocation);
+                    break;
+                default:
+                    assertFalse(key + " should not require delayed allocation", key.requiresDelayedAllocation);

Review comment:
   I know it's not strictly necessary, but it would be nice to have a 
"break" after the default clause too





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-18 Thread GitBox


cmccabe commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r472496932



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/UserScramCredentialsDescription.java
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.errors.ApiException;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Representation of all SASL/SCRAM credentials associated with a user that can be retrieved, or an exception indicating
+ * why credentials could not be retrieved.
+ *
+ * @see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-554%3A+Add+Broker-side+SCRAM+Config+API">KIP-554: Add Broker-side SCRAM Config API</a>
+ */
+public class UserScramCredentialsDescription {
+    private final String name;
+    private final Optional<ApiException> exception;
+    private final List<ScramCredentialInfo> credentialInfos;
+
+    /**
+     * Constructor for when SASL/SCRAM credentials associated with a user could not be retrieved
+     *
+     * @param name the required user name
+     * @param exception the required exception indicating why the credentials for the user could not be retrieved
+     */
+    public UserScramCredentialsDescription(String name, ApiException exception) {
+        this(name, Optional.of(Objects.requireNonNull(exception)), Collections.emptyList());
+    }
+
+    /**
+     * Constructor for when SASL/SCRAM credentials associated with a user are successfully retrieved
+     *
+     * @param name the required user name
+     * @param credentialInfos the required SASL/SCRAM credential representations for the user
+     */
+    public UserScramCredentialsDescription(String name, List<ScramCredentialInfo> credentialInfos) {
+        this(name, Optional.empty(), Objects.requireNonNull(credentialInfos));
+    }
+
+    private UserScramCredentialsDescription(String name, Optional<ApiException> exception, List<ScramCredentialInfo> credentialInfos) {
+        this.name = Objects.requireNonNull(name);
+        this.exception = Objects.requireNonNull(exception);
+        this.credentialInfos = Collections.unmodifiableList(new ArrayList<>(credentialInfos));
+    }
+
+    /**
+     *
+     * @return the user name
+     */
+    public String name() {
+        return name;
+    }
+
+    /**
+     *
+     * @return the exception, if any, that prevented the user's SASL/SCRAM credentials from being retrieved
+     */
+    public Optional<ApiException> exception() {
+        return exception;
+    }
+
+    /**
+     *
+     * @return the always non-null/unmodifiable list of SASL/SCRAM credential representations for the user
+     * (empty if {@link #exception} defines an exception)
+     */
+    public List<ScramCredentialInfo> credentialInfos() {

Review comment:
   It would be good to throw the exception here if there is one, so that it 
isn't possible to ignore the problem.
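One way to realize that suggestion, sketched here rather than taken from the merged code (`ApiException` extends `RuntimeException`, so no checked signature change is needed):

```java
// Sketch only: surface the stored exception on access so callers
// cannot silently read an empty list when the describe failed.
public List<ScramCredentialInfo> credentialInfos() {
    if (exception.isPresent()) {
        throw exception.get(); // ApiException is unchecked
    }
    return credentialInfos;
}
```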





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-18 Thread GitBox


cmccabe commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r472494891



##
File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
##
@@ -1214,6 +1215,64 @@ default AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlteration> entries, AlterClientQuotasOptions options);
 
+/**
+ * Describe all SASL/SCRAM credentials.
+ *
+ * This is a convenience method for {@link 
#describeUserScramCredentials(List, DescribeUserScramCredentialsOptions)}
+ *
+ * @return The DescribeUserScramCredentialsResult.
+ */
+default DescribeUserScramCredentialsResult describeUserScramCredentials() {
+return describeUserScramCredentials(null, new 
DescribeUserScramCredentialsOptions());
+}
+
+/**
+ * Describe SASL/SCRAM credentials for the given users.
+ *
+ * This is a convenience method for {@link 
#describeUserScramCredentials(List, DescribeUserScramCredentialsOptions)}
+ *
+ * @param users the users for which credentials are to be described; all 
users' credentials are described if null
+ *  or empty.  A user explicitly specified here that does not 
have a SCRAM credential will not appear
+ *  in the results.

Review comment:
   @rondagostino : It still says "A user explicitly specified here that 
does not have a SCRAM credential will not appear in the results".  I thought we 
agreed that such a user would get an error code?
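   For context, a hedged usage sketch of the API under discussion (user names are placeholders; the `all()` accessor is assumed from the PR's result class):
   ```java
   import java.util.Arrays;
   import java.util.Map;
   import java.util.Properties;
   import org.apache.kafka.clients.admin.Admin;
   import org.apache.kafka.clients.admin.DescribeUserScramCredentialsResult;
   import org.apache.kafka.clients.admin.UserScramCredentialsDescription;

   public class DescribeScramExample {
       public static void main(String[] args) throws Exception {
           Properties props = new Properties();
           props.put("bootstrap.servers", "localhost:9092");
           try (Admin admin = Admin.create(props)) {
               // "alice" and "bob" are placeholder user names.
               DescribeUserScramCredentialsResult result =
                   admin.describeUserScramCredentials(Arrays.asList("alice", "bob"));
               Map<String, UserScramCredentialsDescription> byUser = result.all().get();
               byUser.forEach((user, description) ->
                   System.out.println(user + " -> " + description.credentialInfos()));
           }
       }
   }
   ```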





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on pull request #8050: MINOR: Include call name in TimeoutException

2020-08-18 Thread GitBox


mimaison commented on pull request #8050:
URL: https://github.com/apache/kafka/pull/8050#issuecomment-675714697


   @omkreddy @hachikuji Can you take a look? Thanks



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on pull request #9194: KAFKA-10384: Separate converters from generated messages

2020-08-18 Thread GitBox


cmccabe commented on pull request #9194:
URL: https://github.com/apache/kafka/pull/9194#issuecomment-675714307


   JDK8 test failure looks like a Jenkins issue.
   
   ```
   13:29:01 Fetching upstream changes from git://github.com/apache/kafka.git
   13:29:01  > git fetch --tags --progress -- git://github.com/apache/kafka.git 
+refs/heads/*:refs/remotes/origin/* +refs/pull/*:refs/remotes/origin/pr/*
   13:29:17 FATAL: java.io.IOException: Unexpected termination of the channel
   13:29:17 java.io.EOFException
   13:29:17 at 
java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2681)
   13:29:17 at 
java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:3156)
   ```



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on pull request #9131: KAFKA-10367: Allow running the Streams demo app with a config file

2020-08-18 Thread GitBox


mimaison commented on pull request #9131:
URL: https://github.com/apache/kafka/pull/9131#issuecomment-675713485


   @mjsax Can you take another look? Thanks



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on pull request #9197: Revert KAFKA-9309: Add the ability to translate Message to JSON

2020-08-18 Thread GitBox


cmccabe commented on pull request #9197:
URL: https://github.com/apache/kafka/pull/9197#issuecomment-675711065


   Looks like there is a checkstyle issue upstream:
   ```
   12:59:42 > Task :streams:upgrade-system-tests-22:checkstyleTest FAILED
   12:59:42 [ant:checkstyle] [ERROR] 
/home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.13@2/streams/upgrade-system-tests-22/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java:43:
 Line matches the illegal pattern ''System.exit': Should not directly call 
System.exit, but Exit.exit instead.'. [dontUseSystemExit]
   ```
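   The fix the rule asks for is mechanical; a minimal sketch (class name is hypothetical):
   ```java
   import org.apache.kafka.common.utils.Exit;

   public class ExitExample {
       public static void main(String[] args) {
           // Exit.exit delegates to System.exit by default, but tests can
           // install a different exit procedure, which is why the rule exists.
           Exit.exit(1);
       }
   }
   ```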



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe merged pull request #9144: KAFKA-9705: (part-1) add redirection fields in the request header

2020-08-18 Thread GitBox


cmccabe merged pull request #9144:
URL: https://github.com/apache/kafka/pull/9144


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #9144: KAFKA-9705: (part-1) add redirection fields in the request header

2020-08-18 Thread GitBox


cmccabe commented on a change in pull request #9144:
URL: https://github.com/apache/kafka/pull/9144#discussion_r472477998



##
File path: clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
##
@@ -189,16 +189,18 @@ ClientRequest newClientRequest(String nodeId, 
AbstractRequest.Builder request
  * cancelling the request. The request may get 
cancelled sooner if the socket disconnects
  * for any reason including if another pending 
request to the same node timed out first.
  * @param callback the callback to invoke when we get a response
+ * @param initialPrincipalName the initial client principal name, when 
building a forward request
+ * @param initialClientId the initial client id, when building a forward 
request
  */
 ClientRequest newClientRequest(String nodeId,
AbstractRequest.Builder<?> requestBuilder,
long createdTimeMs,
boolean expectResponse,
int requestTimeoutMs,
+   String initialPrincipalName,

Review comment:
   Fair enough
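   For readers following along, a hedged sketch of how the extended factory might be invoked (argument values and the trailing parameter order are assumptions based on the javadoc above):
   ```java
   // client, requestBuilder, time and callback are assumed to be in scope.
   ClientRequest request = client.newClientRequest(
       "1",                   // nodeId
       requestBuilder,        // AbstractRequest.Builder<?>
       time.milliseconds(),   // createdTimeMs
       true,                  // expectResponse
       30000,                 // requestTimeoutMs
       "original-principal",  // initialPrincipalName, set when building a forward request
       "original-client-id",  // initialClientId, set when building a forward request
       callback);             // response callback
   ```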





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on pull request #9144: KAFKA-9705: (part-1) add redirection fields in the request header

2020-08-18 Thread GitBox


cmccabe commented on pull request #9144:
URL: https://github.com/apache/kafka/pull/9144#issuecomment-675703162


   LGTM



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on pull request #8295: KAFKA-9627: Replace ListOffset request/response with automated protocol

2020-08-18 Thread GitBox


mimaison commented on pull request #8295:
URL: https://github.com/apache/kafka/pull/8295#issuecomment-675697535


   @abbccdda Thanks for the reviews, can you take another look?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10418) Unclear error/docs when altering topic configs via kafka-topics with --bootstrap-server

2020-08-18 Thread Ron Dagostino (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ron Dagostino updated KAFKA-10418:
--
Summary: Unclear error/docs when altering topic configs via kafka-topics 
with --bootstrap-server  (was: Unclear deprecation of altering topic configs 
via kafka-topics with --bootstrap-server)

> Unclear error/docs when altering topic configs via kafka-topics with 
> --bootstrap-server
> ---
>
> Key: KAFKA-10418
> URL: https://issues.apache.org/jira/browse/KAFKA-10418
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ron Dagostino
>Assignee: Ron Dagostino
>Priority: Minor
> Fix For: 2.7.0
>
>
> Changing a topic config with the kafka-topics command while connecting to 
> Kafka via --bootstrap-server (rather than connecting to ZooKeeper via 
> --zookeeper) has been deprecated.  The desired functionality is available 
> elsewhere: it is possible to change a topic config while connecting to Kafka 
> rather than ZooKeeper via the kafka-configs command instead.  However, 
> neither the kafka-topics error message received nor the kafka-topics help 
> information itself indicates this other possibility.  For example:
> {{$ kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic test 
> --config flush.messages=12345
> Option combination "[bootstrap-server],[config]" can't be used with option 
> "[alter]"
> }}
> {{$ kafka-topics.sh
> ...
> --config A topic configuration override for 
> the topic being created or altered...It is supported only in combination with 
> -- create if --bootstrap-server option is used.
> }}
> Rather than simply saying that what you want to do isn't available, it would 
> be better to say also that you can do it with the kafka-configs command.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10418) Unclear error/docs when altering topic configs via kafka-topics with --bootstrap-server

2020-08-18 Thread Ron Dagostino (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ron Dagostino updated KAFKA-10418:
--
Description: 
Changing a topic config with the kafka-topics command while connecting to Kafka 
via --bootstrap-server (rather than connecting to ZooKeeper via --zookeeper) is 
not supported.  The desired functionality is available elsewhere, though: it is 
possible to change a topic config while connecting to Kafka rather than 
ZooKeeper via the kafka-configs command instead.  However, neither the 
kafka-topics error message received nor the kafka-topics help information 
itself indicates this other possibility.  For example:

{{$ kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic test 
--config flush.messages=12345
Option combination "[bootstrap-server],[config]" can't be used with option 
"[alter]"
}}

{{$ kafka-topics.sh
...
--config A topic configuration override for the 
topic being created or altered...It is supported only in combination with -- 
create if --bootstrap-server option is used.
}}

Rather than simply saying that what you want to do isn't available, it would be 
better to say also that you can do it with the kafka-configs command.


  was:
Changing a topic config with the kafka-topics command while connecting to Kafka 
via --bootstrap-server (rather than connecting to ZooKeeper via --zookeeper) 
has been deprecated.  The desired functionality is available elsewhere: it is 
possible to change a topic config while connecting to Kafka rather than 
ZooKeeper via the kafka-configs command instead.  However, neither the 
kafka-topics error message received nor the kafka-topics help information 
itself indicates this other possibility.  For example:

{{$ kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic test 
--config flush.messages=12345
Option combination "[bootstrap-server],[config]" can't be used with option 
"[alter]"
}}

{{$ kafka-topics.sh
...
--config A topic configuration override for the 
topic being created or altered...It is supported only in combination with -- 
create if --bootstrap-server option is used.
}}

Rather than simply saying that what you want to do isn't available, it would be 
better to say also that you can do it with the kafka-configs command.



> Unclear error/docs when altering topic configs via kafka-topics with 
> --bootstrap-server
> ---
>
> Key: KAFKA-10418
> URL: https://issues.apache.org/jira/browse/KAFKA-10418
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ron Dagostino
>Assignee: Ron Dagostino
>Priority: Minor
> Fix For: 2.7.0
>
>
> Changing a topic config with the kafka-topics command while connecting to 
> Kafka via --bootstrap-server (rather than connecting to ZooKeeper via 
> --zookeeper) is not supported.  The desired functionality is available 
> elsewhere, though: it is possible to change a topic config while connecting 
> to Kafka rather than ZooKeeper via the kafka-configs command instead.  
> However, neither the kafka-topics error message received nor the kafka-topics 
> help information itself indicates this other possibility.  For example:
> {{$ kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic test 
> --config flush.messages=12345
> Option combination "[bootstrap-server],[config]" can't be used with option 
> "[alter]"
> }}
> {{$ kafka-topics.sh
> ...
> --config A topic configuration override for 
> the topic being created or altered...It is supported only in combination with 
> -- create if --bootstrap-server option is used.
> }}
> Rather than simply saying that what you want to do isn't available, it would 
> be better to say also that you can do it with the kafka-configs command.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10418) Incomplete error/docs when altering topic configs via kafka-topics with --bootstrap-server

2020-08-18 Thread Ron Dagostino (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ron Dagostino updated KAFKA-10418:
--
Summary: Incomplete error/docs when altering topic configs via kafka-topics 
with --bootstrap-server  (was: Unclear error/docs when altering topic configs 
via kafka-topics with --bootstrap-server)

> Incomplete error/docs when altering topic configs via kafka-topics with 
> --bootstrap-server
> --
>
> Key: KAFKA-10418
> URL: https://issues.apache.org/jira/browse/KAFKA-10418
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ron Dagostino
>Assignee: Ron Dagostino
>Priority: Minor
> Fix For: 2.7.0
>
>
> Changing a topic config with the kafka-topics command while connecting to 
> Kafka via --bootstrap-server (rather than connecting to ZooKeeper via 
> --zookeeper) is not supported.  The desired functionality is available 
> elsewhere, though: it is possible to change a topic config while connecting 
> to Kafka rather than ZooKeeper via the kafka-configs command instead.  
> However, neither the kafka-topics error message received nor the kafka-topics 
> help information itself indicates this other possibility.  For example:
> {{$ kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic test 
> --config flush.messages=12345
> Option combination "[bootstrap-server],[config]" can't be used with option 
> "[alter]"
> }}
> {{$ kafka-topics.sh
> ...
> --config A topic configuration override for 
> the topic being created or altered...It is supported only in combination with 
> -- create if --bootstrap-server option is used.
> }}
> Rather than simply saying that what you want to do isn't available, it would 
> be better to say also that you can do it with the kafka-configs command.
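
For reference, the kafka-configs equivalent of the rejected kafka-topics invocation quoted above (standard tool syntax, not text from the ticket):

{{$ kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name test --add-config flush.messages=12345
}}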



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10419) KAFKA BROKER Shuts down when a topic is deleted manually from command line on Windows 10 operating System.

2020-08-18 Thread Ajay Kapoor (Jira)
Ajay Kapoor created KAFKA-10419:
---

 Summary: KAFKA BROKER Shuts down when a topic is deleted manually 
from command line on Windows 10 operating System.
 Key: KAFKA-10419
 URL: https://issues.apache.org/jira/browse/KAFKA-10419
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 2.6.0
 Environment: Windows 10 Operating System
Reporter: Ajay Kapoor


KAFKA VERSION: KAFKA_2.13-2.6.0

Deleting a topic on Windows causes a Kafka broker shutdown:

[2020-08-18 15:18:22,858] INFO [ReplicaAlterLogDirsManager on broker 0] Removed 
fetcher for partitions Set(quickstart-events-0) 
(kafka.server.ReplicaAlterLogDirsManager)

[2020-08-18 15:18:22,899] ERROR Error while renaming dir for 
quickstart-events-0 in log dir C:\tmp\kafka-logs 
(kafka.server.LogDirFailureChannel)

java.nio.file.AccessDeniedException: C:\tmp\kafka-logs\quickstart-events-0 -> 
C:\tmp\kafka-logs\quickstart-events-0.d767af7933ae4fe087c212994ef02e90-delete

    at 
java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:89)

    at 
java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103)

    at java.base/sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:395)

    at 
java.base/sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:292)

    at java.base/java.nio.file.Files.move(Files.java:1425)

    at 
org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:913)

    at kafka.log.Log.$anonfun$renameDir$2(Log.scala:981)

    at kafka.log.Log.renameDir(Log.scala:2340)

    at kafka.log.LogManager.asyncDelete(LogManager.scala:935)

    at kafka.cluster.Partition.$anonfun$delete$1(Partition.scala:470)

    at kafka.cluster.Partition.delete(Partition.scala:461)

    at kafka.server.ReplicaManager.stopReplica(ReplicaManager.scala:344)

    at 
kafka.server.ReplicaManager.$anonfun$stopReplicas$9(ReplicaManager.scala:448)

    at scala.collection.mutable.HashMap$Node.foreach(HashMap.scala:587)

    at scala.collection.mutable.HashMap.foreach(HashMap.scala:475)

    at kafka.server.ReplicaManager.stopReplicas(ReplicaManager.scala:445)

    at kafka.server.KafkaApis.handleStopReplicaRequest(KafkaApis.scala:252)

    at kafka.server.KafkaApis.handle(KafkaApis.scala:137)

    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:70)

    at java.base/java.lang.Thread.run(Thread.java:830)

    Suppressed: java.nio.file.AccessDeniedException: 
C:\tmp\kafka-logs\quickstart-events-0 -> 
C:\tmp\kafka-logs\quickstart-events-0.d767af7933ae4fe087c212994ef02e90-delete

    at 
java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:89)

    at 
java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103)

    at 
java.base/sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:309)

    at 
java.base/sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:292)

    at java.base/java.nio.file.Files.move(Files.java:1425)

    at 
org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:910)

    ... 14 more

[2020-08-18 15:18:22,914] WARN [ReplicaManager broker=0] Stopping serving 
replicas in dir C:\tmp\kafka-logs (kafka.server.ReplicaManager)

[2020-08-18 15:18:22,930] WARN [ReplicaManager broker=0] Broker 0 stopped 
fetcher for partitions  and stopped moving logs for partitions  because they 
are in the failed log directory C:\tmp\kafka-logs. (kafka.server.ReplicaManager)

[2020-08-18 15:18:22,932] WARN Stopping serving logs in dir C:\tmp\kafka-logs 
(kafka.log.LogManager)

[2020-08-18 15:18:22,946] ERROR Shutdown broker because all log dirs in 
C:\tmp\kafka-logs 
 have failed (kafka.log.LogManager)

How to reproduce:
1. Start Zookeeper on windows
>bin\windows\zookeeper-server-start.bat config\zookeeper.properties
2. Start Kafka Broker on Windows
>bin\windows\kafka-server-start.bat config\server.properties
3. Create a Topic on Kafka Broker
>bin\windows\kafka-topics.bat --create --topic quickstart-events 
>--bootstrap-server localhost:9092
4. Delete the Kafka Topic created above.
>bin\windows\kafka-topics.bat --delete --topic quickstart-events 
>--bootstrap-server localhost:9092





--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10418) Unclear deprecation of altering topic configs via kafka-topics with --bootstrap-server

2020-08-18 Thread Ron Dagostino (Jira)
Ron Dagostino created KAFKA-10418:
-

 Summary: Unclear deprecation of altering topic configs via 
kafka-topics with --bootstrap-server
 Key: KAFKA-10418
 URL: https://issues.apache.org/jira/browse/KAFKA-10418
 Project: Kafka
  Issue Type: Improvement
Reporter: Ron Dagostino
Assignee: Ron Dagostino
 Fix For: 2.7.0


Changing a topic config with the kafka-topics command while connecting to Kafka 
via --bootstrap-server (rather than connecting to ZooKeeper via --zookeeper) 
has been deprecated.  The desired functionality is available elsewhere: it is 
possible to change a topic config while connecting to Kafka rather than 
ZooKeeper via the kafka-configs command instead.  However, neither the 
kafka-topics error message received nor the kafka-topics help information 
itself indicates this other possibility.  For example:

{{$ kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic test 
--config flush.messages=12345
Option combination "[bootstrap-server],[config]" can't be used with option 
"[alter]"
}}

{{$ kafka-topics.sh
...
--config A topic configuration override for the 
topic being created or altered...It is supported only in combination with -- 
create if --bootstrap-server option is used.
}}

Rather than simply saying that what you want to do isn't available, it would be 
better to say also that you can do it with the kafka-configs command.




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-08-18 Thread GitBox


ableegoldman commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r472441802



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImplTest.java
##
@@ -0,0 +1,438 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Named;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.kstream.TimeWindowedKStream;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockInitializer;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockReducer;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import static java.time.Duration.ofMillis;
+import static java.time.Instant.ofEpochMilli;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertThrows;
+
+public class SlidingWindowedKStreamImplTest {
+
+private static final String TOPIC = "input";
+private final StreamsBuilder builder = new StreamsBuilder();
+private final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
+private TimeWindowedKStream<String, String> windowedStream;
+
+@Before
+public void before() {
+final KStream<String, String> stream = builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
+windowedStream = stream.
+groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(100L), 
ofMillis(1000L)));
+}
+
+@Test
+public void shouldCountSlidingWindows() {
+final MockProcessorSupplier<Windowed<String>, Long> supplier = new MockProcessorSupplier<>();
+windowedStream
+.count()
+.toStream()
+.process(supplier);
+
+try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
+processData(driver);
+}
+assertThat(
+supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+.get(new Windowed<>("1", new TimeWindow(0L, 100L))),
+equalTo(ValueAndTimestamp.make(1L, 100L)));
+assertThat(
+supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+.get(new Windowed<>("1", new TimeWindow(101L, 201L))),
+equalTo(ValueAndTimestamp.make(1L, 150L)));
+assertThat(
+supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+.get(new Windowed<>("1", new TimeWindow(50L, 150L))),
+equalTo(ValueAndTimestamp.make(2L, 150L)));
+assertThat(
+supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+.get(new Windowed<>("1", new TimeWindow(400L, 500L))),
+equalTo(ValueAndTimestamp.make(1L, 500L)));
+assertThat(
+supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+.get(new Windowed<>("2", new TimeWindow(100L, 200L))),
+equalTo(ValueAndTimestamp.make(2L, 200L)));
+assertThat(
+   

[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-08-18 Thread GitBox


lct45 commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r472440045



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImplTest.java
##
@@ -0,0 +1,438 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Named;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.kstream.TimeWindowedKStream;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockInitializer;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockReducer;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import static java.time.Duration.ofMillis;
+import static java.time.Instant.ofEpochMilli;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertThrows;
+
+public class SlidingWindowedKStreamImplTest {
+
+private static final String TOPIC = "input";
+private final StreamsBuilder builder = new StreamsBuilder();
+private final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
+private TimeWindowedKStream<String, String> windowedStream;
+
+@Before
+public void before() {
+final KStream<String, String> stream = builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
+windowedStream = stream.
+groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(100L), 
ofMillis(1000L)));
+}
+
+@Test
+public void shouldCountSlidingWindows() {
+final MockProcessorSupplier<Windowed<String>, Long> supplier = new MockProcessorSupplier<>();
+windowedStream
+.count()
+.toStream()
+.process(supplier);
+
+try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
+processData(driver);
+}
+assertThat(
+supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+.get(new Windowed<>("1", new TimeWindow(0L, 100L))),
+equalTo(ValueAndTimestamp.make(1L, 100L)));
+assertThat(
+supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+.get(new Windowed<>("1", new TimeWindow(101L, 201L))),
+equalTo(ValueAndTimestamp.make(1L, 150L)));
+assertThat(
+supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+.get(new Windowed<>("1", new TimeWindow(50L, 150L))),
+equalTo(ValueAndTimestamp.make(2L, 150L)));
+assertThat(
+supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+.get(new Windowed<>("1", new TimeWindow(400L, 500L))),
+equalTo(ValueAndTimestamp.make(1L, 500L)));
+assertThat(
+supplier.theCapturedProcessor().lastValueAndTimestampPerKey
+.get(new Windowed<>("2", new TimeWindow(100L, 200L))),
+equalTo(ValueAndTimestamp.make(2L, 200L)));
+assertThat(
+

[GitHub] [kafka] cmccabe opened a new pull request #9197: Revert KAFKA-9309: Add the ability to translate Message to JSON

2020-08-18 Thread GitBox


cmccabe opened a new pull request #9197:
URL: https://github.com/apache/kafka/pull/9197


   This reverts commit bf6dffe93bbe0fe33ad076ebccebb840d66b936d



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10415) Provide an officially supported Node.js client

2020-08-18 Thread Matthew T. Adams (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10415?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17180036#comment-17180036
 ] 

Matthew T. Adams commented on KAFKA-10415:
--

[~ijuma] I understand.  Should I have filed an issue with Confluent instead, 
then?  If so, can you point me to an issue tracker for Confluent where I could 
file this?

> Provide an officially supported Node.js client
> --
>
> Key: KAFKA-10415
> URL: https://issues.apache.org/jira/browse/KAFKA-10415
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients
>Reporter: Matthew T. Adams
>Priority: Major
>
> Please provide an official Node.js client for Kafka at feature parity with 
> all of the other officially supported & provided Kafka clients.
> It is extremely confusing when it comes to trying to use Kafka in the Node.js 
> ecosystem.  There are many clients, some look legitimate 
> (http://kafka.js.org), but some are woefully out 
> of date (many listed at 
> [https://cwiki.apache.org/confluence/display/KAFKA/Clients#Clients-Node.js]), 
> and others have confusing relationships among them 
> ([https://github.com/nodefluent/node-sinek] & 
> [https://github.com/nodefluent/kafka-streams]).  Most of them are publicly 
> asking for help.  This leaves teams having to waste time trying to figure out 
> which client has the Kafka features they need (mostly talking about streaming 
> here), and which client has high quality and will be around in the future.  
> If the client came directly from this project, those decisions would be made 
> and we could get on about our work.
> JavaScript is one of the most popular languages on the planet, and the 
> Node.js user base is huge – big enough that a Node.js client provided 
> directly by the Kafka team is justified.  The list at 
> [https://cwiki.apache.org/confluence/display/KAFKA/Clients#Clients-Node.js] 
> doesn't even mention what is perhaps the most confidence-inducing Node.js 
> client thanks to its documentation, 
> https://kafka.js.org.  The list at 
> [https://docs.confluent.io/current/clients/index.html#ak-clients] includes an 
> officially-supported Go language client; Go's community is dwarfed by that of 
> Node.js.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] abbccdda commented on pull request #8846: KAFKA-9800: [KIP-580] Client Exponential Backoff Implementation

2020-08-18 Thread GitBox


abbccdda commented on pull request #8846:
URL: https://github.com/apache/kafka/pull/8846#issuecomment-675675970


   Some compilation errors:
   ```
   11:41:23 
/home/jenkins/jenkins-slave/workspace/kafka-pr-jdk15-scala2.13/clients/src/main/java/org/apache/kafka/clients/Metadata.java:474:
 error: cannot assign a value to final variable refreshBackoffMs
   11:41:23 this.refreshBackoffMs = 
this.refreshBackoff.backoff(this.attempts);
   11:41:23 ^
   11:41:23 
/home/jenkins/jenkins-slave/workspace/kafka-pr-jdk15-scala2.13/clients/src/main/java/org/apache/kafka/clients/Metadata.java:481:
 error: cannot assign a value to final variable refreshBackoffMs
   11:41:23 this.refreshBackoffMs = this.refreshBackoff.baseBackoff();
   11:41:23   
   ```
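   The error is self-describing; a hedged sketch of the minimal fix (only the field name comes from the log, the surrounding class is assumed):
   ```java
   public class Metadata {
       // Reassigned whenever the backoff is recomputed, so it cannot be final.
       private long refreshBackoffMs;

       void updateBackoff(long nextBackoffMs) {
           this.refreshBackoffMs = nextBackoffMs; // the assignment that 'final' forbids
       }
   }
   ```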



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-08-18 Thread GitBox


lct45 commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r472436400



##
File path: checkstyle/suppressions.xml
##
@@ -167,6 +167,9 @@
 
 
+

Review comment:
   We don't!





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #9177: KAFKA-9924: Add RocksDB metric num-entries-active-mem-table

2020-08-18 Thread GitBox


guozhangwang commented on a change in pull request #9177:
URL: https://github.com/apache/kafka/pull/9177#discussion_r472429141



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetrics.java
##
@@ -33,7 +35,7 @@
 import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addSumMetricToSensor;
 import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addValueMetricToSensor;
 
-public class RocksDBMetrics {
+public class  RocksDBMetrics {

Review comment:
   nit: extra space.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##
@@ -415,9 +416,40 @@ public final Sensor storeLevelSensor(final String threadId,
 }
 }
 
-public final void removeAllStoreLevelSensors(final String threadId,
- final String taskId,
- final String storeName) {
+public <T> void addStoreLevelMutableMetric(final String threadId,
+   final String taskId,
+   final String metricsScope,
+   final String storeName,
+   final String name,
+   final String description,
+   final RecordingLevel 
recordingLevel,
+   final Gauge<T> valueProvider) {
+final MetricName metricName = metrics.metricName(
+name,
+STATE_STORE_LEVEL_GROUP,
+description,
+storeLevelTagMap(threadId, taskId, metricsScope, storeName)
+);
+if (metrics.metric(metricName) == null) {
+final MetricConfig metricConfig = new 
MetricConfig().recordLevel(recordingLevel);
+final String key = storeSensorPrefix(threadId, taskId, storeName);
+synchronized (storeLevelMetrics) {
+metrics.addMetric(metricName, metricConfig, valueProvider);
+storeLevelMetrics.computeIfAbsent(key, ignored -> new 
LinkedList<>()).push(metricName);
+}
+}
+}
+
+public final void removeAllStoreLevelSensorsAndMetrics(final String 
threadId,
+   final String taskId,
+   final String 
storeName) {
+removeAllStoreLevelMetrics(threadId, taskId, storeName);
+removeAllStoreLevelSensors(threadId, taskId, storeName);
+}

Review comment:
   This may be a bit paranoid, but when adding them the order seems to be `initSensors` first and then `initGauges`, while on removal we call `removeAllStoreLevelMetrics` first and then the other. I know that today no concurrent threads should try to init / removeAll at the same time, but just to be safe maybe we can make the removal ordering match: sensors first and then gauges?
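   A sketch of the reordering being suggested (method names taken from the diff above; illustrative only):
   ```java
   public final void removeAllStoreLevelSensorsAndMetrics(final String threadId,
                                                          final String taskId,
                                                          final String storeName) {
       // Mirror the registration order: sensors first, then gauges/metrics.
       removeAllStoreLevelSensors(threadId, taskId, storeName);
       removeAllStoreLevelMetrics(threadId, taskId, storeName);
   }
   ```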





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10327) Make flush after some count of putted records in SinkTask

2020-08-18 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis updated KAFKA-10327:
---
Labels: kip-required needs-kip  (was: )

> Make flush after some count of putted records in SinkTask
> -
>
> Key: KAFKA-10327
> URL: https://issues.apache.org/jira/browse/KAFKA-10327
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 2.5.0
>Reporter: Pavel Kuznetsov
>Priority: Major
>  Labels: kip-required, needs-kip
>
> In the current version of Kafka Connect, all records accumulated with the 
> SinkTask.put method are flushed to the target system in a time-based manner: 
> data is flushed and offsets are committed every offset.flush.timeout.ms (default is 
> 6) ms.
> But you can't control the number of messages you receive from Kafka between 
> two flushes, which may cause out-of-memory errors because the in-memory buffer 
> may grow a lot. 
> I suggest adding out-of-the-box support for count-based flush to Kafka Connect. 
> It requires a new configuration parameter (offset.flush.count, for example). 
> The number of records sent to SinkTask.put should be counted, and if this 
> amount is greater than offset.flush.count's value, SinkTask.flush is called 
> and offsets are committed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] guozhangwang commented on pull request #9177: KAFKA-9924: Add RocksDB metric num-entries-active-mem-table

2020-08-18 Thread GitBox


guozhangwang commented on pull request #9177:
URL: https://github.com/apache/kafka/pull/9177#issuecomment-675651901


   test this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on pull request #8846: KAFKA-9800: [KIP-580] Client Exponential Backoff Implementation

2020-08-18 Thread GitBox


abbccdda commented on pull request #8846:
URL: https://github.com/apache/kafka/pull/8846#issuecomment-675647225


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10415) Provide an officially supported Node.js client

2020-08-18 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10415?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17179995#comment-17179995
 ] 

Ismael Juma commented on KAFKA-10415:
-

Thanks for the JIRA. One clarification, Apache Kafka doesn't have an officially 
supported Go language client. The link you referenced is from Confluent, not 
Apache Kafka

 

> Provide an officially supported Node.js client
> --
>
> Key: KAFKA-10415
> URL: https://issues.apache.org/jira/browse/KAFKA-10415
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients
>Reporter: Matthew T. Adams
>Priority: Major
>
> Please provide an official Node.js client for Kafka at feature parity with 
> all of the other officially supported & provided Kafka clients.
> It is extremely confusing when it comes to trying to use Kafka in the Node.js 
> ecosystem.  There are many clients, some look legitimate 
> (http://kafka.js.org), but some are woefully out 
> of date (many listed at 
> [https://cwiki.apache.org/confluence/display/KAFKA/Clients#Clients-Node.js]), 
> and others have confusing relationships among them 
> ([https://github.com/nodefluent/node-sinek] & 
> [https://github.com/nodefluent/kafka-streams]).  Most of them are publicly 
> asking for help.  This leaves teams having to waste time trying to figure out 
> which client has the Kafka features they need (mostly talking about streaming 
> here), and which client has high quality and will be around in the future.  
> If the client came directly from this project, those decisions would be made 
> and we could get on about our work.
> JavaScript is one of the most popular languages on the planet, and the 
> Node.js user base is huge – big enough that a Node.js client provided 
> directly by the Kafka team is justified.  The list at 
> [https://cwiki.apache.org/confluence/display/KAFKA/Clients#Clients-Node.js] 
> doesn't even mention what is perhaps the most confidence-inducing Node.js 
> client thanks to its documentation, 
> https://kafka.js.org.  The list at 
> [https://docs.confluent.io/current/clients/index.html#ak-clients] includes an 
> officially-supported Go language client; Go's community is dwarfed by that of 
> Node.js.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-10415) Provide an officially supported Node.js client

2020-08-18 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10415?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17179995#comment-17179995
 ] 

Ismael Juma edited comment on KAFKA-10415 at 8/18/20, 6:14 PM:
---

Thanks for the JIRA. One clarification, Apache Kafka doesn't have an officially 
supported Go language client. The link you referenced is from Confluent, not 
Apache Kafka.

Apache Kafka ships a Java client only.

 


was (Author: ijuma):
Thanks for the JIRA. One clarification, Apache Kafka doesn't have an officially 
supported Go language client. The link you referenced is from Confluent, not 
Apache Kafka

 

> Provide an officially supported Node.js client
> --
>
> Key: KAFKA-10415
> URL: https://issues.apache.org/jira/browse/KAFKA-10415
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients
>Reporter: Matthew T. Adams
>Priority: Major
>
> Please provide an official Node.js client for Kafka at feature parity with 
> all of the other officially supported & provided Kafka clients.
> It is extremely confusing when it comes to trying to use Kafka in the Node.js 
> ecosystem.  There are many clients, some look legitimate 
> (http://kafka.js.org), but some are woefully out 
> of date (many listed at 
> [https://cwiki.apache.org/confluence/display/KAFKA/Clients#Clients-Node.js]), 
> and others have confusing relationships among them 
> ([https://github.com/nodefluent/node-sinek] & 
> [https://github.com/nodefluent/kafka-streams]).  Most of them are publicly 
> asking for help.  This leaves teams having to waste time trying to figure out 
> which client has the Kafka features they need (mostly talking about streaming 
> here), and which client has high quality and will be around in the future.  
> If the client came directly from this project, those decisions would be made 
> and we could get on about our work.
> JavaScript is one of the most popular languages on the planet, and the 
> Node.js user base is huge – big enough that a Node.js client provided 
> directly by the Kafka team is justified.  The list at 
> [https://cwiki.apache.org/confluence/display/KAFKA/Clients#Clients-Node.js] 
> doesn't even mention what is perhaps the most confidence-inducing Node.js 
> client thanks to its documentation, 
> https://kafka.js.org.  The list at 
> [https://docs.confluent.io/current/clients/index.html#ak-clients] includes an 
> officially-supported Go language client; Go's community is dwarfed by that of 
> Node.js.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] guozhangwang commented on a change in pull request #9191: [WIP] KAFKA-10355: PoC

2020-08-18 Thread GitBox


guozhangwang commented on a change in pull request #9191:
URL: https://github.com/apache/kafka/pull/9191#discussion_r472368768



##
File path: 
streams/src/main/java/org/apache/kafka/streams/errors/MissingSourceTopicException.java
##
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.errors;
+
+public class MissingSourceTopicException extends StreamsException {

Review comment:
   I agree with @cadonna here that all exceptions from inside Streams should inherit from `StreamsException`. That may include 1) environmental issues like timeouts (note that after @mjsax's KIP we would not throw `TimeoutException`, which is a KafkaException rather than a StreamsException, but a new exception inherited from StreamsException; IOExceptions are likewise wrapped as StateStoreException, which inherits from StreamsException), and 2) user bugs in `process` etc. which get caught by Streams code.
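   A minimal sketch of a subclass following that convention (only the class declaration appears in the diff above; the constructor is illustrative):
   ```java
   public class MissingSourceTopicException extends StreamsException {

       public MissingSourceTopicException(final String message) {
           super(message);
       }
   }
   ```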





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #9148: KAFKA-10379: Implement the KIP-478 StreamBuilder#addGlobalStore()

2020-08-18 Thread GitBox


vvcephei commented on a change in pull request #9148:
URL: https://github.com/apache/kafka/pull/9148#discussion_r472368613



##
File path: 
streams/src/test/java/org/apache/kafka/test/MockApiProcessorSupplier.java
##
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.test;
+
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.PunctuationType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class MockApiProcessorSupplier<KIn, VIn, KOut, VOut> implements ProcessorSupplier<KIn, VIn, KOut, VOut> {

Review comment:
   This one isn't so straightforward. Although the supplied processors can 
be adapted, `theCapturedProcessor()` and 
`capturedProcessors(expectedNumberOfProcessors)` return `MockProcessor` 
specifically, so we'd need a whole new adapter to convert a MockApiProcessor 
into a MockProcessor. I'd rather leave it alone for now. These delegating 
classes will go away in a couple more PRs anyway.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 closed pull request #7492: Kafka 7499 production exception handler serialization exceptions

2020-08-18 Thread GitBox


wcarlson5 closed pull request #7492:
URL: https://github.com/apache/kafka/pull/7492


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #9148: KAFKA-10379: Implement the KIP-478 StreamBuilder#addGlobalStore()

2020-08-18 Thread GitBox


vvcephei commented on a change in pull request #9148:
URL: https://github.com/apache/kafka/pull/9148#discussion_r472348482



##
File path: streams/src/test/java/org/apache/kafka/test/MockApiProcessor.java
##
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.test;
+
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.processor.Cancellable;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+public class MockApiProcessor<KIn, VIn, KOut, VOut> implements Processor<KIn, VIn, KOut, VOut> {

Review comment:
   Ah, yeah, this is a good idea. I'll have to migrate all the field 
references to method references so they can be delegated, but I wanted to do 
that anyway.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-10395) TopologyTestDriver does not work with dynamic topic routing

2020-08-18 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman resolved KAFKA-10395.
-
Fix Version/s: 2.7.0
   Resolution: Fixed

> TopologyTestDriver does not work with dynamic topic routing
> ---
>
> Key: KAFKA-10395
> URL: https://issues.apache.org/jira/browse/KAFKA-10395
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Assignee: Sophie Blee-Goldman
>Priority: Major
>  Labels: test-framework
> Fix For: 2.7.0
>
>
> The TopologyTestDriver#read(topic) methods all call #getRecordsQueue which 
> checks 
>  
> {code:java}
> final Queue<ProducerRecord<byte[], byte[]>> outputRecords = outputRecordsByTopic.get(topicName);
> if (outputRecords == null) {
> if (!processorTopology.sinkTopics().contains(topicName)) {
> throw new IllegalArgumentException("Unknown topic: " + topicName); 
> } 
> }
> {code}
> The outputRecordsByTopic map keeps track of all topics that are actually 
> produced to, but obviously doesn't capture any topics that haven't yet 
> received output. The `processorTopology#sinkTopics` is supposed to account 
> for that by checking to make sure the topic is actually registered in the 
> topology, and throw an exception if not in case the user supplied the wrong 
> topic name to read from. 
> Unfortunately the TopicNameExtractor allows for dynamic routing of records to 
> any topic, so the topology isn't aware of all the possible output topics. If 
> trying to read from one of these topics that happens to not have received any 
> output yet, the test will throw the above misleading IllegalArgumentException.
> We could just relax this check, but warning users who may actually have 
> accidentally passed in the wrong topic to read from seems quite useful. A 
> better solution would be to require registering all possible output topics to 
> the TTD up front. This would obviously require a KIP, but it would be a very 
> small one and shouldn't be too much trouble
>  
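
For illustration, the kind of dynamic routing the ticket refers to (topic-name pattern and serdes are hypothetical):
{code:java}
// A TopicNameExtractor computes the sink topic per record at runtime,
// so the topology cannot enumerate all possible output topics up front.
stream.to(
    (key, value, recordContext) -> "output-" + key,
    Produced.with(Serdes.String(), Serdes.String()));
{code}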



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vvcephei commented on a change in pull request #9148: KAFKA-10379: Implement the KIP-478 StreamBuilder#addGlobalStore()

2020-08-18 Thread GitBox


vvcephei commented on a change in pull request #9148:
URL: https://github.com/apache/kafka/pull/9148#discussion_r472327085



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##
@@ -667,7 +674,7 @@ public void validateCopartition() {
 private void validateGlobalStoreArguments(final String sourceName,
   final String topic,
   final String processorName,
-  final ProcessorSupplier 
stateUpdateSupplier,
+  final ProcessorSupplier stateUpdateSupplier,

Review comment:
   Sure; all we do is verify it's not null, but it doesn't hurt.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-8606) Provide a method to fetch committed offsets for a collection of TopicPartition

2020-08-18 Thread Mark Roberts (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17179703#comment-17179703
 ] 

Mark Roberts commented on KAFKA-8606:
-

I'm reasonably sure that this ticket can be closed as of 2.4
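
For reference, the batch lookup added in 2.4 by KIP-520; a minimal sketch, assuming an existing {{consumer}}:

{code:java}
final Set<TopicPartition> partitions = new HashSet<>(Arrays.asList(
    new TopicPartition("topic", 0),
    new TopicPartition("topic", 1)));
// One lookup for the whole collection; values are null where nothing is committed.
final Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(partitions);
committed.forEach((tp, offset) ->
    System.out.println(tp + " -> " + (offset == null ? "none" : offset.offset())));
{code}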

> Provide a method to fetch committed offsets for a collection of TopicPartition
> --
>
> Key: KAFKA-8606
> URL: https://issues.apache.org/jira/browse/KAFKA-8606
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Affects Versions: 2.3.0, 2.2.1
>Reporter: ov7a
>Priority: Major
>
> Currently KafkaConsumer has methods for fetching beginning offsets, end offsets 
> and offsets for times, all of them accepting a collection of TopicPartition.
> There is a method to fetch the committed offset for a single TopicPartition, but 
> there is no public API to fetch committed offsets for a collection of 
> TopicPartition. So, if one wants to fetch all committed offsets for a topic, a 
> request is created per partition.
> Note that ConsumerCoordinator.fetchCommittedOffsets, which is called internally 
> by the "committed" method, does accept a collection of TopicPartition. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] rondagostino commented on a change in pull request #9142: MINOR: Fix delete_topic for system tests

2020-08-18 Thread GitBox


rondagostino commented on a change in pull request #9142:
URL: https://github.com/apache/kafka/pull/9142#discussion_r472279480



##
File path: tests/kafkatest/services/kafka/kafka.py
##
@@ -503,7 +503,7 @@ def create_topic(self, topic_cfg, node=None, 
use_zk_to_create_topic=True):
 self.logger.info("Running topic creation command...\n%s" % cmd)
 node.account.ssh(cmd)
 
-def delete_topic(self, topic, node=None):
+def delete_topic(self, topic, node=None, use_zk_to_delete_topic=False):

Review comment:
   @rajinisivaram Yes, merging to just trunk seems fine to me.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on a change in pull request #9177: KAFKA-9924: Add RocksDB metric num-entries-active-mem-table

2020-08-18 Thread GitBox


cadonna commented on a change in pull request #9177:
URL: https://github.com/apache/kafka/pull/9177#discussion_r472276615



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##
@@ -415,9 +416,40 @@ public final Sensor storeLevelSensor(final String threadId,
 }
 }
 
-public final void removeAllStoreLevelSensors(final String threadId,
- final String taskId,
- final String storeName) {
+public <T> void addStoreLevelMutableMetric(final String threadId,
+   final String taskId,
+   final String metricsScope,
+   final String storeName,
+   final String name,
+   final String description,
+   final RecordingLevel 
recordingLevel,
+   final Gauge<T> valueProvider) {
+final MetricName metricName = metrics.metricName(
+name,
+STATE_STORE_LEVEL_GROUP,
+description,
+storeLevelTagMap(threadId, taskId, metricsScope, storeName)
+);
+if (metrics.metric(metricName) == null) {
+final MetricConfig metricConfig = new 
MetricConfig().recordLevel(recordingLevel);
+final String key = storeSensorPrefix(threadId, taskId, storeName);
+synchronized (storeLevelMetrics) {
+metrics.addMetric(metricName, metricConfig, valueProvider);
+storeLevelMetrics.computeIfAbsent(key, ignored -> new 
LinkedList<>()).push(metricName);
+}
+}
+}
+
+public final void removeAllStoreLevelSensorsAndMetrics(final String 
threadId,
+   final String taskId,
+   final String 
storeName) {
+removeAllStoreLevelMetrics(threadId, taskId, storeName);
+removeAllStoreLevelSensors(threadId, taskId, storeName);
+}

Review comment:
   Yes, we need to synchronize. At the least, we have to ensure that lines 411 
and 438 are thread-safe. Then, if we do not want to have duplicates in 
`storeLevelSensors`, we should hold a lock from line 409 to line 411. Between 
lines 434 and 439, we need to ensure that the removal of all store-level 
metrics completes; otherwise we could find a store-level metric that prevents 
the addition of a new metric, only for that found metric to be removed during 
the remainder of the removal process. The same holds for the store-level 
sensors.
   
   It is true that we always remove all of both collections together, but we do 
not add metric names and sensor names to both collections together.
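   
   To illustrate the hazard, a toy sketch (field names assumed; `metrics`, `metricConfig`, and `valueProvider` stand in for the real members) of guarding both paths with one monitor:
   
   ```java
   private final Map<String, LinkedList<MetricName>> storeLevelMetrics = new HashMap<>();
   
   public void addStoreLevelMetric(final String key, final MetricName metricName) {
       synchronized (storeLevelMetrics) {
           if (metrics.metric(metricName) == null) {                       // check ...
               metrics.addMetric(metricName, metricConfig, valueProvider); // ... and add atomically
               storeLevelMetrics.computeIfAbsent(key, ignored -> new LinkedList<>()).push(metricName);
           }
       }
   }
   
   public void removeAllStoreLevelMetrics(final String key) {
       synchronized (storeLevelMetrics) { // same monitor as the add path
           final LinkedList<MetricName> names = storeLevelMetrics.remove(key);
           if (names != null) {
               names.forEach(metrics::removeMetric);
           }
       }
   }
   ```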





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10417) suppress() with cogroup() throws ClassCastException

2020-08-18 Thread Wardha Perinkada Kattu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wardha Perinkada Kattu updated KAFKA-10417:
---
Description: 
Streams operation - `cogroup()`, `aggregate()` followed by `suppress()` throws 
`ClassCastException`

Works fine without the `suppress()`

Code block tested -
{code:java}
val stream1 = requestStreams.merge(successStreams).merge(errorStreams)
    .groupByKey(Grouped.with(Serdes.String(), serdesConfig.notificationSerde()))

val streams2 = confirmationStreams
    .groupByKey(Grouped.with(Serdes.String(), serdesConfig.confirmationsSerde()))

val cogrouped = stream1.cogroup(notificationAggregator).cogroup(streams2, confirmationsAggregator)
    .windowedBy(TimeWindows.of(Duration.ofMinutes(notificationStreamsConfig.joinWindowMinutes.toLong()))
        .grace(Duration.ofMinutes(notificationStreamsConfig.graceDurationMinutes.toLong())))
    .aggregate({ null }, Materialized.`as`<String, NotificationMetric, WindowStore<Bytes, ByteArray>>("time-windowed-aggregated-stream-store")
        .withValueSerde(serdesConfig.notificationMetricSerde()))
    .suppress(Suppressed.untilWindowCloses(unbounded()))
    .toStream()
{code}
Exception thrown is:
{code:java}
Caused by: java.lang.ClassCastException: class 
org.apache.kafka.streams.kstream.internals.PassThrough cannot be cast to class 
org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier 
(org.apache.kafka.streams.kstream.internals.PassThrough and 
org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier are in 
unnamed module of loader 'app')
{code}
[https://stackoverflow.com/questions/63459685/kgroupedstream-with-cogroup-aggregate-suppress]

  was:
Streams operation - `cogroup()`, `aggregate()` followed by `suppress()` throws 
`ClassCastException`

Works fine without the `suppress()`

 
{code:java}
val stream1 = requestStreams.merge(successStreams).merge(errorStreams)
    .groupByKey(Grouped.with(Serdes.String(), serdesConfig.notificationSerde()))

val streams2 = confirmationStreams
    .groupByKey(Grouped.with(Serdes.String(), serdesConfig.confirmationsSerde()))

val cogrouped = stream1.cogroup(notificationAggregator).cogroup(streams2, confirmationsAggregator)
    .windowedBy(TimeWindows.of(Duration.ofMinutes(notificationStreamsConfig.joinWindowMinutes.toLong()))
        .grace(Duration.ofMinutes(notificationStreamsConfig.graceDurationMinutes.toLong())))
    .aggregate({ null }, Materialized.`as`<String, NotificationMetric, WindowStore<Bytes, ByteArray>>("time-windowed-aggregated-stream-store")
        .withValueSerde(serdesConfig.notificationMetricSerde()))
    .suppress(Suppressed.untilWindowCloses(unbounded()))
    .toStream()
{code}
Exception thrown is:
{code:java}
Caused by: java.lang.ClassCastException: class 
org.apache.kafka.streams.kstream.internals.PassThrough cannot be cast to class 
org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier 
(org.apache.kafka.streams.kstream.internals.PassThrough and 
org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier are in 
unnamed module of loader 'app')
{code}
 

[https://stackoverflow.com/questions/63459685/kgroupedstream-with-cogroup-aggregate-suppress]

 


> suppress() with cogroup() throws ClassCastException
> ---
>
> Key: KAFKA-10417
> URL: https://issues.apache.org/jira/browse/KAFKA-10417
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Wardha Perinkada Kattu
>Priority: Blocker
>  Labels: kafka-streams
>
> Streams operation - `cogroup()`, `aggregate()` followed by `suppress()` 
> throws `ClassCastException`
> Works fine without the `suppress()`
> Code block tested -
> {code:java}
> val stream1 = requestStreams.merge(successStreams).merge(errorStreams)
>     .groupByKey(Grouped.with(Serdes.String(), serdesConfig.notificationSerde()))
> val streams2 = confirmationStreams
>     .groupByKey(Grouped.with(Serdes.String(), serdesConfig.confirmationsSerde()))
> val cogrouped = stream1.cogroup(notificationAggregator).cogroup(streams2, confirmationsAggregator)
>     .windowedBy(TimeWindows.of(Duration.ofMinutes(notificationStreamsConfig.joinWindowMinutes.toLong()))
>         .grace(Duration.ofMinutes(notificationStreamsConfig.graceDurationMinutes.toLong())))
>     .aggregate({ null }, Materialized.`as`<String, NotificationMetric, WindowStore<Bytes, ByteArray>>("time-windowed-aggregated-stream-store")
>         .withValueSerde(serdesConfig.notificationMetricSerde()))
>     .suppress(Suppressed.untilWindowCloses(unbounded()))
>     .toStream()
> {code}
> Exception thrown 

[jira] [Created] (KAFKA-10417) suppress() with cogroup() throws ClassCastException

2020-08-18 Thread Wardha Perinkada Kattu (Jira)
Wardha Perinkada Kattu created KAFKA-10417:
--

 Summary: suppress() with cogroup() throws ClassCastException
 Key: KAFKA-10417
 URL: https://issues.apache.org/jira/browse/KAFKA-10417
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.6.0
Reporter: Wardha Perinkada Kattu


Streams operation - `cogroup()`, `aggregate()` followed by `suppress()` throws 
`ClassCastException`

Works fine without the `suppress()`

 
{code:java}
val stream1 = requestStreams.merge(successStreams).merge(errorStreams)
    .groupByKey(Grouped.with(Serdes.String(), serdesConfig.notificationSerde()))

val streams2 = confirmationStreams
    .groupByKey(Grouped.with(Serdes.String(), serdesConfig.confirmationsSerde()))

val cogrouped = stream1.cogroup(notificationAggregator).cogroup(streams2, confirmationsAggregator)
    .windowedBy(TimeWindows.of(Duration.ofMinutes(notificationStreamsConfig.joinWindowMinutes.toLong()))
        .grace(Duration.ofMinutes(notificationStreamsConfig.graceDurationMinutes.toLong())))
    .aggregate({ null }, Materialized.`as`<String, NotificationMetric, WindowStore<Bytes, ByteArray>>("time-windowed-aggregated-stream-store")
        .withValueSerde(serdesConfig.notificationMetricSerde()))
    .suppress(Suppressed.untilWindowCloses(unbounded()))
    .toStream()
{code}
Exception thrown is:
{code:java}
Caused by: java.lang.ClassCastException: class 
org.apache.kafka.streams.kstream.internals.PassThrough cannot be cast to class 
org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier 
(org.apache.kafka.streams.kstream.internals.PassThrough and 
org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier are in 
unnamed module of loader 'app')
{code}
 

[https://stackoverflow.com/questions/63459685/kgroupedstream-with-cogroup-aggregate-suppress]

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10370) WorkerSinkTask: IllegalStateException caused by consumer.seek(tp, offsets) when (tp, offsets) are supplied by WorkerSinkTaskContext

2020-08-18 Thread Ning Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17179669#comment-17179669
 ] 

Ning Zhang commented on KAFKA-10370:


Hey [~ryannedolan] I updated the PR with the following code in 
`onPartitionsAssigned`. From my initial testing, it works well with 
`MirrorSinkTask`. Since the partition assignment is now driven by the consumer 
(as we use `consumer.subscribe()`), the `offsets` map passed into 
`context.offset(offsets)` contains only the offsets associated with the 
consumer group, rather than letting `MirrorSinkTask` do the consumption task 
assignment itself.

I appreciate your thoughts above and definitely expect more feedback!

https://github.com/apache/kafka/pull/9145/files#diff-9d27e74bcdc892150367aed9a4cf499eR617-R698
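
For reference, a sketch of that pattern ({{topics}} and {{desiredOffsets}} here are illustrative stand-ins):

{code:java}
// Seek only once partitions are assigned, which avoids
// "No current assignment for partition ...".
consumer.subscribe(topics, new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
        for (final TopicPartition tp : partitions) {
            final Long offset = desiredOffsets.get(tp); // offsets supplied externally
            if (offset != null) {
                consumer.seek(tp, offset); // safe here: tp is assigned
            }
        }
    }

    @Override
    public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {
    }
});
{code}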

> WorkerSinkTask: IllegalStateException caused by consumer.seek(tp, offsets) 
> when (tp, offsets) are supplied by WorkerSinkTaskContext
> --
>
> Key: KAFKA-10370
> URL: https://issues.apache.org/jira/browse/KAFKA-10370
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Affects Versions: 2.5.0
>Reporter: Ning Zhang
>Assignee: Ning Zhang
>Priority: Major
> Fix For: 2.7.0
>
>
> In 
> [WorkerSinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java],
>  when we want the consumer to consume from certain offsets, rather than from 
> the last committed offset, 
> [WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java#L63-L66]
>  provided a way to supply the offsets from external (e.g. implementation of 
> SinkTask) to rewind the consumer. 
> In the [poll() 
> method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312],
>  it first call 
> [rewind()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L615-L633]
>  to (1) read the offsets from WorkerSinkTaskContext, if the offsets are not 
> empty, (2) consumer.seek(tp, offset) to rewind the consumer.
> As a part of [WorkerSinkTask 
> initialization|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L290-L307],
>  when the [SinkTask 
> starts|https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java#L83-L88],
>  we can supply the specific offsets by +"context.offset(supplied_offsets);+" 
> in start() method, so that when the consumer does the first poll, it should 
> rewind to the specific offsets in rewind() method. However in practice, we 
> saw the following IllegalStateException when running consumer.seek(tp, 
> offsets);
> {code:java}
> [2020-08-07 23:53:55,752] INFO WorkerSinkTask{id=MirrorSinkConnector-0} 
> Rewind test-1 to offset 3 
> (org.apache.kafka.connect.runtime.WorkerSinkTask:648)
> [2020-08-07 23:53:55,752] INFO [Consumer 
> clientId=connector-consumer-MirrorSinkConnector-0, 
> groupId=connect-MirrorSinkConnector] Seeking to offset 3 for partition test-1 
> (org.apache.kafka.clients.consumer.KafkaConsumer:1592)
> [2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task 
> threw an uncaught and unrecoverable exception 
> (org.apache.kafka.connect.runtime.WorkerTask:187)
> java.lang.IllegalStateException: No current assignment for partition test-1
> at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:368)
> at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.seekUnvalidated(SubscriptionState.java:385)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1597)
> at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.rewind(WorkerSinkTask.java:649)
> at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334)
> at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229)
> at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
> at 
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
> at 
> org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
> at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> 

[GitHub] [kafka] vvcephei edited a comment on pull request #9177: KAFKA-9924: Add RocksDB metric num-entries-active-mem-table

2020-08-18 Thread GitBox


vvcephei edited a comment on pull request #9177:
URL: https://github.com/apache/kafka/pull/9177#issuecomment-675522188


   Looks like Jenkins shut down during the run last time or something.
   
   Retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #9177: KAFKA-9924: Add RocksDB metric num-entries-active-mem-table

2020-08-18 Thread GitBox


vvcephei commented on pull request #9177:
URL: https://github.com/apache/kafka/pull/9177#issuecomment-675522188


   Retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #9177: KAFKA-9924: Add RocksDB metric num-entries-active-mem-table

2020-08-18 Thread GitBox


vvcephei commented on pull request #9177:
URL: https://github.com/apache/kafka/pull/9177#issuecomment-675521788


   Thanks for the update, @cadonna . Just one reply above.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #9177: KAFKA-9924: Add RocksDB metric num-entries-active-mem-table

2020-08-18 Thread GitBox


vvcephei commented on a change in pull request #9177:
URL: https://github.com/apache/kafka/pull/9177#discussion_r472250612



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##
@@ -415,9 +416,40 @@ public final Sensor storeLevelSensor(final String threadId,
 }
 }
 
-public final void removeAllStoreLevelSensors(final String threadId,
- final String taskId,
- final String storeName) {
+public <T> void addStoreLevelMutableMetric(final String threadId,
+   final String taskId,
+   final String metricsScope,
+   final String storeName,
+   final String name,
+   final String description,
+   final RecordingLevel 
recordingLevel,
+   final Gauge<T> valueProvider) {
+final MetricName metricName = metrics.metricName(
+name,
+STATE_STORE_LEVEL_GROUP,
+description,
+storeLevelTagMap(threadId, taskId, metricsScope, storeName)
+);
+if (metrics.metric(metricName) == null) {
+final MetricConfig metricConfig = new 
MetricConfig().recordLevel(recordingLevel);
+final String key = storeSensorPrefix(threadId, taskId, storeName);
+synchronized (storeLevelMetrics) {
+metrics.addMetric(metricName, metricConfig, valueProvider);
+storeLevelMetrics.computeIfAbsent(key, ignored -> new 
LinkedList<>()).push(metricName);
+}
+}
+}
+
+public final void removeAllStoreLevelSensorsAndMetrics(final String 
threadId,
+   final String taskId,
+   final String 
storeName) {
+removeAllStoreLevelMetrics(threadId, taskId, storeName);
+removeAllStoreLevelSensors(threadId, taskId, storeName);
+}

Review comment:
   It just seems oddly granular to synchronize them individually, since we 
always remove all of both collections together. If it doesn't matter, then do 
we need to synchronize at all?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei merged pull request #9174: KAFKA-10395: relax output topic check in TTD to work with dynamic routing

2020-08-18 Thread GitBox


vvcephei merged pull request #9174:
URL: https://github.com/apache/kafka/pull/9174


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #9174: KAFKA-10395: relax output topic check in TTD to work with dynamic routing

2020-08-18 Thread GitBox


vvcephei commented on a change in pull request #9174:
URL: https://github.com/apache/kafka/pull/9174#discussion_r472242890



##
File path: 
streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
##
@@ -805,10 +805,10 @@ public void advanceWallClockTime(final Duration advance) {
 
 private Queue<ProducerRecord<byte[], byte[]>> getRecordsQueue(final String topicName) {
     final Queue<ProducerRecord<byte[], byte[]>> outputRecords = outputRecordsByTopic.get(topicName);
-    if (outputRecords == null) {
-        if (!processorTopology.sinkTopics().contains(topicName)) {
-            throw new IllegalArgumentException("Unknown topic: " + topicName);
-        }
+    if (outputRecords == null && !processorTopology.sinkTopics().contains(topicName)) {

Review comment:
   Ok, let's just keep it in our back pocket for now.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10416) Provide an officially supported Deno client

2020-08-18 Thread Matthew T. Adams (Jira)
Matthew T. Adams created KAFKA-10416:


 Summary: Provide an officially supported Deno client
 Key: KAFKA-10416
 URL: https://issues.apache.org/jira/browse/KAFKA-10416
 Project: Kafka
  Issue Type: New Feature
  Components: clients
Reporter: Matthew T. Adams


This is a similar request to https://issues.apache.org/jira/browse/KAFKA-10415 
for a JavaScript client for Kafka, only packaged for 
[https://deno.land|https://deno.land/] .



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10415) Provide an officially supported Node.js client

2020-08-18 Thread Matthew T. Adams (Jira)
Matthew T. Adams created KAFKA-10415:


 Summary: Provide an officially supported Node.js client
 Key: KAFKA-10415
 URL: https://issues.apache.org/jira/browse/KAFKA-10415
 Project: Kafka
  Issue Type: New Feature
  Components: clients
Reporter: Matthew T. Adams


Please provide an official Node.js client for Kafka at feature parity with all 
of the other officially supported & provided Kafka clients.

It is extremely confusing when it comes to trying to use Kafka in the Node.js 
ecosystem.  There are many clients: some look legitimate 
([http://kafka.js.org]), but some are woefully out 
of date (many listed at 
[https://cwiki.apache.org/confluence/display/KAFKA/Clients#Clients-Node.js]), 
and others have confusing relationships among them 
([https://github.com/nodefluent/node-sinek] & 
[https://github.com/nodefluent/kafka-streams]).  Most of them are publicly 
asking for help.  This leaves teams having to waste time trying to figure out 
which client has the Kafka features they need (mostly talking about streaming 
here), and which client has high quality and will be around in the future.  If 
the client came directly from this project, those decisions would be made and 
we could get on with our work.

JavaScript is one of the most popular languages on the planet, and the 
Node.js user base is huge – big enough that a Node.js client provided directly 
by the Kafka team is justified.  The list at 
[https://cwiki.apache.org/confluence/display/KAFKA/Clients#Clients-Node.js] 
doesn't even mention what is perhaps the most confidence-inducing Node.js 
client thanks to its documentation, [https://kafka.js.org].  The list at 
[https://docs.confluent.io/current/clients/index.html#ak-clients] includes an 
officially supported Go language client; Go's community is dwarfed by that of 
Node.js.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-10414) Upgrade api-util dependency - CVE-2018-1337

2020-08-18 Thread Daniel Urban (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Urban reassigned KAFKA-10414:


Assignee: Daniel Urban

> Upgrade api-util dependency - CVE-2018-1337
> ---
>
> Key: KAFKA-10414
> URL: https://issues.apache.org/jira/browse/KAFKA-10414
> Project: Kafka
>  Issue Type: Bug
>Reporter: Daniel Urban
>Assignee: Daniel Urban
>Priority: Major
>
> There is a dependency on org.apache.directory.api:api-util:1.0.0, which is 
> involved in CVE-2018-1337. The issue is fixed in api-util 1.0.2 and later.
> This is a transitive dependency through the apacheds libs. It can be fixed by 
> upgrading apacheds to at least version 2.0.0.AM25.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10414) Upgrade api-util dependency - CVE-2018-1337

2020-08-18 Thread Daniel Urban (Jira)
Daniel Urban created KAFKA-10414:


 Summary: Upgrade api-util dependency - CVE-2018-1337
 Key: KAFKA-10414
 URL: https://issues.apache.org/jira/browse/KAFKA-10414
 Project: Kafka
  Issue Type: Bug
Reporter: Daniel Urban


There is a dependency on org.apache.directory.api:api-util:1.0.0, which is 
involved in CVE-2018-1337. The issue is fixed in api-util 1.0.2 and later.

This is a transitive dependency through the apacheds libs. It can be fixed by 
upgrading apacheds to at least version 2.0.0.AM25.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cadonna commented on a change in pull request #9191: [WIP] KAFKA-10355: PoC

2020-08-18 Thread GitBox


cadonna commented on a change in pull request #9191:
URL: https://github.com/apache/kafka/pull/9191#discussion_r472228811



##
File path: 
streams/src/main/java/org/apache/kafka/streams/errors/MissingSourceTopicException.java
##
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.errors;
+
+public class MissingSourceTopicException extends StreamsException {

Review comment:
   That is a good point. I thought all exceptions thrown from inside 
Streams (except for the `IllegalStateException`) should be `StreamsException`s. 
The `RecordDeserializer` throws a `StreamsException` when the deserialization 
exception handler -- which is user code -- throws any exception. Maybe 
@guozhangwang can help here. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on a change in pull request #9191: [WIP] KAFKA-10355: PoC

2020-08-18 Thread GitBox


cadonna commented on a change in pull request #9191:
URL: https://github.com/apache/kafka/pull/9191#discussion_r472217989



##
File path: 
streams/src/main/java/org/apache/kafka/streams/errors/MissingSourceTopicException.java
##
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.errors;
+
+public class MissingSourceTopicException extends StreamsException {
+
+private final static long serialVersionUID = 1L;

Review comment:
   That is a good question. I have to admit that I blindly copied it from 
another exception class. The field has to do with exceptions implementing the 
`Serializable` interface. It tells the JVM whether a serialized object can be 
deserialized into an object of the version of the class that is available in 
the JVM. See 
https://stackoverflow.com/questions/7187302/what-is-serialversionuid-in-java-normally-in-exception-class
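   
   For illustration, a minimal sketch of how the field is typically declared (the constructor here is assumed):
   
   ```java
   public class MissingSourceTopicException extends StreamsException {
       // Pins the serialized form: old instances still deserialize after
       // compatible changes; bump the value only on incompatible changes.
       private static final long serialVersionUID = 1L;
   
       public MissingSourceTopicException(final String message) {
           super(message);
       }
   }
   ```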






This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on a change in pull request #9177: KAFKA-9924: Add RocksDB metric num-entries-active-mem-table

2020-08-18 Thread GitBox


cadonna commented on a change in pull request #9177:
URL: https://github.com/apache/kafka/pull/9177#discussion_r472200360



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##
@@ -415,9 +416,40 @@ public final Sensor storeLevelSensor(final String threadId,
 }
 }
 
-public final void removeAllStoreLevelSensors(final String threadId,
- final String taskId,
- final String storeName) {
+public <T> void addStoreLevelMutableMetric(final String threadId,
+   final String taskId,
+   final String metricsScope,
+   final String storeName,
+   final String name,
+   final String description,
+   final RecordingLevel 
recordingLevel,
+   final Gauge<T> valueProvider) {
+final MetricName metricName = metrics.metricName(
+name,
+STATE_STORE_LEVEL_GROUP,
+description,
+storeLevelTagMap(threadId, taskId, metricsScope, storeName)
+);
+if (metrics.metric(metricName) == null) {
+final MetricConfig metricConfig = new 
MetricConfig().recordLevel(recordingLevel);
+final String key = storeSensorPrefix(threadId, taskId, storeName);
+synchronized (storeLevelMetrics) {
+metrics.addMetric(metricName, metricConfig, valueProvider);
+storeLevelMetrics.computeIfAbsent(key, ignored -> new 
LinkedList<>()).push(metricName);
+}
+}
+}
+
+public final void removeAllStoreLevelSensorsAndMetrics(final String 
threadId,
+   final String taskId,
+   final String 
storeName) {
+removeAllStoreLevelMetrics(threadId, taskId, storeName);
+removeAllStoreLevelSensors(threadId, taskId, storeName);
+}

Review comment:
   Do you have performance concerns due to the two monitors? Or what is the 
main reason for using a single monitor here? By using a single monitor here and 
in `addStoreLevelMutableMetric()` and `storeLevelSensor()`, we do not ensure 
that no metrics are added to the metrics map during removal of all metrics 
because each time  `Sensor#add()` is called a metric is added without 
synchronizing on the monitor of `storeLevelSensors`. Single operations on the 
metrics map are synchronized (through `ConcurrentMap`), but not multiple 
operations.
   


##
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
##
@@ -609,6 +611,37 @@ public void 
shouldVerifyThatMetricsGetMeasurementsFromRocksDB() {
 assertThat((double) bytesWrittenTotal.metricValue(), greaterThan(0d));
 }
 
+@Test
+public void 
shouldVerifyThatMetricsRecordedFromPropertiesGetMeasurementsFromRocksDB() {
+final TaskId taskId = new TaskId(0, 0);
+
+final Metrics metrics = new Metrics(new 
MetricConfig().recordLevel(RecordingLevel.INFO));
+final StreamsMetricsImpl streamsMetrics =
+new StreamsMetricsImpl(metrics, "test-application", 
StreamsConfig.METRICS_LATEST, time);
+
+context = EasyMock.niceMock(InternalMockProcessorContext.class);
+EasyMock.expect(context.metrics()).andStubReturn(streamsMetrics);
+EasyMock.expect(context.taskId()).andStubReturn(taskId);
+EasyMock.expect(context.appConfigs())
+.andStubReturn(new 
StreamsConfig(StreamsTestUtils.getStreamsConfig()).originals());
+EasyMock.expect(context.stateDir()).andStubReturn(dir);
+EasyMock.replay(context);
+
+rocksDBStore.init(context, rocksDBStore);
+final byte[] key = "hello".getBytes();
+final byte[] value = "world".getBytes();
+rocksDBStore.put(Bytes.wrap(key), value);
+
+final Metric numberOfEntriesActiveMemTable = metrics.metric(new 
MetricName(
+"num-entries-active-mem-table",
+StreamsMetricsImpl.STATE_STORE_LEVEL_GROUP,
+"description is not verified",
+streamsMetrics.storeLevelTagMap(Thread.currentThread().getName(), 
taskId.toString(), METRICS_SCOPE, DB_NAME)
+));
+assertThat(numberOfEntriesActiveMemTable, notNullValue());
+assertThat((BigInteger) numberOfEntriesActiveMemTable.metricValue(), 
greaterThan(BigInteger.valueOf(0)));

Review comment:
   Yes, but in this test I merely test whether the metric is updated. The 
correctness of the computation is verified in 
`RocksDBMetricsRecorderGaugesTest`. I will improve this test to 

[GitHub] [kafka] rhauch commented on pull request #9176: Allow replace all for RegexRouter

2020-08-18 Thread GitBox


rhauch commented on pull request #9176:
URL: https://github.com/apache/kafka/pull/9176#issuecomment-675475664


   @kkonstantine is correct that this changes the public API and therefore 
requires a KIP. 
   
   Please also create a Jira issue, and change the title of this PR so that it 
starts with the issue (e.g., `KAFKA-12345: ` but with your issue number).  This 
will auto-link this PR in the Jira issue. 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-8135) Kafka Producer deadlocked on flush call with intermittent broker unavailability

2020-08-18 Thread Ranadeep Deb (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17179616#comment-17179616
 ] 

Ranadeep Deb commented on KAFKA-8135:
-

I have been experiencing a similar issue in 2.12-2.3.0.

I have a multi-threaded application with each thread sending an individual 
message to the broker. There are instances where I have observed that the 
producer threads get stuck on the Producer.send().get() call. I was not sure 
what was causing this issue, but after landing on this thread I suspect that 
intermittent network outages might be the reason.

I am curious about how to solve this.

Thanks
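
Not a fix from this thread, but for reference a sketch of bounding the blocking with the producer timeout configs (broker address and topic are placeholders):

{code:java}
final Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// Keep delivery.timeout.ms finite so batches fail rather than hang when brokers
// are unavailable; flush() then completes once the expired batches are failed.
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120_000);
// Bound how long send() itself may block on metadata or buffer space.
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60_000);

try (final KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    producer.send(new ProducerRecord<>("topic", "key", "value"));
    producer.flush(); // bounded by delivery.timeout.ms per in-flight batch
}
{code}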

> Kafka Producer deadlocked on flush call with intermittent broker 
> unavailability
> ---
>
> Key: KAFKA-8135
> URL: https://issues.apache.org/jira/browse/KAFKA-8135
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 2.1.0
>Reporter: Guozhang Wang
>Assignee: Rajini Sivaram
>Priority: Major
>
> In KIP-91 we added the config {{delivery.timeout.ms}} to replace {{retries}}, 
> and the value defaults to 2 minutes. We've observed that when it was set to 
> MAX_VALUE (e.g. in Kafka Streams, when EOS is turned on), at times the 
> {{producer.flush}} call would be blocked while its destination 
> brokers are undergoing some unavailability:
> {code}
> java.lang.Thread.State: WAITING (parking)
> at jdk.internal.misc.Unsafe.park(java.base@10.0.2/Native Method)
> - parking to wait for  <0x0006aeb21a00> (a 
> java.util.concurrent.CountDownLatch$Sync)
> at java.util.concurrent.locks.LockSupport.park(java.base@10.0.2/Unknown 
> Source)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(java.base@10.0.2/Unknown
>  Source)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(java.base@10.0.2/Unknown
>  Source)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base@10.0.2/Unknown
>  Source)
> at java.util.concurrent.CountDownLatch.await(java.base@10.0.2/Unknown 
> Source)
> at 
> org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:76)
> at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:693)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:1066)
> at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:259)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:520)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:470)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:458)
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:286)
> at 
> org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:412)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1057)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:911)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
> {code}
> And even after the broker went back to normal, producers would still be 
> blocked. One suspicion is that when broker's not able to handle the request 
> in time, the responses are dropped somehow inside the Sender, and hence 
> whoever waiting on this response would be blocked forever.
> We've observed such scenarios when 1) broker's transiently failed for a 
> while, 2) network partitioned transiently, and 3) broker's bad config like 
> ACL caused it to not be able to handle requests for a while.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10413) rebalancing leads to unevenly balanced connectors

2020-08-18 Thread yazgoo (Jira)
yazgoo created KAFKA-10413:
--

 Summary: rebalancing leads to unevenly balanced connectors
 Key: KAFKA-10413
 URL: https://issues.apache.org/jira/browse/KAFKA-10413
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 2.5.1
Reporter: yazgoo


Hi,

With CP 5.5, running the Kafka Connect S3 sink on EC2 with autoscaling enabled, if 
a Connect instance disappears, or a new one appears, we're seeing unbalanced 
consumption, much like mentioned in this post:

[https://stackoverflow.com/questions/58644622/incremental-cooperative-rebalancing-leads-to-unevenly-balanced-connectors]


This usually leads to one Kafka Connect instance taking most of the load, with 
consumption unable to keep up.
Currently, we're "fixing" this by deleting the connector and re-creating it, 
but this is far from ideal.

Any suggestion on what we could do to mitigate this ?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cadonna commented on a change in pull request #9177: KAFKA-9924: Add RocksDB metric num-entries-active-mem-table

2020-08-18 Thread GitBox


cadonna commented on a change in pull request #9177:
URL: https://github.com/apache/kafka/pull/9177#discussion_r472100951



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java
##
@@ -56,6 +62,9 @@ public void maybeCloseStatistics() {
 }
 }
 
+private static final String ROCKSDB_PROPERTIES_PREFIX = "rocksdb.";
+private static final ByteBuffer CONVERSION_BUFFER = 
ByteBuffer.allocate(Long.BYTES);

Review comment:
   Good point! I missed that the gauge can be called by multiple metrics 
reporters concurrently.
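   
   For illustration, a sketch of a lock-free alternative (mine, not necessarily what this PR does): allocate per call instead of sharing a static buffer across metrics reporters.
   
   ```java
   private static BigInteger toUnsignedBigInteger(final long value) {
       final byte[] bytes = ByteBuffer.allocate(Long.BYTES).putLong(value).array();
       return new BigInteger(1, bytes); // signum 1: interpret the 8 bytes as unsigned
   }
   ```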





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10357) Handle accidental deletion of repartition-topics as exceptional failure

2020-08-18 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17179535#comment-17179535
 ] 

Bruno Cadonna commented on KAFKA-10357:
---

bq. Can we leverage the committed offsets somehow? It seems like if the 
repartition topics don't exist but the group has committed offsets for them, 
then they must have been deleted

If I understand it correctly, when a topic is deleted, its committed offsets 
are deleted as well. That means the situation you describe is not possible. 
Please let me know if I missed anything.

bq.  It seems preferable to me to have streams be able to detect when its 
internal state is invalid

As far as I understand, that is exactly what we try to do. We want to detect 
the deletion of a repartition topic and notify the user about it through an 
exception. How the users react to the exception is their business. Admittedly, 
we need to provide some additional functionality to react better to such 
situations.

I am not against a manual initialization step, but I have two concerns:

1) worse out-of-the-box experience because manual steps are required before you 
can play around with Streams
2) exposure of internals like the repartition topics

To solve 1) we could introduce a config that tells Streams to assume that the 
internal topics (or just some of them) are pre-created and therefore not to 
setup them. To avoid the exposure of internal topics we could abstract the 
manual initialization to hide internals. However, what would then happen when a 
repartition topic is deleted? What should Streams do when it can assume that 
internal topics are pre-created and it does not find a repartition topic? 
Either it silently shuts down or it throws an exception on which users can 
react upon. I am in favor of the second.

Instead of the manual initialization step, I would prefer a way to persist a 
flag that indicates that an automatic initialization was performed. If 
something unexpected happens, the application could then be reset to a valid 
state with the application reset tool, which would also reset the flag. But 
currently, I do not know where we could persist such a flag. Maybe somewhere on 
the brokers, with the flag managed by the group coordinator? WDYT?

> Handle accidental deletion of repartition-topics as exceptional failure
> ---
>
> Key: KAFKA-10357
> URL: https://issues.apache.org/jira/browse/KAFKA-10357
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Bruno Cadonna
>Priority: Major
>
> Repartition topics are both written by Stream's producer and read by Stream's 
> consumer, so when they are accidentally deleted both clients may be notified. 
> But in practice the consumer would react to it much quicker than producer 
> since the latter has a delivery timeout expiration period (see 
> https://issues.apache.org/jira/browse/KAFKA-10356). When consumer reacts to 
> it, it will re-join the group since metadata changed and during the triggered 
> rebalance it would auto-recreate the topic silently and continue, causing 
> data lost silently. 
> One idea, is to only create all repartition topics *once* in the first 
> rebalance and not auto-create them any more in future rebalances, instead it 
> would be treated similar as INCOMPLETE_SOURCE_TOPIC_METADATA error code 
> (https://issues.apache.org/jira/browse/KAFKA-10355).
> The challenge part would be, how to determine if it is the first-ever 
> rebalance, and there are several wild ideas I'd like to throw out here:
> 1) change the thread state transition diagram so that STARTING state would 
> not transit to PARTITION_REVOKED but only to PARTITION_ASSIGNED, then in the 
> assign function we can check if the state is still in CREATED and not RUNNING.
> 2) augment the subscriptionInfo to encode whether or not this is the first 
> time ever rebalance.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10412) Deprecate all setters of Headers

2020-08-18 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-10412:
--

 Summary: Deprecate all setters of Headers
 Key: KAFKA-10412
 URL: https://issues.apache.org/jira/browse/KAFKA-10412
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


Headers is a part of the public APIs, but its usage is a bit chaotic for users:
 # it has a lot of abstract setters, so users have to implement all of them. 
However, we never call user-defined setters in production.
 # it is not thread-safe, so we have to call "setReadOnly" to ensure data 
consistency.
 # "setReadOnly" is not part of the public API, so users have no idea about the 
thread-safety of Headers.

We can't improve Headers right now because of deprecation cycles. This KIP 
plans to deprecate all setters of Headers and offer default implementations 
(throwing java.lang.UnsupportedOperationException), so we can clean up the 
setters and make Headers read-only in the next major release, and users are 
able to remove the now-useless implementations from their own Headers.
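
A hypothetical sketch of that direction (not the final KIP text): deprecated setters gain throwing default implementations, so user classes no longer need to implement them, while the read-side methods stay abstract.

{code:java}
public interface Headers extends Iterable<Header> {

    @Deprecated
    default Headers add(final Header header) {
        throw new UnsupportedOperationException("Headers are read-only");
    }

    @Deprecated
    default Headers remove(final String key) {
        throw new UnsupportedOperationException("Headers are read-only");
    }

    // read-side methods (lastHeader, headers, toArray, ...) remain abstract
}
{code}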



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] nizhikov opened a new pull request #9196: KAFKA-10402: Upgrade system tests to python3

2020-08-18 Thread GitBox


nizhikov opened a new pull request #9196:
URL: https://github.com/apache/kafka/pull/9196


   Currently, Kafka system tests use Python 2, which is outdated and no longer supported.
   This PR upgrades the system tests to Python 3.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jeqo commented on pull request #9138: KAFKA-9929: Support backward iterator on WindowStore

2020-08-18 Thread GitBox


jeqo commented on pull request #9138:
URL: https://github.com/apache/kafka/pull/9138#issuecomment-675384593


   @ableegoldman, this PR is ready for review  
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] stanislavkozlovski commented on pull request #6669: KAFKA-8238: Adding Number of messages/bytes read

2020-08-18 Thread GitBox


stanislavkozlovski commented on pull request #6669:
URL: https://github.com/apache/kafka/pull/6669#issuecomment-675358064


   Yeah, that'd be awesome!



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] Sasilekha commented on pull request #9195: KAFKA-10092 Remove unnecessary enum modifier in NioEchoServer

2020-08-18 Thread GitBox


Sasilekha commented on pull request #9195:
URL: https://github.com/apache/kafka/pull/9195#issuecomment-675358144


   Hi @notifygd,
   Can you please review?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] Sasilekha opened a new pull request #9195: KAFKA-10092 Remove unnecessary enum modifier in NioEchoServer

2020-08-18 Thread GitBox


Sasilekha opened a new pull request #9195:
URL: https://github.com/apache/kafka/pull/9195


   In NioEchoServer the enum has its constructor declared as private, which is 
redundant. We can remove this.
   
   public class NioEchoServer extends Thread {
       public enum MetricType {
           TOTAL, RATE, AVG, MAX;

           private final String metricNameSuffix;

           private MetricType() {
               metricNameSuffix = "-" + name().toLowerCase(Locale.ROOT);
           }
       }
   }
   
   Removed the MetricType constructor for the MetricType Enum
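   
   For clarity (my reading of the change, since removing the constructor outright would leave `metricNameSuffix` unassigned): presumably only the redundant modifier goes away, as enum constructors are implicitly private.
   
   ```java
   // after: same behavior, without the redundant "private" modifier
   MetricType() {
       metricNameSuffix = "-" + name().toLowerCase(Locale.ROOT);
   }
   ```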



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #9162: MINOR: refactor Log to get rid of "return" in nested anonymous function

2020-08-18 Thread GitBox


chia7712 commented on pull request #9162:
URL: https://github.com/apache/kafka/pull/9162#issuecomment-67533


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



