[GitHub] [kafka] kowshik commented on pull request #10742: MINOR: Add log identifier/prefix printing in Log layer static functions

2021-05-20 Thread GitBox


kowshik commented on pull request #10742:
URL: https://github.com/apache/kafka/pull/10742#issuecomment-845641737


   @ccding and @showuon: Thanks for the review! I've addressed your comment(s) 
and responded to your questions.






[GitHub] [kafka] kowshik commented on a change in pull request #10742: MINOR: Add log identifier/prefix printing in Log layer static functions

2021-05-20 Thread GitBox


kowshik commented on a change in pull request #10742:
URL: https://github.com/apache/kafka/pull/10742#discussion_r636623193



##
File path: core/src/main/scala/kafka/log/LogLoader.scala
##
@@ -172,7 +173,7 @@ object LogLoader extends Logging {
   private def removeTempFilesAndCollectSwapFiles(params: LoadLogParams): 
Set[File] = {
 
 def deleteIndicesIfExist(baseFile: File, suffix: String = ""): Unit = {
-  info(s"${params.logIdentifier} Deleting index files with suffix $suffix 
for baseFile $baseFile")
+  info(s"${params.logIdentifier}Deleting index files with suffix $suffix 
for baseFile $baseFile")

Review comment:
   Yes, because the existing identifier in the `Log` class is suffixed with 
a whitespace, see: 
https://github.com/apache/kafka/blob/5d4f9f917c8dc356a2d922980ba8101e0f2e7093/core/src/main/scala/kafka/log/Log.scala#L280
   I'm adding a whitespace here to be consistent with what we already have in 
the code.
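   For illustration, a minimal sketch (hypothetical values, not the Kafka source) of why the trailing space lives in the identifier itself:

   ```java
   // The prefix carries its own trailing space, so every call site can
   // concatenate directly without remembering to add the separator.
   String logIdentifier = "[LogLoader partition=foo-0, dir=/tmp/kafka-logs] ";
   String line = logIdentifier + "Deleting index files with suffix .swap for baseFile foo";
   // -> "[LogLoader partition=foo-0, dir=/tmp/kafka-logs] Deleting index files with suffix .swap for baseFile foo"
   ```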








[GitHub] [kafka] kowshik commented on a change in pull request #10742: MINOR: Add log identifier/prefix printing in Log layer static functions

2021-05-20 Thread GitBox


kowshik commented on a change in pull request #10742:
URL: https://github.com/apache/kafka/pull/10742#discussion_r636623604



##
File path: core/src/main/scala/kafka/log/LogLoader.scala
##
@@ -67,7 +67,7 @@ case class LoadLogParams(dir: File,
  maxProducerIdExpirationMs: Int,
  leaderEpochCache: Option[LeaderEpochFileCache],
  producerStateManager: ProducerStateManager) {
-  val logIdentifier: String = s"[LogLoader partition=$topicPartition, dir=${dir.getParent}]"
+  val logIdentifier: String = s"[LogLoader partition=$topicPartition, dir=${dir.getParent}] "

Review comment:
   This is required because the existing identifier in the `Log` class is 
suffixed with a whitespace, see: 
https://github.com/apache/kafka/blob/5d4f9f917c8dc356a2d922980ba8101e0f2e7093/core/src/main/scala/kafka/log/Log.scala#L280
   I'm adding a whitespace here to be consistent with what we already have in 
the code.








[GitHub] [kafka] kowshik commented on a change in pull request #10742: MINOR: Add log identifier/prefix printing in Log layer static functions

2021-05-20 Thread GitBox


kowshik commented on a change in pull request #10742:
URL: https://github.com/apache/kafka/pull/10742#discussion_r636622730



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -2225,13 +2225,15 @@ object Log extends Logging {
* @param dir The directory in which the log will reside
* @param topicPartition The topic partition
* @param logDirFailureChannel The LogDirFailureChannel to asynchronously 
handle log dir failure
-   * @param recordVersion The record version
+   * @param recordVersion The record

Review comment:
   No. It was changed accidentally. I have fixed it now in 
35e389b5c05fddf5682f344ddf58222603133a19.








[GitHub] [kafka] ableegoldman commented on pull request #10609: KAFKA-12648: Pt. 1 - Add NamedTopology to protocol and state directory structure

2021-05-20 Thread GitBox


ableegoldman commented on pull request #10609:
URL: https://github.com/apache/kafka/pull/10609#issuecomment-845638871


   Rebased after the TaskId changes in KIP-470, and responded to all comments. Not much has changed since the last review, just cleaning up here and there. It's pretty much done except for StateDirectoryTest, which I can always do in a quick followup PR to unblock other downstream work with this.






[GitHub] [kafka] ableegoldman commented on a change in pull request #10609: KAFKA-12648: Pt. 1 - Add NamedTopology to protocol and state directory structure

2021-05-20 Thread GitBox


ableegoldman commented on a change in pull request #10609:
URL: https://github.com/apache/kafka/pull/10609#discussion_r636620946



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
##
@@ -77,6 +84,7 @@
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+//TODO KAFKA-12648: add tests for named topology specific stuff

Review comment:
   I've added a few tests for the named topology stuff but I definitely 
want to add more and just haven't yet had time. Since I'm still on-call and 
therefore unlikely to have time until next week, if you both are able to do a 
quick pass and don't have any further feedback on the PR as-is, it may make 
sense to just merge this PR tomorrow (Friday) and do a quick followup PR for 
the tests next week.
   
   That way I can rebase the next PR ([Pt. 2](https://github.com/apache/kafka/pull/10683)) and you all can actually begin reviewing that. cc @guozhangwang @wcarlson5








[jira] [Comment Edited] (KAFKA-9296) Correlation id for response () does not match request ()

2021-05-20 Thread Lea (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17348627#comment-17348627
 ] 

Lea edited comment on KAFKA-9296 at 5/21/21, 4:03 AM:
--

I had the same problem with the Kafka producer again.
 Environment: K8s pod

client version: 2.4.1

At the same time, I found that the node's TCP retransmission error rate increased to 2%. I don't know whether that is related to this issue.
{code:java}
// code placeholder
2021-05-19 12:39:34.308 [] [kafka-producer-network-thread ] ERROR 
o.a.k.c.producer.internals.Sender run 246 - [Producer clientId=] Uncaught error 
in kafka producer I/O thread: java.lang.IllegalStateException: Correlation id 
for response (80779915) does not match request (80779910), request header: 
RequestHeader(apiKey=PRODUCE, apiVersion=5, clientId=, correlationId=80779910) 
at org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:937) at 
org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:720)
 at 
org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:833)
 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:556) at 
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:335) at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244) at 
java.lang.Thread.run(Thread.java:748)
{code}
 


was (Author: leaye):
I had the same problem with the Kafka producer again.
 Environment: K8s pod

client version: 2.4.1

At the same time, I found that the node's TCP retransmission error rate increased to 2%. I don't know whether that is related to this issue.
{code:java}
// code placeholder
2021-05-19 12:39:34.308 [] [kafka-producer-network-thread ] ERROR 
o.a.k.c.producer.internals.Sender run 246 - [Producer clientId=] Uncaught error 
in kafka producer I/O thread: java.lang.IllegalStateException: Correlation id 
for response (80779915) does not match request (80779910), request header: 
RequestHeader(apiKey=PRODUCE, apiVersion=5, 
clientId=314c35d9-10fd-44e4-9f96-1a60805b250d, correlationId=80779910) at 
org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:937) at 
org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:720)
 at 
org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:833)
 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:556) at 
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:335) at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244) at 
java.lang.Thread.run(Thread.java:748)
{code}
 

> Correlation id for response () does not match request ()
> 
>
> Key: KAFKA-9296
> URL: https://issues.apache.org/jira/browse/KAFKA-9296
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 0.11.0.2
> Environment: Flink on  k8s
>Reporter: Enhon Bryant
>Priority: Blocker
>  Labels: kafka, producer
>
> The Kafka client and broker I use are both version 0.11.0.2. I use Kafka's
> producer to write data to the broker. I encountered the following exception:
> 2019-12-12 18:12:46,821 ERROR 
> org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka 
> producer I/O thread: 
> java.lang.IllegalStateException: Correlation id for response (11715816) does 
> not match request (11715804), request header: 
> \{api_key=0,api_version=3,correlation_id=11715804,client_id=producer-3}
>  at org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:752)
>  at 
> org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:561)
>  at 
> org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:657)
>  at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:442)
>  at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:224)
>  at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
>  at java.lang.Thread.run(Thread.java:748)
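
A minimal sketch of the invariant behind this error (simplified and illustrative only; per the stack traces above, the real check lives in NetworkClient.correlate):

{code:java}
// Responses on a connection must arrive in the order their requests were sent;
// the client compares each response's correlation id against the oldest
// in-flight request and fails fast on any mismatch.
static void correlate(final int requestCorrelationId, final int responseCorrelationId) {
    if (requestCorrelationId != responseCorrelationId) {
        throw new IllegalStateException("Correlation id for response (" + responseCorrelationId
                + ") does not match request (" + requestCorrelationId + ")");
    }
}
{code}

A mismatch therefore means the response stream got out of step with the request stream on that connection.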





[GitHub] [kafka] ableegoldman commented on a change in pull request #10609: KAFKA-12648: Pt. 1 - Add NamedTopology to protocol and state directory structure

2021-05-20 Thread GitBox


ableegoldman commented on a change in pull request #10609:
URL: https://github.com/apache/kafka/pull/10609#discussion_r636619553



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
##
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+public class NamedTopologyIntegrationTest {
+//TODO KAFKA-12648

Review comment:
   Just wanted to lay out my test plan somewhere, so it doesn't seem like 
I'm merging all this code with no intention to ever test it. Once the final 
pieces are in (should be by Pt. 2) these are the things I think are important 
to touch on with integration tests. Leave a comment if you have any more 
suggestions or feedback.








[jira] [Comment Edited] (KAFKA-9296) Correlation id for response () does not match request ()

2021-05-20 Thread Lea (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17348627#comment-17348627
 ] 

Lea edited comment on KAFKA-9296 at 5/21/21, 4:02 AM:
--

I had the same problem with the Kafka producer again.
 Environment: K8s pod

client version: 2.4.1

At the same time, I found that the node's TCP retransmission error rate increased to 2%. I don't know whether that is related to this issue.
{code:java}
// code placeholder
2021-05-19 12:39:34.308 [] [kafka-producer-network-thread ] ERROR 
o.a.k.c.producer.internals.Sender run 246 - [Producer clientId=] Uncaught error 
in kafka producer I/O thread: java.lang.IllegalStateException: Correlation id 
for response (80779915) does not match request (80779910), request header: 
RequestHeader(apiKey=PRODUCE, apiVersion=5, 
clientId=314c35d9-10fd-44e4-9f96-1a60805b250d, correlationId=80779910) at 
org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:937) at 
org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:720)
 at 
org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:833)
 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:556) at 
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:335) at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244) at 
java.lang.Thread.run(Thread.java:748)
{code}
 


was (Author: leaye):
I had the same problem with the Kafka producer again.
 Environment: K8s pod

client version: 2.4.1

At the same time, I found that the node's TCP retransmission error rate increased to 2%. I don't know whether that is related to this issue.
{code:java}
// code placeholder
2021-05-19 12:39:34.308 [] [kafka-producer-network-thread | 
314c35d9-10fd-44e4-9f96-1a60805b250d] ERROR o.a.k.c.producer.internals.Sender 
run 246 - [Producer clientId=314c35d9-10fd-44e4-9f96-1a60805b250d] Uncaught 
error in kafka producer I/O thread: java.lang.IllegalStateException: 
Correlation id for response (80779915) does not match request (80779910), 
request header: RequestHeader(apiKey=PRODUCE, apiVersion=5, 
clientId=314c35d9-10fd-44e4-9f96-1a60805b250d, correlationId=80779910) at 
org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:937) at 
org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:720)
 at 
org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:833)
 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:556) at 
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:335) at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244) at 
java.lang.Thread.run(Thread.java:748)
{code}
 

> Correlation id for response () does not match request ()
> 
>
> Key: KAFKA-9296
> URL: https://issues.apache.org/jira/browse/KAFKA-9296
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 0.11.0.2
> Environment: Flink on  k8s
>Reporter: Enhon Bryant
>Priority: Blocker
>  Labels: kafka, producer
>
> The Kafka client and broker I use are both version 0.11.0.2. I use Kafka's
> producer to write data to the broker. I encountered the following exception:
> 2019-12-12 18:12:46,821 ERROR 
> org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka 
> producer I/O thread: 
> java.lang.IllegalStateException: Correlation id for response (11715816) does 
> not match request (11715804), request header: 
> \{api_key=0,api_version=3,correlation_id=11715804,client_id=producer-3}
>  at org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:752)
>  at 
> org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:561)
>  at 
> org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:657)
>  at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:442)
>  at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:224)
>  at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
>  at java.lang.Thread.run(Thread.java:748)





[GitHub] [kafka] ableegoldman commented on a change in pull request #10690: MINOR: clarify message ordering with max in-flight requests and idempotent producer

2021-05-20 Thread GitBox


ableegoldman commented on a change in pull request #10690:
URL: https://github.com/apache/kafka/pull/10690#discussion_r636617410



##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
##
@@ -246,8 +246,8 @@
 public static final String ENABLE_IDEMPOTENCE_CONFIG = 
"enable.idempotence";
 public static final String ENABLE_IDEMPOTENCE_DOC = "When set to 'true', 
the producer will ensure that exactly one copy of each message is written in 
the stream. If 'false', producer "
 + "retries due to 
broker failures, etc., may write duplicates of the retried message in the 
stream. "
-+ "Note that enabling 
idempotence requires " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " 
to be less than or equal to 5, "
-+ "" + 
RETRIES_CONFIG + " to be greater than 0 and " + ACKS_CONFIG + 
" must be 'all'. If these values "
++ "Note that enabling 
idempotence requires " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " 
to be less than or equal to 5 (with message ordering preserved for any 
allowable vlaue), "

Review comment:
   thanks! fixed
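
   For reference, a minimal sketch of a producer configuration that satisfies the constraints described in this doc text (the bootstrap address is a hypothetical example):

   ```java
   import java.util.Properties;
   import org.apache.kafka.clients.producer.KafkaProducer;
   import org.apache.kafka.clients.producer.ProducerConfig;
   import org.apache.kafka.common.serialization.StringSerializer;

   Properties props = new Properties();
   props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical
   props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
   // Idempotence requires max.in.flight.requests.per.connection <= 5 (ordering
   // is preserved for any allowable value), retries > 0, and acks=all.
   props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
   props.put(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
   props.put(ProducerConfig.ACKS_CONFIG, "all");
   KafkaProducer<String, String> producer =
       new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
   ```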








[GitHub] [kafka] showuon commented on a change in pull request #10690: MINOR: clarify message ordering with max in-flight requests and idempotent producer

2021-05-20 Thread GitBox


showuon commented on a change in pull request #10690:
URL: https://github.com/apache/kafka/pull/10690#discussion_r636611321



##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
##
@@ -246,8 +246,8 @@
 public static final String ENABLE_IDEMPOTENCE_CONFIG = 
"enable.idempotence";
 public static final String ENABLE_IDEMPOTENCE_DOC = "When set to 'true', 
the producer will ensure that exactly one copy of each message is written in 
the stream. If 'false', producer "
 + "retries due to 
broker failures, etc., may write duplicates of the retried message in the 
stream. "
-+ "Note that enabling 
idempotence requires " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " 
to be less than or equal to 5, "
-+ "" + 
RETRIES_CONFIG + " to be greater than 0 and " + ACKS_CONFIG + 
" must be 'all'. If these values "
++ "Note that enabling 
idempotence requires " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " 
to be less than or equal to 5 (with message ordering preserved for any 
allowable vlaue), "

Review comment:
   typo: with message ordering preserved for any allowable **vlaue** --> 
**value**








[GitHub] [kafka] showuon commented on a change in pull request #10731: KAFKA-12815: Update JavaDocs of ValueTransformerWithKey

2021-05-20 Thread GitBox


showuon commented on a change in pull request #10731:
URL: https://github.com/apache/kafka/pull/10731#discussion_r636606646



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKey.java
##
@@ -75,14 +75,20 @@
 void init(final ProcessorContext context);
 
 /**
- * Transform the given [key and ]value to a new value.
+ * Transform the given [key and] value to a new value.
  * Additionally, any {@link StateStore} that is {@link 
KStream#transformValues(ValueTransformerWithKeySupplier, String...)
  * attached} to this operator can be accessed and modified arbitrarily (cf.
  * {@link ProcessorContext#getStateStore(String)}).
  * 
- * Note, that using {@link ProcessorContext#forward(Object, Object)} or
+ * Note that using {@link ProcessorContext#forward(Object, Object)} or
  * {@link ProcessorContext#forward(Object, Object, To)} is not allowed 
within {@code transform} and
  * will result in an {@link StreamsException exception}.
+ * 
+ * Note that if a {@code ValueTransformerWithKey} is used in a {@link 
KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
+ * (or any other overload of {@code KTable#transformValues(...)}) 
operation,
+ * that the provided {@link ProcessorContext} from {@link 
#init(ProcessorContext)}

Review comment:
   is this a typo? 
   if a 'xxx' is used in a 'yyy' operation, **that** the provided 'zzz' from 'aaa' doesn't guarantee...
   
   Maybe replace `that` with `then`?
   








[GitHub] [kafka] showuon commented on a change in pull request #10741: [KAFKA-12644] Add Missing Class-Level Javadoc to Exception Classes

2021-05-20 Thread GitBox


showuon commented on a change in pull request #10741:
URL: https://github.com/apache/kafka/pull/10741#discussion_r636602191



##
File path: 
clients/src/main/java/org/apache/kafka/common/errors/InvalidConfigurationException.java
##
@@ -16,6 +16,9 @@
  */
 package org.apache.kafka.common.errors;
 
+/**
+ * Indicates an error condition where the specified configuration is not valid

Review comment:
   `nit`: this one and the following sentences are missing a subject, i.e. **This exception** indicates an error... Is that intended?

##
File path: 
clients/src/main/java/org/apache/kafka/common/errors/ConcurrentTransactionsException.java
##
@@ -16,6 +16,12 @@
  */
 package org.apache.kafka.common.errors;
 
+/**
+ * This exception indicates that a concurrent exception has occurred.
+ *
+ * The producer attempted to update a transaction while

Review comment:
   I think `concurrent exception has occurred` is not clear enough. Could 
we just say: `This exception indicates that the producer attempted to update a 
transaction while another concurrent operation on the same transaction was 
ongoing.` ?

##
File path: 
clients/src/main/java/org/apache/kafka/common/errors/GroupSubscribedToTopicException.java
##
@@ -16,6 +16,11 @@
  */
 package org.apache.kafka.common.errors;
 
+/**
+ * Deleting offsets of a topic is forbidden while the consumer group is 
actively subscribed to it
+ *
+ * This exception is thrown in the event of such occurrence

Review comment:
   `nit`: could we make these 2 sentences into 1 like the others? e.g. `This exception is thrown in the event of deleting offsets of a topic`?

##
File path: 
clients/src/main/java/org/apache/kafka/common/errors/MemberIdRequiredException.java
##
@@ -16,6 +16,11 @@
  */
 package org.apache.kafka.common.errors;
 
+/**
+ * The group member needs to have a valid member id before actually entering a 
consumer group
+ *
+ * This exception is generated when this is not the case

Review comment:
   `nit`: as above, could we make these 2 sentences into 1?
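
   For what it's worth, one way the suggested single-sentence pattern could read (an illustrative sketch, not the committed wording):

   ```java
   package org.apache.kafka.common.errors;

   /**
    * This exception indicates that the group member needs to obtain a valid
    * member id before it can actually enter the consumer group.
    */
   public class MemberIdRequiredException extends ApiException {
       public MemberIdRequiredException(final String message) {
           super(message);
       }
   }
   ```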








[GitHub] [kafka] showuon commented on a change in pull request #10742: MINOR: Add log identifier/prefix printing in Log layer static functions

2021-05-20 Thread GitBox


showuon commented on a change in pull request #10742:
URL: https://github.com/apache/kafka/pull/10742#discussion_r636598563



##
File path: core/src/main/scala/kafka/log/LogLoader.scala
##
@@ -67,7 +67,7 @@ case class LoadLogParams(dir: File,
  maxProducerIdExpirationMs: Int,
  leaderEpochCache: Option[LeaderEpochFileCache],
  producerStateManager: ProducerStateManager) {
-  val logIdentifier: String = s"[LogLoader partition=$topicPartition, dir=${dir.getParent}]"
+  val logIdentifier: String = s"[LogLoader partition=$topicPartition, dir=${dir.getParent}] "

Review comment:
   Why do you add a space here, and remove the space in the following log 
content?

##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -2225,13 +2225,15 @@ object Log extends Logging {
* @param dir The directory in which the log will reside
* @param topicPartition The topic partition
* @param logDirFailureChannel The LogDirFailureChannel to asynchronously 
handle log dir failure
-   * @param recordVersion The record version
+   * @param recordVersion The record

Review comment:
   Is it intended to change the `The record version` to `The record`?








[GitHub] [kafka] showuon commented on pull request #10552: KAFKA-12675: improve the sticky general assignor scalability and performance

2021-05-20 Thread GitBox


showuon commented on pull request #10552:
URL: https://github.com/apache/kafka/pull/10552#issuecomment-845600016


   Thank you, guys! :)






[GitHub] [kafka] ccding commented on a change in pull request #10742: MINOR: Add log identifier/prefix printing in Log layer static functions

2021-05-20 Thread GitBox


ccding commented on a change in pull request #10742:
URL: https://github.com/apache/kafka/pull/10742#discussion_r636574778



##
File path: core/src/main/scala/kafka/log/LogLoader.scala
##
@@ -172,7 +173,7 @@ object LogLoader extends Logging {
   private def removeTempFilesAndCollectSwapFiles(params: LoadLogParams): 
Set[File] = {
 
 def deleteIndicesIfExist(baseFile: File, suffix: String = ""): Unit = {
-  info(s"${params.logIdentifier} Deleting index files with suffix $suffix 
for baseFile $baseFile")
+  info(s"${params.logIdentifier}Deleting index files with suffix $suffix 
for baseFile $baseFile")

Review comment:
   is the change (removing the blank space here), along with 
https://github.com/apache/kafka/pull/10742/files#diff-54b3df71b1e0697a211d23a9018a91aef773fca0b9cbd1abafbdca6c79664930L70-R70
 (adding a blank space there), needed? [I am not sure what the best practice is in the Kafka codebase]








[jira] [Updated] (KAFKA-12816) Add tier storage configs.

2021-05-20 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-12816:
---
Description: 
Add all the tier storage related configurations including remote log manager, 
remote storage manager, and remote log metadata manager. 

These configs are described in the KIP-405 
[here|https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-Configs.1].

  was:Add all the tier storage related configuration including remote log 
manager, remote storage manager, and remote log metadata manager. 


> Add tier storage configs. 
> --
>
> Key: KAFKA-12816
> URL: https://issues.apache.org/jira/browse/KAFKA-12816
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Satish Duggana
>Assignee: Satish Duggana
>Priority: Major
>
> Add all the tier storage related configurations including remote log manager, 
> remote storage manager, and remote log metadata manager. 
> These configs are described in the KIP-405 
> [here|https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-Configs.1].





[GitHub] [kafka] ableegoldman commented on a change in pull request #10609: KAFKA-12648: Pt. 1 - Add NamedTopology to protocol and state directory structure

2021-05-20 Thread GitBox


ableegoldman commented on a change in pull request #10609:
URL: https://github.com/apache/kafka/pull/10609#discussion_r636561268



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
##
@@ -125,6 +130,29 @@ public int errorCode() {
 return data.errorCode();
 }
 
+// For version > MIN_NAMED_TOPOLOGY_VERSION
+private void setTaskOffsetSumDataWithNamedTopologiesFromTaskOffsetSumMap(final Map<TaskId, Long> taskOffsetSums) {
+final Map<Integer, List<SubscriptionInfoData.PartitionToOffsetSum>> topicGroupIdToPartitionOffsetSum = new HashMap<>();
+for (final Map.Entry<TaskId, Long> taskEntry : taskOffsetSums.entrySet()) {
+final TaskId task = taskEntry.getKey();
+topicGroupIdToPartitionOffsetSum.computeIfAbsent(task.topicGroupId, t -> new ArrayList<>()).add(
+new SubscriptionInfoData.PartitionToOffsetSum()
+.setPartition(task.partition)
+.setOffsetSum(taskEntry.getValue()));
+}
+
+data.setTaskOffsetSums(taskOffsetSums.entrySet().stream().map(t -> {
+final SubscriptionInfoData.TaskOffsetSum taskOffsetSum = new SubscriptionInfoData.TaskOffsetSum();
+final TaskId task = t.getKey();
+taskOffsetSum.setTopicGroupId(task.topicGroupId);
+taskOffsetSum.setPartition(task.partition);

Review comment:
   Ah, yes. I tried to explain this with a comment on the `SubscriptionInfoData.json` schema but I'll call it out again in the `SubscriptionInfo.java` class. Previously we encoded the offset sums as a nested "map" of `<TopicGroupId, List<PartitionToOffsetSum>>`, where the "map" is really an array and the array struct does not itself allow for struct types. It's just a gap in the API that no one has cared or had time to close, not a fundamental principle. Anyways this meant we had a TopicGroupId and a PartitionToOffsetSum struct, where in turn the PartitionToOffsetSum was composed of the partition and offset sum base types.
   
   I guess this was reasonable enough when there were only 3 base fields, but if we wanted to maintain this nested array structure it would mean adding more and more nested structs each time we added a field. I felt this would get to be too complicated and annoying to deal with, so I flattened the OffsetSum struct out to just include each base field directly.
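
   To make the before/after concrete, a rough sketch of the two shapes (field names taken from the structs described above; an illustration, not the literal schema):

   ```java
   // Before: nested — adding a field means adding another wrapper struct.
   //   TaskOffsetSum { topicGroupId, [ PartitionToOffsetSum { partition, offsetSum } ] }
   //
   // After: flattened — each base field sits directly on the struct.
   //   TaskOffsetSum { topicGroupId, partition, offsetSum }
   ```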








[GitHub] [kafka] ableegoldman commented on a change in pull request #10609: KAFKA-12648: Pt. 1 - Add NamedTopology to protocol and state directory structure

2021-05-20 Thread GitBox


ableegoldman commented on a change in pull request #10609:
URL: https://github.com/apache/kafka/pull/10609#discussion_r636557694



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##
@@ -411,19 +442,37 @@ private void cleanRemovedTasksCalledByCleanerThread(final 
long cleanupDelayMs) {
 }
 }
 }
+maybeCleanEmptyNamedTopologyDirs();
+}
+
+private void maybeCleanEmptyNamedTopologyDirs() {

Review comment:
   Yeah, we could but I don't feel there is anything to gain, and the code 
would only be less easy to follow imo 








[GitHub] [kafka] ableegoldman commented on a change in pull request #10609: KAFKA-12648: Pt. 1 - Add NamedTopology to protocol and state directory structure

2021-05-20 Thread GitBox


ableegoldman commented on a change in pull request #10609:
URL: https://github.com/apache/kafka/pull/10609#discussion_r636557299



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##
@@ -462,39 +512,49 @@ private void cleanRemovedTasksCalledByUser() throws 
Exception {
  * List all of the task directories that are non-empty
  * @return The list of all the non-empty local directories for stream tasks
  */
-File[] listNonEmptyTaskDirectories() {
-final File[] taskDirectories;
-if (!hasPersistentStores || !stateDir.exists()) {
-taskDirectories = new File[0];
-} else {
-taskDirectories =
-stateDir.listFiles(pathname -> {
-if (!pathname.isDirectory() || 
!TASK_DIR_PATH_NAME.matcher(pathname.getName()).matches()) {
-return false;
-} else {
-return !taskDirIsEmpty(pathname);
-}
-});
-}
-
-return taskDirectories == null ? new File[0] : taskDirectories;
+List<File> listNonEmptyTaskDirectories() {
+return listTaskDirectories(pathname -> {
+if (!pathname.isDirectory() || 
!TASK_DIR_PATH_NAME.matcher(pathname.getName()).matches()) {
+return false;
+} else {
+return !taskDirIsEmpty(pathname);
+}
+});
 }
 
 /**
- * List all of the task directories
+ * List all of the task directories along with their parent directory if 
they belong to a named topology
  * @return The list of all the existing local directories for stream tasks
  */
-File[] listAllTaskDirectories() {
-final File[] taskDirectories;
-if (!hasPersistentStores || !stateDir.exists()) {
-taskDirectories = new File[0];
-} else {
-taskDirectories =
-stateDir.listFiles(pathname -> pathname.isDirectory()
-   && 
TASK_DIR_PATH_NAME.matcher(pathname.getName()).matches());
+List<File> listAllTaskDirectories() {
+return listTaskDirectories(pathname -> pathname.isDirectory() && TASK_DIR_PATH_NAME.matcher(pathname.getName()).matches());
+}
+
+private List<File> listTaskDirectories(final FileFilter filter) {
+final List<File> taskDirectories = new ArrayList<>();
+if (hasPersistentStores && stateDir.exists()) {
+if (hasNamedTopologies) {

Review comment:
   No, that should not be allowed.  We have checks to verify this in a few 
places where it matters, but it's an assumption we can make here. I'm not sure 
if your question was from an implementation point of view or a semantic one, 
but I can further clarify or justify why it should not be allowed if you want








[GitHub] [kafka] ableegoldman commented on a change in pull request #10740: Kafka 8613 kip 633 drop default grace period streams

2021-05-20 Thread GitBox


ableegoldman commented on a change in pull request #10740:
URL: https://github.com/apache/kafka/pull/10740#discussion_r636513301



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##
@@ -67,6 +68,7 @@
  * @see KStream#outerJoin(KStream, ValueJoiner, JoinWindows, StreamJoined)
  * @see TimestampExtractor
  */
+@SuppressWarnings("deprecation")

Review comment:
   Why do we need this?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##
@@ -168,6 +172,50 @@ public JoinWindows grace(final Duration afterWindowEnd) 
throws IllegalArgumentEx
 return new JoinWindows(beforeMs, afterMs, afterWindowEndMs);
 }
 
+
+/**
+ * Specifies that records of the same key are joinable if their timestamps 
are within {@code timeDifference},
+ * i.e., the timestamp of a record from the secondary stream is max {@code 
timeDifference} earlier or later than
+ * the timestamp of the record from the primary stream.
+ *
+ * @param timeDifference join window interval
+ * @throws IllegalArgumentException if {@code timeDifference} is negative 
or can't be represented as {@code long milliseconds}
+ * @since 3.0
+ */
+public static JoinWindows ofTimeDifferenceWithNoGrace(final Duration 
timeDifference) {
+return ofTimeDifferenceAndGrace(timeDifference, 
ofMillis(DEFAULT_GRACE_PERIOD_MS));

Review comment:
   I think it would be clearer to specify this as follows. The whole point of this KIP is to get rid of the concept of a "default grace period":
   ```suggestion
   return ofTimeDifferenceAndGrace(timeDifference, ofMillis(0));
   ```
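
   For context, a hedged sketch of how the two entry points in this diff would be called (duration values are arbitrary examples):

   ```java
   import java.time.Duration;
   import org.apache.kafka.streams.kstream.JoinWindows;

   // No grace period: out-of-order records are dropped as soon as the window closes.
   final JoinWindows strict = JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5));
   // Explicit grace period: out-of-order records are accepted for 30 more seconds.
   final JoinWindows lenient = JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMinutes(5), Duration.ofSeconds(30));
   ```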

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
##
@@ -69,6 +70,7 @@
  * @see KGroupedStream#windowedBy(SessionWindows)
  * @see TimestampExtractor
  */
+@SuppressWarnings("deprecation")

Review comment:
   Same here, I don't think we should need to suppress any deprecation 
warnings in this class as we aren't actually using any of the deprecated 
methods here, right? (And if we are, it's better to suppress at the smallest 
scope possible, ie only to specific individual methods rather than the whole 
class)

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##
@@ -168,6 +172,50 @@ public JoinWindows grace(final Duration afterWindowEnd) 
throws IllegalArgumentEx
 return new JoinWindows(beforeMs, afterMs, afterWindowEndMs);
 }
 
+
+/**
+ * Specifies that records of the same key are joinable if their timestamps 
are within {@code timeDifference},
+ * i.e., the timestamp of a record from the secondary stream is max {@code 
timeDifference} earlier or later than
+ * the timestamp of the record from the primary stream.
+ *
+ * @param timeDifference join window interval
+ * @throws IllegalArgumentException if {@code timeDifference} is negative 
or can't be represented as {@code long milliseconds}
+ * @since 3.0

Review comment:
   nit: delete this line (here and elsewhere in the PR), we don't need this 
for new APIs in Kafka (only for deprecated APIs we do the `@deprecated since 
x.y` thing)

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
##
@@ -84,17 +86,14 @@ private SessionWindows(final long gapMs, final long 
graceMs) {
  * Create a new window specification with the specified inactivity gap.
  *
  * @param inactivityGap the gap of inactivity between sessions
- * @return a new window specification with default maintain duration of 1 
day
+ * @return a new window specification with default without any grace period

Review comment:
   I might have forgotten to specify this in the KIP, but we should make 
sure not to change the behavior of these old deprecated constructors. Maybe we 
can phrase this as something like:
   ```suggestion
* @return a new window specification without specifying a grace period 
(uses the old default grace period of 24hr)
   ```

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
##
@@ -119,6 +120,50 @@ public SessionWindows grace(final Duration afterWindowEnd) 
throws IllegalArgumen
 return new SessionWindows(gapMs, afterWindowEndMs);
 }
 
+/**
+ * Create a new window specification with the specified inactivity gap.
+ *
+ * @param inactivityGap the gap of inactivity between sessions
+ * @return a new window specification without any grace period
+ *
+ * @throws IllegalArgumentException if {@code inactivityGap} is zero or 
negative or can't be represented as {@code long milliseconds}
+ * @since 3.0
+ */
+public static SessionWindows ofInactivityGapWithNoGrace(final Duration 
inactivityGap) {
+return ofInactivityGapAndGrace(inactivityGap, 

[GitHub] [kafka] vahidhashemian commented on pull request #10552: KAFKA-12675: improve the sticky general assignor scalability and performance

2021-05-20 Thread GitBox


vahidhashemian commented on pull request #10552:
URL: https://github.com/apache/kafka/pull/10552#issuecomment-84299


   Thanks for tagging me @guozhangwang @ableegoldman.
   I'll try to review this within the next few days.






[GitHub] [kafka] ableegoldman commented on pull request #10552: KAFKA-12675: improve the sticky general assignor scalability and performance

2021-05-20 Thread GitBox


ableegoldman commented on pull request #10552:
URL: https://github.com/apache/kafka/pull/10552#issuecomment-845549567


   It's on my list to review in the next couple of weeks, if not sooner. Sorry I have not had time to get to this one yet, but I will (and I agree we should also get feedback from @vahidhashemian if he sees this).






[GitHub] [kafka] cmccabe merged pull request #10572: KAFKA-12697: Add OfflinePartitionCount and PreferredReplicaImbalanceCount metrics to Quorum Controller

2021-05-20 Thread GitBox


cmccabe merged pull request #10572:
URL: https://github.com/apache/kafka/pull/10572


   






[GitHub] [kafka] cmccabe commented on a change in pull request #10572: KAFKA-12697: Add OfflinePartitionCount and PreferredReplicaImbalanceCount metrics to Quorum Controller

2021-05-20 Thread GitBox


cmccabe commented on a change in pull request #10572:
URL: https://github.com/apache/kafka/pull/10572#discussion_r636538620



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -387,6 +398,12 @@ public void replay(PartitionChangeRecord record) {
 String topicPart = topicInfo.name + "-" + record.partitionId() + " 
with topic ID " +
 record.topicId();
 newPartitionInfo.maybeLogPartitionChange(log, topicPart, 
prevPartitionInfo);
+if ((newPartitionInfo.leader != newPartitionInfo.preferredReplica()) 
&& 

Review comment:
   Can do this in a follow-on








[GitHub] [kafka] cmccabe commented on a change in pull request #10572: KAFKA-12697: Add OfflinePartitionCount and PreferredReplicaImbalanceCount metrics to Quorum Controller

2021-05-20 Thread GitBox


cmccabe commented on a change in pull request #10572:
URL: https://github.com/apache/kafka/pull/10572#discussion_r636531647



##
File path: 
metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java
##
@@ -48,7 +53,6 @@ public void updateEventQueueProcessingTime(long durationMs) {
 // nothing to do
 }
 
-@Override

Review comment:
   Weird, did this `@Override` line get deleted by accident?








[GitHub] [kafka] cmccabe commented on a change in pull request #10572: KAFKA-12697: Add OfflinePartitionCount and PreferredReplicaImbalanceCount metrics to Quorum Controller

2021-05-20 Thread GitBox


cmccabe commented on a change in pull request #10572:
URL: https://github.com/apache/kafka/pull/10572#discussion_r636531195



##
File path: 
metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
##
@@ -252,6 +252,73 @@ public void testGlobalTopicAndPartitionMetrics() throws 
Exception {
 assertEquals(0, ctx.metrics.globalPartitionCount());
 }
 
+@Test
+public void testOfflinePartitionAndReplicaImbalanceMetrics() throws 
Exception {
+ReplicationControlTestContext ctx = new 
ReplicationControlTestContext();
+ReplicationControlManager replicationControl = ctx.replicationControl;
+
+for (int i = 0; i < 4; i++) {
+registerBroker(i, ctx);
+unfenceBroker(i, ctx);
+}
+
+CreatableTopicResult foo = ctx.createTestTopic("foo", 

Review comment:
   This is definitely a nitpick, but can you put this on fewer lines? 2 lines should be enough for this. Same for the ones below.








[GitHub] [kafka] cmccabe commented on pull request #10504: KAFKA-12620 Allocate producer ids on the controller

2021-05-20 Thread GitBox


cmccabe commented on pull request #10504:
URL: https://github.com/apache/kafka/pull/10504#issuecomment-845535131


   The bug (which could be a test bug?) seems intermittent and didn't recur when I ran the test locally. In case Jenkins purges the information before you get a chance to look, here is the failing test and error message:
   
   Build / JDK 11 and Scala 2.13 / 
kafka.coordinator.transaction.ProducerIdsIntegrationTest.[2] Type=ZK, 
Name=testUniqueProducerIds, IBP=3.0-IV0, Security=PLAINTEXT
   
   org.opentest4j.AssertionFailedError: Expected to see 4000 in 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, ... (the assertion message goes on to list several hundred sequential producer ids before the email is cut off)

[GitHub] [kafka] cmccabe commented on a change in pull request #10572: KAFKA-12697: Add OfflinePartitionCount and PreferredReplicaImbalanceCount metrics to Quorum Controller

2021-05-20 Thread GitBox


cmccabe commented on a change in pull request #10572:
URL: https://github.com/apache/kafka/pull/10572#discussion_r636526690



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -387,6 +398,12 @@ public void replay(PartitionChangeRecord record) {
 String topicPart = topicInfo.name + "-" + record.partitionId() + " 
with topic ID " +
 record.topicId();
 newPartitionInfo.maybeLogPartitionChange(log, topicPart, 
prevPartitionInfo);
+if ((newPartitionInfo.leader != newPartitionInfo.preferredReplica()) 
&& 

Review comment:
   Can we have a function like "hasPreferredLeader" on 
`PartitionControlInfo`, to make this simpler?
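
   A sketch of roughly what such a helper could look like (illustrative only, derived from the condition in the diff above, not the committed code):

   ```java
   // On PartitionControlInfo: true when the current leader is the preferred replica.
   boolean hasPreferredLeader() {
       return leader == preferredReplica();
   }
   ```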








[GitHub] [kafka] cmccabe commented on pull request #10504: KAFKA-12620 Allocate producer ids on the controller

2021-05-20 Thread GitBox


cmccabe commented on pull request #10504:
URL: https://github.com/apache/kafka/pull/10504#issuecomment-845532005


   I'm OK with moving the condition variable refactor to a follow-on PR. 
Similarly, I see that you don't have any logger in 
ZkClusterInvocationContext.java, so we can hold off on that for now.
   
   However, it looks like 
`kafka.coordinator.transaction.ProducerIdsIntegrationTest` is failing here. 
@mumrah can you take a look?






[jira] [Commented] (KAFKA-12743) [Kafka Streams] - cluster failover for stateful Kafka Streams applications

2021-05-20 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12743?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17348830#comment-17348830
 ] 

Guozhang Wang commented on KAFKA-12743:
---

For example, there are a few open source projects like Uber's uReplicator 
(2.0), and also commercial products like Confluent's Replicator. AFAIK some of 
these projects do not enforce adding a prefix to the destination cluster's topics.

> [Kafka Streams] - cluster failover for stateful Kafka Streams applications
> --
>
> Key: KAFKA-12743
> URL: https://issues.apache.org/jira/browse/KAFKA-12743
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker, streams
>Affects Versions: 2.8.0
>Reporter: Sergey Zyrianov
>Priority: Major
>
> Currently, when working with Kafka-backed state stores in Kafka Streams, 
> these log-compacted topics are given a hardcoded name: 
> _app_id-storename-changelog_
> {noformat}
> public static String storeChangelogTopic(String applicationId, String 
> storeName) {
> return applicationId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX;
> }{noformat}
>  
> MirrorMaker2 (mm2) copies these topics to the remote cluster under the name 
> _src-cluster-alias.app_id-storename-changelog_
>  
> When the Streams app fails over to the remote cluster, it has trouble finding 
> the changelog topic of its state store, since the topic was renamed (given the 
> source cluster prefix) by mm2.
> What should the fix be? Instruct mm2 to keep the topic name, or subscribe to a 
> regex *._app_id-storename-changelog_ topic name for the state's changelog.
>  
>  
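To illustrate the mismatch described above (topic and app names are hypothetical):

{code:java}
// On the source cluster the changelog is created as:
String changelog = storeChangelogTopic("my-app", "my-store");
// -> "my-app-my-store-changelog"
// MM2's default policy renames the replicated copy on the target cluster to
// "source.my-app-my-store-changelog", so a failed-over Streams app looking
// up the unprefixed name misses it.
{code}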



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cmccabe merged pull request #10705: KAFKA-12342: Reverse module dependency between Raft and Metadata

2021-05-20 Thread GitBox


cmccabe merged pull request #10705:
URL: https://github.com/apache/kafka/pull/10705


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12821) Kafka consumer skip reading from single partition

2021-05-20 Thread Prakash Patel (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12821?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Prakash Patel updated KAFKA-12821:
--
Description: 
I have logstash (Java Kafka Consumer) running on Kubernetes, consuming 
events from 100+ topics. Each topic has 3+ partitions, so a single consumer 
group is subscribed to 100+ topics and consumes events from 500+ partitions. 
Below is the Kafka consumer configuration. I am running 10 instances of 
logstash on Kubernetes, so in total 100 consumer threads consume from 500 partitions.



Each consumer thread is assigned to 5+ topics/partitions. The number of 
incoming events per partition varies a lot. 

 
{quote}auto_offset_reset => "earliest"
 enable_auto_commit => "true"
 group_id => "test_logstash"
 consumer_threads => "10"
 max_poll_records => "500"
 heartbeat_interval_ms => "9000"
 session_timeout_ms => "3"
 fetch_max_bytes => "10485760"
 max_partition_fetch_bytes => "524288"
 client_id => "test_logstash"
 decorate_events => true
 partition_assignment_strategy => 
"org.apache.kafka.clients.consumer.RoundRobinAssignor"
{quote}
Intermittently, the Kafka consumer stops reading from a single partition 
although it is subscribed and assigned to the consumer group.

There is no rebalance, and the Kafka consumer throws no error or info message 
on the client side. 

Restarting the consumer solves the problem. This is an intermittent issue and it 
can happen with any partition of any topic the consumer is subscribed/assigned to.

 

  was:
I have logstash (Java Kafka Consumer) running on Kubernetes, consuming 
events from 100+ topics. Each topic has 3+ partitions, so a single consumer 
group is subscribed to 100+ topics and consumes events from 500+ partitions. 
Below is the Kafka consumer configuration. I am running 10 instances of 
logstash on Kubernetes, so in total 100 consumer threads consume from 500 partitions.
{quote}auto_offset_reset => "earliest"
 enable_auto_commit => "true"
 group_id => "test_logstash"
 consumer_threads => "10"
 max_poll_records => "500"
 heartbeat_interval_ms => "9000"
 session_timeout_ms => "3"
 fetch_max_bytes => "10485760"
 max_partition_fetch_bytes => "524288"
 client_id => "test_logstash"
 decorate_events => true
 partition_assignment_strategy => 
"org.apache.kafka.clients.consumer.RoundRobinAssignor"
{quote}
Intermittently, the Kafka consumer stops reading from a single partition 
although it is subscribed and assigned to the consumer group.

There is no rebalance, and the Kafka consumer throws no error or info message 
on the client side. 

Restarting the consumer solves the problem. This is an intermittent issue and it 
can happen with any partition of any topic the consumer is subscribed/assigned to.

 


> Kafka consumer skip reading from single partition 
> --
>
> Key: KAFKA-12821
> URL: https://issues.apache.org/jira/browse/KAFKA-12821
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.3.0
>Reporter: Prakash Patel
>Priority: Major
>
> I have logstash (Java Kafka Consumer) running on Kubernetes, consuming 
> events from 100+ topics. Each topic has 3+ partitions, so a single consumer 
> group is subscribed to 100+ topics and consumes events from 500+ partitions. 
> Below is the Kafka consumer configuration. I am running 10 instances of 
> logstash on Kubernetes, so in total 100 consumer threads consume from 500 
> partitions.
> Each consumer thread is assigned to 5+ topics/partitions. The number of 
> incoming events per partition varies a lot. 
>  
> {quote}auto_offset_reset => "earliest"
>  enable_auto_commit => "true"
>  group_id => "test_logstash"
>  consumer_threads => "10"
>  max_poll_records => "500"
>  heartbeat_interval_ms => "9000"
>  session_timeout_ms => "3"
>  fetch_max_bytes => "10485760"
>  max_partition_fetch_bytes => "524288"
>  client_id => "test_logstash"
>  decorate_events => true
>  partition_assignment_strategy => 
> "org.apache.kafka.clients.consumer.RoundRobinAssignor"
> {quote}
> Intermittently, the Kafka consumer stops reading from a single partition 
> although it is subscribed and assigned to the consumer group.
> There is no rebalance, and the Kafka consumer throws no error or info message 
> on the client side. 
> Restarting the consumer solves the problem. This is an intermittent issue and 
> it can happen with any partition of any topic the consumer is subscribed/assigned to.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12821) Kafka consumer skip reading from single partition

2021-05-20 Thread Prakash Patel (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12821?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Prakash Patel updated KAFKA-12821:
--
Description: 
I have logstash (Java Kafka Consumer) running on Kubernetes, consuming 
events from 100+ topics. Each topic has 3+ partitions, so a single consumer 
group is subscribed to 100+ topics and consumes events from 500+ partitions. 
Below is the Kafka consumer configuration. I am running 10 instances of 
logstash on Kubernetes, so in total 100 consumer threads consume from 500 partitions.
{quote}auto_offset_reset => "earliest"
 enable_auto_commit => "true"
 group_id => "test_logstash"
 consumer_threads => "10"
 max_poll_records => "500"
 heartbeat_interval_ms => "9000"
 session_timeout_ms => "3"
 fetch_max_bytes => "10485760"
 max_partition_fetch_bytes => "524288"
 client_id => "test_logstash"
 decorate_events => true
 partition_assignment_strategy => 
"org.apache.kafka.clients.consumer.RoundRobinAssignor"
{quote}
Intermittently, the Kafka consumer stops reading from a single partition 
although it is subscribed and assigned to the consumer group.

There is no rebalance, and the Kafka consumer throws no error or info message 
on the client side. 

Restarting the consumer solves the problem. This is an intermittent issue and it 
can happen with any partition of any topic the consumer is subscribed/assigned to.

 

  was:
I have logstash (Java Kafka Consumer) running on Kubernetes, consuming 
events from 100+ topics. Each topic has 3+ partitions, so a single consumer 
group is subscribed to 100+ topics and consumes events from 500+ partitions. 
Below is the Kafka consumer configuration. I am running 10 instances of 
logstash on Kubernetes, so in total 100 consumer threads consume from 500 partitions.
{quote}auto_offset_reset => "earliest"
 enable_auto_commit => "true"
 group_id => "test_logstash"
 consumer_threads => "10"
 max_poll_records => "500"
 heartbeat_interval_ms => "9000"
 session_timeout_ms => "3"
 fetch_max_bytes => "10485760"
 max_partition_fetch_bytes => "524288"
 client_id => "test_logstash"
 decorate_events => true
 partition_assignment_strategy => 
"org.apache.kafka.clients.consumer.RoundRobinAssignor"
{quote}
Intermittently, the Kafka consumer stops reading data from a single partition 
although it is subscribed and assigned to the consumer group.

There is no rebalance, and the Kafka consumer throws no error or info message 
on the application side. 

Restarting the consumer solves the problem. This is an intermittent issue and it 
can happen with any partition of any topic the consumer is subscribed/assigned to.

 


> Kafka consumer skip reading from single partition 
> --
>
> Key: KAFKA-12821
> URL: https://issues.apache.org/jira/browse/KAFKA-12821
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.3.0
>Reporter: Prakash Patel
>Priority: Major
>
> I have logstash (Java Kafka Consumer) running on Kubernetes, consuming 
> events from 100+ topics. Each topic has 3+ partitions, so a single consumer 
> group is subscribed to 100+ topics and consumes events from 500+ partitions. 
> Below is the Kafka consumer configuration. I am running 10 instances of 
> logstash on Kubernetes, so in total 100 consumer threads consume from 500 
> partitions.
> {quote}auto_offset_reset => "earliest"
>  enable_auto_commit => "true"
>  group_id => "test_logstash"
>  consumer_threads => "10"
>  max_poll_records => "500"
>  heartbeat_interval_ms => "9000"
>  session_timeout_ms => "3"
>  fetch_max_bytes => "10485760"
>  max_partition_fetch_bytes => "524288"
>  client_id => "test_logstash"
>  decorate_events => true
>  partition_assignment_strategy => 
> "org.apache.kafka.clients.consumer.RoundRobinAssignor"
> {quote}
> Intermittently, the Kafka consumer stops reading from a single partition 
> although it is subscribed and assigned to the consumer group.
> There is no rebalance, and the Kafka consumer throws no error or info message 
> on the client side. 
> Restarting the consumer solves the problem. This is an intermittent issue and 
> it can happen with any partition of any topic the consumer is subscribed/assigned to.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12821) Kafka consumer skip reading from single partition

2021-05-20 Thread Prakash Patel (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12821?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Prakash Patel updated KAFKA-12821:
--
Description: 
I have logstash (Java Kafka Consumer) running on Kubernetes, consuming 
events from 100+ topics. Each topic has 3+ partitions, so a single consumer 
group is subscribed to 100+ topics and consumes events from 500+ partitions. 
Below is the Kafka consumer configuration. I am running 10 instances of 
logstash on Kubernetes, so in total 100 consumer threads consume from 500 partitions.
{quote}auto_offset_reset => "earliest"
 enable_auto_commit => "true"
 group_id => "test_logstash"
 consumer_threads => "10"
 max_poll_records => "500"
 heartbeat_interval_ms => "9000"
 session_timeout_ms => "3"
 fetch_max_bytes => "10485760"
 max_partition_fetch_bytes => "524288"
 client_id => "test_logstash"
 decorate_events => true
 partition_assignment_strategy => 
"org.apache.kafka.clients.consumer.RoundRobinAssignor"
{quote}
Intermittently, the Kafka consumer stops reading data from a single partition 
although it is subscribed and assigned to the consumer group.

There is no rebalance, and the Kafka consumer throws no error or info message 
on the application side. 

Restarting the consumer solves the problem. This is an intermittent issue and it 
can happen with any partition of any topic the consumer is subscribed/assigned to.

 

  was:
I have logstash (Java Kafka Consumer) running on Kubernetes, consuming 
events from 100+ topics. Each topic has 3+ partitions, so a single consumer 
group is subscribed to 100+ topics and consumes events from 500+ partitions. 
Below is the Kafka consumer configuration. I am running 10 instances of 
logstash on Kubernetes, so in total 100 consumer threads consume from 500 partitions.
{quote}auto_offset_reset => "earliest"
 enable_auto_commit => "true"
 group_id => "test_logstash"
 consumer_threads => "10"
 max_poll_records => "500"
 heartbeat_interval_ms => "9000"
 session_timeout_ms => "3"
 fetch_max_bytes => "10485760"
 max_partition_fetch_bytes => "524288"
 client_id => "test_logstash"
 decorate_events => true
 partition_assignment_strategy => 
"org.apache.kafka.clients.consumer.RoundRobinAssignor"
{quote}
The Kafka consumer stops reading data from a single partition although it is 
subscribed and assigned to the consumer group.

There is no rebalance, and the Kafka consumer throws no error or info message 
on the application side. 

Restarting the consumer solves the problem. This is an intermittent issue and it 
can happen with any partition of any topic the consumer is subscribed/assigned to.

 


> Kafka consumer skip reading from single partition 
> --
>
> Key: KAFKA-12821
> URL: https://issues.apache.org/jira/browse/KAFKA-12821
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.3.0
>Reporter: Prakash Patel
>Priority: Major
>
> I have logstash (Java Kafka Consumer) running on Kubernetes, consuming 
> events from 100+ topics. Each topic has 3+ partitions, so a single consumer 
> group is subscribed to 100+ topics and consumes events from 500+ partitions. 
> Below is the Kafka consumer configuration. I am running 10 instances of 
> logstash on Kubernetes, so in total 100 consumer threads consume from 500 
> partitions.
> {quote}auto_offset_reset => "earliest"
>  enable_auto_commit => "true"
>  group_id => "test_logstash"
>  consumer_threads => "10"
>  max_poll_records => "500"
>  heartbeat_interval_ms => "9000"
>  session_timeout_ms => "3"
>  fetch_max_bytes => "10485760"
>  max_partition_fetch_bytes => "524288"
>  client_id => "test_logstash"
>  decorate_events => true
>  partition_assignment_strategy => 
> "org.apache.kafka.clients.consumer.RoundRobinAssignor"
> {quote}
> Intermittently, the Kafka consumer stops reading data from a single partition 
> although it is subscribed and assigned to the consumer group.
> There is no rebalance, and the Kafka consumer throws no error or info message 
> on the application side. 
> Restarting the consumer solves the problem. This is an intermittent issue and 
> it can happen with any partition of any topic the consumer is subscribed/assigned to.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12821) Kafka consumer skip reading from single partition

2021-05-20 Thread Prakash Patel (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12821?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Prakash Patel updated KAFKA-12821:
--
Description: 
I have logstash (Java Kafka Consumer) running on Kubernetes, consuming 
events from 100+ topics. Each topic has 3+ partitions, so a single consumer 
group is subscribed to 100+ topics and consumes events from 500+ partitions. 
Below is the Kafka consumer configuration. I am running 10 instances of 
logstash on Kubernetes, so in total 100 consumer threads consume from 500 partitions.
{quote}auto_offset_reset => "earliest"
 enable_auto_commit => "true"
 group_id => "test_logstash"
 consumer_threads => "10"
 max_poll_records => "500"
 heartbeat_interval_ms => "9000"
 session_timeout_ms => "3"
 fetch_max_bytes => "10485760"
 max_partition_fetch_bytes => "524288"
 client_id => "test_logstash"
 decorate_events => true
 partition_assignment_strategy => 
"org.apache.kafka.clients.consumer.RoundRobinAssignor"
{quote}
The Kafka consumer stops reading data from a single partition although it is 
subscribed and assigned to the consumer group.

There is no rebalance, and the Kafka consumer throws no error or info message 
on the application side. 

Restarting the consumer solves the problem. This is an intermittent issue and it 
can happen with any partition of any topic the consumer is subscribed/assigned to.

 

  was:
I have logstash running on Kubernetes, consuming events from 100+ 
topics. Each topic has 3+ partitions, so a single consumer group is subscribed 
to 100+ topics and consumes events from 500+ partitions. Below is the Kafka 
consumer configuration. I am running 10 instances of logstash on Kubernetes, so 
in total 100 consumer threads consume from 500 partitions.
{quote}auto_offset_reset => "earliest"
enable_auto_commit => "true"
group_id => "test_logstash"
consumer_threads => "10"
max_poll_records => "500"
heartbeat_interval_ms => "9000"
session_timeout_ms => "3"
fetch_max_bytes => "10485760"
max_partition_fetch_bytes => "524288"
client_id => "test_logstash"
decorate_events => true
partition_assignment_strategy => 
"org.apache.kafka.clients.consumer.RoundRobinAssignor"
{quote}
The Kafka consumer stops reading data from a single partition although it is 
subscribed and assigned to the consumer group.

There is no rebalance, and the Kafka consumer throws no error or info message 
on the application side. 

Restarting the consumer solves the problem. This is an intermittent issue and it 
can happen with any partition of any topic the consumer is subscribed/assigned to.

 


> Kafka consumer skip reading from single partition 
> --
>
> Key: KAFKA-12821
> URL: https://issues.apache.org/jira/browse/KAFKA-12821
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.3.0
>Reporter: Prakash Patel
>Priority: Major
>
> I have logstash (Java Kafka Consumer) running on Kubernetes, consuming 
> events from 100+ topics. Each topic has 3+ partitions, so a single consumer 
> group is subscribed to 100+ topics and consumes events from 500+ partitions. 
> Below is the Kafka consumer configuration. I am running 10 instances of 
> logstash on Kubernetes, so in total 100 consumer threads consume from 500 
> partitions.
> {quote}auto_offset_reset => "earliest"
>  enable_auto_commit => "true"
>  group_id => "test_logstash"
>  consumer_threads => "10"
>  max_poll_records => "500"
>  heartbeat_interval_ms => "9000"
>  session_timeout_ms => "3"
>  fetch_max_bytes => "10485760"
>  max_partition_fetch_bytes => "524288"
>  client_id => "test_logstash"
>  decorate_events => true
>  partition_assignment_strategy => 
> "org.apache.kafka.clients.consumer.RoundRobinAssignor"
> {quote}
> The Kafka consumer stops reading data from a single partition although it is 
> subscribed and assigned to the consumer group.
> There is no rebalance, and the Kafka consumer throws no error or info message 
> on the application side. 
> Restarting the consumer solves the problem. This is an intermittent issue and 
> it can happen with any partition of any topic the consumer is subscribed/assigned to.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12821) Kafka consumer skip reading from single partition

2021-05-20 Thread Prakash Patel (Jira)
Prakash Patel created KAFKA-12821:
-

 Summary: Kafka consumer skip reading from single partition 
 Key: KAFKA-12821
 URL: https://issues.apache.org/jira/browse/KAFKA-12821
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 2.3.0
Reporter: Prakash Patel


I have logstash running on Kubernetes, consuming events from 100+ 
topics. Each topic has 3+ partitions, so a single consumer group is subscribed 
to 100+ topics and consumes events from 500+ partitions. Below is the Kafka 
consumer configuration. I am running 10 instances of logstash on Kubernetes, so 
in total 100 consumer threads consume from 500 partitions.
{quote}auto_offset_reset => "earliest"
enable_auto_commit => "true"
group_id => "test_logstash"
consumer_threads => "10"
max_poll_records => "500"
heartbeat_interval_ms => "9000"
session_timeout_ms => "3"
fetch_max_bytes => "10485760"
max_partition_fetch_bytes => "524288"
client_id => "test_logstash"
decorate_events => true
partition_assignment_strategy => 
"org.apache.kafka.clients.consumer.RoundRobinAssignor"
{quote}
The Kafka consumer stops reading data from a single partition although it is 
subscribed and assigned to the consumer group.

There is no rebalance, and the Kafka consumer throws no error or info message 
on the application side. 

Restarting the consumer solves the problem. This is an intermittent issue and it 
can happen with any partition of any topic the consumer is subscribed/assigned to.

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman commented on pull request #10740: Kafka 8613 kip 633 drop default grace period streams

2021-05-20 Thread GitBox


ableegoldman commented on pull request #10740:
URL: https://github.com/apache/kafka/pull/10740#issuecomment-845517863


   Hey @izzyacademy , I'm taking a look, but just a quick heads up: the build 
failed to compile. It looks like the problem is that some of the demo classes 
use one of the now-deprecated methods and need to be migrated to the new API: 
`TemperatureDemo`, `PageViewTypedDemo`, and `PageViewUntypedDemo`
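   
   For reference, a minimal sketch of that migration, assuming the KIP-633 replacements (`ofSizeAndGrace`/`ofSizeWithNoGrace`) for the deprecated factory methods; the durations are placeholders:
   
   ```java
   import java.time.Duration;
   import org.apache.kafka.streams.kstream.TimeWindows;
   
   // Before (now deprecated): TimeWindows.of(Duration.ofMinutes(5)).grace(Duration.ofMinutes(1))
   TimeWindows withGrace = TimeWindows.ofSizeAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(1));
   TimeWindows noGrace = TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5));
   ```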


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ryannedolan commented on pull request #10648: KAFKA-9726: Add IdentityReplicationPolicy for MM2

2021-05-20 Thread GitBox


ryannedolan commented on pull request #10648:
URL: https://github.com/apache/kafka/pull/10648#issuecomment-845517243


   I still don't think an API change is required here. I was able to get your 
integration tests passing with this version of IdentityReplicationPolicy: 
https://github.com/apache/kafka/pull/10652/files#diff-79a09517576a35906123533490ed39c0e1a9416878e284d7b71f5f4c53eeca29R31
   
   I just had to add this extra check to MirrorSourceConnector:
   
   
https://github.com/apache/kafka/pull/10652/files#diff-b7d6db5cc72b500fab6c628f42376198eaecfd6258069bbff0e2ec98ee9e9427R497
   
   which I think is probably harmless. Thoughts?
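   
   For readers following along, the core of such a policy is small; a hedged sketch, assuming MM2's `DefaultReplicationPolicy` can be extended this way (the PR linked above is authoritative):
   
   ```java
   import org.apache.kafka.connect.mirror.DefaultReplicationPolicy;
   
   // Keep replicated topic names unchanged instead of prefixing them with
   // the source cluster alias.
   public class IdentityReplicationPolicy extends DefaultReplicationPolicy {
       @Override
       public String formatRemoteTopic(String sourceClusterAlias, String topic) {
           return topic; // no "source." prefix
       }
   }
   ```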


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-12779) TaskMetadata should return actual TaskId rather than plain String

2021-05-20 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12779?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman resolved KAFKA-12779.

Resolution: Fixed

> TaskMetadata should return actual TaskId rather than plain String
> -
>
> Key: KAFKA-12779
> URL: https://issues.apache.org/jira/browse/KAFKA-12779
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: A. Sophie Blee-Goldman
>Priority: Major
>  Labels: needs-kip
> Fix For: 3.0.0
>
>
> Not sure why this was encoded as a String field instead of using the public 
> TaskId class. We should use an actual TaskId object, especially as we may add 
> additional fields that increase the complexity and parsing of the taskId.
> [KIP-740: Use TaskId instead of String for the taskId field in 
> TaskMetadata|https://cwiki.apache.org/confluence/display/KAFKA/KIP-740%3A+Use+TaskId+instead+of+String+for+the+taskId+field+in+TaskMetadata]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman merged pull request #10735: KAFKA-12779: KIP-740, Clean up public API in TaskId and fix TaskMetadata#taskId()

2021-05-20 Thread GitBox


ableegoldman merged pull request #10735:
URL: https://github.com/apache/kafka/pull/10735


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] izzyacademy commented on pull request #10740: Kafka 8613 kip 633 drop default grace period streams

2021-05-20 Thread GitBox


izzyacademy commented on pull request #10740:
URL: https://github.com/apache/kafka/pull/10740#issuecomment-845505719


   Thanks @mjsax and @ableegoldman for all your help and support on this


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #10735: KAFKA-12779: KIP-740, Clean up public API in TaskId and fix TaskMetadata#taskId()

2021-05-20 Thread GitBox


ableegoldman commented on pull request #10735:
URL: https://github.com/apache/kafka/pull/10735#issuecomment-845505488


   Just some unrelated failures in 
`MetricsDuringTopicCreationDeletionTest.testMetricsDuringTopicCreateDelete()`, 
merging to trunk


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on pull request #10742: MINOR: Add log identifier/prefix printing in Log layer static functions

2021-05-20 Thread GitBox


kowshik commented on pull request #10742:
URL: https://github.com/apache/kafka/pull/10742#issuecomment-845503390


   cc @junrao for review


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik opened a new pull request #10742: MINOR: Add log identifier/prefix printing in Log layer static functions

2021-05-20 Thread GitBox


kowshik opened a new pull request #10742:
URL: https://github.com/apache/kafka/pull/10742


   When https://github.com/apache/kafka/pull/10478 was merged, we accidentally 
lost the identifier/prefix that we previously used when logging from some of 
the functions in the `Log` class via the `Logging` APIs. In this PR, I have 
reinstated the identifier/prefix logging in these functions, so that 
debuggability is restored.
   
   **Tests:**
   Ran the existing unit tests and checked the output. The log 
identifier/prefix now shows up in the lines wherever it is additionally logged.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] izzyacademy opened a new pull request #10741: [KAFKA-12644] Add Missing Class-Level Javadoc to Exception Classes

2021-05-20 Thread GitBox


izzyacademy opened a new pull request #10741:
URL: https://github.com/apache/kafka/pull/10741


   …ache.kafka.common.errors.ApiException
   
   Added missing class-level javadocs to Exception classes
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jlprat commented on pull request #10737: KAFKA-12814: Remove Deprecated Method StreamsConfig getConsumerConfigs

2021-05-20 Thread GitBox


jlprat commented on pull request #10737:
URL: https://github.com/apache/kafka/pull/10737#issuecomment-845501514


   Thanks for the review


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #10737: KAFKA-12814: Remove Deprecated Method StreamsConfig getConsumerConfigs

2021-05-20 Thread GitBox


mjsax commented on pull request #10737:
URL: https://github.com/apache/kafka/pull/10737#issuecomment-845492098


   Thanks for the PR @jlprat! Merged to `trunk`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax merged pull request #10737: KAFKA-12814: Remove Deprecated Method StreamsConfig getConsumerConfigs

2021-05-20 Thread GitBox


mjsax merged pull request #10737:
URL: https://github.com/apache/kafka/pull/10737


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12801) High CPU load after restarting brokers subsequent to quorum loss

2021-05-20 Thread Gunnar Morling (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17348804#comment-17348804
 ] 

Gunnar Morling commented on KAFKA-12801:


Ok, so I've added the two JFR recordings to my example repository 
(https://github.com/gunnarmorling/debezium-examples/commit/e6ce9f2db398ee042e9cc0e611310eeddf3c9427),
 which also contains the set-up for reproducing this.

> High CPU load after restarting brokers subsequent to quorum loss
> 
>
> Key: KAFKA-12801
> URL: https://issues.apache.org/jira/browse/KAFKA-12801
> Project: Kafka
>  Issue Type: Bug
>  Components: core, KafkaConnect
>Affects Versions: 2.8.0
>Reporter: Gunnar Morling
>Priority: Major
>
> I'm testing Kafka in the new KRaft mode added in 2.8. I have a cluster of 
> three Kafka nodes (all combined nodes), and one Kafka Connect node. After 
> starting all components, I first stop the current controller of the Kafka 
> cluster, then I stop the then-current controller of the Kafka cluster. At this point, 
> only one Kafka node out of the original three and Connect is running.
> When now restarting the two stopped Kafka nodes, CPU load on the Connect node 
> and the two broker nodes goes up to 100% and remains at that level for an 
> indefinite amount of time.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12801) High CPU load after restarting brokers subsequent to quorum loss

2021-05-20 Thread Gunnar Morling (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17348802#comment-17348802
 ] 

Gunnar Morling commented on KAFKA-12801:


Hmm, so even with zipping (which brings the size down to 1.6 MB), I'm receiving 
this error upon upload:

{quote}
File "jfr.zip" was not uploaded
An internal error has occurred. Please contact your administrator
{quote}

> High CPU load after restarting brokers subsequent to quorum loss
> 
>
> Key: KAFKA-12801
> URL: https://issues.apache.org/jira/browse/KAFKA-12801
> Project: Kafka
>  Issue Type: Bug
>  Components: core, KafkaConnect
>Affects Versions: 2.8.0
>Reporter: Gunnar Morling
>Priority: Major
>
> I'm testing Kafka in the new KRaft mode added in 2.8. I have a cluster of 
> three Kafka nodes (all combined nodes), and one Kafka Connect node. After 
> starting all components, I first stop the current controller of the Kafka 
> cluster, then I stop the then-current controller of the Kafka cluster. At this point, 
> only one Kafka node out of the original three and Connect is running.
> When now restarting the two stopped Kafka nodes, CPU load on the Connect node 
> and the two broker nodes goes up to 100% and remains at that level for an 
> indefinite amount of time.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12743) [Kafka Streams] - cluster failover for stateful Kafka Streams applications

2021-05-20 Thread Sergey Zyrianov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12743?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17348780#comment-17348780
 ] 

Sergey Zyrianov commented on KAFKA-12743:
-

[~guozhang] could you expand on which "other mechanisms" there are to replicate 
the state changelogs? 

> [Kafka Streams] - cluster failover for stateful Kafka Streams applications
> --
>
> Key: KAFKA-12743
> URL: https://issues.apache.org/jira/browse/KAFKA-12743
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker, streams
>Affects Versions: 2.8.0
>Reporter: Sergey Zyrianov
>Priority: Major
>
> Currently, when working with Kafka-backed state stores in Kafka Streams, 
> these log-compacted topics are given a hardcoded name: 
> _app_id-storename-changelog_
> {noformat}
> public static String storeChangelogTopic(String applicationId, String 
> storeName) {
> return applicationId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX;
> }{noformat}
>  
> MirrorMaker2 (mm2) copies these topics to the remote cluster under the name 
> _src-cluster-alias.app_id-storename-changelog_
>  
> When the Streams app fails over to the remote cluster, it has trouble finding 
> the changelog topic of its state store, since the topic was renamed (given the 
> source cluster prefix) by mm2.
> What should the fix be? Instruct mm2 to keep the topic name, or subscribe to a 
> regex *._app_id-storename-changelog_ topic name for the state's changelog.
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9953) support multiple consumerGroupCoordinators in TransactionManager

2021-05-20 Thread Joost van de Wijgerd (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17348711#comment-17348711
 ] 

Joost van de Wijgerd commented on KAFKA-9953:
-

nudge

> support multiple consumerGroupCoordinators in TransactionManager
> 
>
> Key: KAFKA-9953
> URL: https://issues.apache.org/jira/browse/KAFKA-9953
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 2.5.0
>Reporter: Joost van de Wijgerd
>Priority: Major
> Attachments: KAFKA-9953.patch
>
>
> We are using Kafka with a transactional producer and have the following use 
> case: 3 KafkaConsumers (each with their own ConsumerGroup) polled by the same 
> thread, and 1 transactional Kafka producer. When we add the offsets to the 
> transaction, we run into the following problem: 
> the TransactionManager only keeps track of 1 consumerGroupCoordinator; however, 
> some consumerGroupCoordinators may be on another node, so we constantly see 
> the TransactionManager switching between nodes. This has the overhead of 1 
> failing _TxnOffsetCommitRequest_ and 1 unnecessary 
> _FindCoordinatorRequest_.
> Also, with _retry.backoff.ms_ set to 100 by default, this causes a pause 
> of 100ms for every other transaction (depending on which KafkaConsumer 
> triggered the transaction, of course).
> If the TransactionManager could keep track of coordinator nodes per 
> consumerGroupId, this problem would be solved. 
> I already have a patch for this but still need to test it. I will add it to 
> the ticket when that is done.
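
A minimal sketch of the data-structure change being proposed, with illustrative names (the attached patch is authoritative):

{code:java}
// Instead of a single consumerGroupCoordinator field, track one coordinator
// node per consumer group id; the names here are hypothetical.
private final Map<String, Node> coordinatorsByGroupId = new HashMap<>();

Node coordinatorFor(String consumerGroupId) {
    // Avoids the failing TxnOffsetCommitRequest and the extra
    // FindCoordinatorRequest whenever the triggering group changes.
    return coordinatorsByGroupId.get(consumerGroupId);
}
{code}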



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9071) transactional.id.expiration.ms config value should be implemented as a Long

2021-05-20 Thread Joost van de Wijgerd (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9071?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17348710#comment-17348710
 ] 

Joost van de Wijgerd commented on KAFKA-9071:
-

nudge

> transactional.id.expiration.ms config value should be implemented as a Long
> ---
>
> Key: KAFKA-9071
> URL: https://issues.apache.org/jira/browse/KAFKA-9071
> Project: Kafka
>  Issue Type: Improvement
>  Components: config
>Affects Versions: 2.3.0
>Reporter: Joost van de Wijgerd
>Assignee: Mario Georgiev
>Priority: Major
>
> Currently the value of this config parameter is limited to MAX_INT, 
> effectively limiting the transactional id expiration to ~25 days. This is 
> causing some issues for us on our acceptance environment (which is not used 
> that often / heavily), where our transactional services start failing 
> because of this issue.
> I believe the best practice for millisecond values is to implement them as 
> a Long and not as an Integer.
> This is currently the max value: transactional.id.expiration.ms=2147483647, 
> while I would like to set it to: transactional.id.expiration.ms=3154000 
> (i.e. 1 year).
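
A sketch of the requested change, assuming Kafka's ConfigDef builder; the exact call site in the broker config is an assumption:

{code:java}
// Define the property as LONG rather than INT so values beyond
// Integer.MAX_VALUE ms (~25 days) become representable.
configDef.define("transactional.id.expiration.ms",
        ConfigDef.Type.LONG,   // previously capped by the INT type
        604800000L,            // 7 days in ms, the documented broker default
        ConfigDef.Importance.LOW,
        "How long the transaction coordinator waits before expiring a transactional id.");
{code}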



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jsancio commented on a change in pull request #10705: KAFKA-12342: Reverse module dependency between Raft and Metadata

2021-05-20 Thread GitBox


jsancio commented on a change in pull request #10705:
URL: https://github.com/apache/kafka/pull/10705#discussion_r636347212



##
File path: raft/src/main/java/org/apache/kafka/raft/RaftClient.java
##
@@ -56,24 +57,17 @@
 void handleSnapshot(SnapshotReader reader);
 
 /**

Review comment:
   Yes. I added a few more sentences to make this clear.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10705: KAFKA-12342: Reverse module dependency between Raft and Metadata

2021-05-20 Thread GitBox


jsancio commented on a change in pull request #10705:
URL: https://github.com/apache/kafka/pull/10705#discussion_r636340373



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##
@@ -624,50 +617,71 @@ public String toString() {
 return event.future();
 }
 
-class QuorumMetaLogListener implements MetaLogListener {
+class QuorumMetaLogListener implements RaftClient.Listener<ApiMessageAndVersion> {
+
 @Override
-public void handleCommits(long offset, List<ApiMessage> messages) {
-appendControlEvent("handleCommits[" + offset + "]", () -> {
-if (curClaimEpoch == -1) {
-// If the controller is a standby, replay the records that 
were
-// created by the active controller.
-if (log.isDebugEnabled()) {
-if (log.isTraceEnabled()) {
-log.trace("Replaying commits from the active node 
up to " +
-"offset {}: {}.", offset, messages.stream().
-map(m -> 
m.toString()).collect(Collectors.joining(", ")));
+public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
+appendControlEvent("handleCommits[baseOffset=" + 
reader.baseOffset() + "]", () -> {
+try {
+boolean isActiveController = curClaimEpoch != -1;
+while (reader.hasNext()) {
+Batch<ApiMessageAndVersion> batch = reader.next();
+long offset = batch.lastOffset();
+List<ApiMessageAndVersion> messages = batch.records();
+
+if (isActiveController) {
+// If the controller is active, the records were 
already replayed,
+// so we don't need to do it here.
+log.debug("Completing purgatory items up to offset 
{}.", offset);
+
+// Complete any events in the purgatory that were 
waiting for this offset.
+purgatory.completeUpTo(offset);
+
+// Delete all the in-memory snapshots that we no 
longer need.
+// If we are writing a new snapshot, then we need 
to keep that around;
+// otherwise, we should delete up to the current 
committed offset.
+snapshotRegistry.deleteSnapshotsUpTo(
+Math.min(offset, 
snapshotGeneratorManager.snapshotEpoch()));
+
 } else {
-log.debug("Replaying commits from the active node 
up to " +
-"offset {}.", offset);
+// If the controller is a standby, replay the 
records that were
+// created by the active controller.
+if (log.isDebugEnabled()) {
+if (log.isTraceEnabled()) {
+log.trace("Replaying commits from the 
active node up to " +
+"offset {}: {}.", offset, 
messages.stream()
+.map(ApiMessageAndVersion::toString)
+.collect(Collectors.joining(", ")));
+} else {
+log.debug("Replaying commits from the 
active node up to " +
+"offset {}.", offset);
+}
+}
+for (ApiMessageAndVersion messageAndVersion : 
messages) {
+replay(messageAndVersion.message(), -1, 
offset);
+}
 }
+lastCommittedOffset = offset;
 }
-for (ApiMessage message : messages) {
-replay(message, -1, offset);
-}
-} else {
-// If the controller is active, the records were already 
replayed,
-// so we don't need to do it here.
-log.debug("Completing purgatory items up to offset {}.", 
offset);
-
-// Complete any events in the purgatory that were waiting 
for this offset.
-purgatory.completeUpTo(offset);
-
-// Delete all the in-memory snapshots that we no longer 
need.
-// If we are writing a new snapshot, then we need to keep 
that around;
-// otherwise, we should delete up to the current committed 
offset.
-snapshotRegistry.deleteSnapshotsUpTo(
-Math.min(offset, 
snapshotGeneratorManager.snapshotEpoch()));
+} finally {
+reader.close();
 }
-   

[jira] [Commented] (KAFKA-12815) KTable.transformValue might have incorrect record metadata

2021-05-20 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17348646#comment-17348646
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12815:


Ack, sorry, I made a bad assumption about how FKJ are implemented internally. I 
wasn't the one to review them.

Given the limited scope of this bug, I would say it feels appropriate to just 
fix it in trunk for now, and file a follow-up ticket pointing out this bug as an 
improvement to tighten up the processor context contract w.r.t. topic() and 
offset(). If there are users currently out there for whom this is an essential 
feature, it will be pretty obvious once their apps start failing with an NPE, 
and they will probably let us know. We can re-evaluate whether to do a "full 
fix" if/when user reports come in.

> KTable.transformValue might have incorrect record metadata
> --
>
> Key: KAFKA-12815
> URL: https://issues.apache.org/jira/browse/KAFKA-12815
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Major
> Fix For: 3.0.0
>
>
> In the DSL, Kafka Streams applies an optimization for non-materialized 
> tables: when these are queried an upstream state store is accessed. To ensure 
> that the correct value is returned from the lookup, all intermediate 
> processors after the materialized store, and before the processor that 
> triggers the lookup are re-applied (cf `KTableValueGetter`).
> For re-applying DSL operators like filter/mapValues that works fine. However, 
> for transformValue(), the method is executed with the incorrect 
> `RecordContext` (note that DSL operators like filter don't have access to the 
> `RecordContext` and thus, are not subject to this bug). Instead of using the 
> record context from the value that was received from the upstream state store 
> (and that is re-processed), the transformer would see the context from the 
> record that triggered the lookup.
> Thus, the information about timestamp, offset, partition, topic name, and 
> headers is incorrect.
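
The surface where this shows up can be sketched as follows (a minimal illustration, not taken from the ticket; {{table}} is a hypothetical non-materialized KTable):

{code:java}
table.transformValues(() -> new ValueTransformerWithKey<String, String, String>() {
    private ProcessorContext context;

    @Override
    public void init(final ProcessorContext context) {
        this.context = context;
    }

    @Override
    public String transform(final String key, final String value) {
        // On the re-applied value-getter path, topic()/offset() may describe
        // the record that triggered the lookup, not the upstream record.
        return value + "@" + context.topic() + ":" + context.offset();
    }

    @Override
    public void close() {}
});
{code}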



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] satishd commented on a change in pull request #10733: KAFKA-12816 Added tiered storage related configs including remote log manager configs.

2021-05-20 Thread GitBox


satishd commented on a change in pull request #10733:
URL: https://github.com/apache/kafka/pull/10733#discussion_r636298299



##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
##
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
+import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;
+import static org.apache.kafka.common.config.ConfigDef.Type.INT;
+import static org.apache.kafka.common.config.ConfigDef.Type.LONG;
+import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
+
+public final class RemoteLogManagerConfig {
+
+/**
+ * Prefix used for properties to be passed to the {@link RemoteStorageManager} 
implementation. The remote log subsystem collects all the properties having
+ * this prefix and passes them to {@code RemoteStorageManager} using {@link 
RemoteStorageManager#configure(Map)}.
+ */
+public static final String REMOTE_STORAGE_MANAGER_CONFIG_PREFIX = 
"remote.log.storage.manager.impl.";

Review comment:
   I will update the KIP about the prefixes for both `RemoteStorageManager` and 
`RemoteLogMetadataManager` properties. 
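   
   For illustration, one way the prefixed properties could be collected and handed to the plugin per the javadoc above (a sketch; `brokerProps` is hypothetical and the real wiring lives in the broker):
   
   ```java
   // Collect "remote.log.storage.manager.impl."-prefixed entries, strip the
   // prefix, and pass them to the plugin via configure(Map).
   Map<String, Object> rsmProps = new HashMap<>();
   for (Map.Entry<String, ?> entry : brokerProps.entrySet()) {
       if (entry.getKey().startsWith(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX)) {
           rsmProps.put(entry.getKey().substring(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX.length()),
                   entry.getValue());
       }
   }
   remoteStorageManager.configure(rsmProps);
   ```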

##
File path: clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
##
@@ -75,6 +75,20 @@
 "\"delete\" retention policy. This represents an SLA on how soon 
consumers must read " +
 "their data. If set to -1, no time limit is applied.";
 
+public static final String REMOTE_LOG_STORAGE_ENABLE_CONFIG = 
"remote.storage.enable";
+public static final String REMOTE_LOG_STORAGE_ENABLE_DOC = "To enable tiered 
storage for a topic, set `remote.storage.enable` to true. " +
+"You cannot disable this config once it is enabled. Support for disabling it 
will be provided in future versions.";
+
+public static final String LOCAL_LOG_RETENTION_MS_CONFIG = 
"local.retention.ms";

Review comment:
   I will update KIP-405 with `local.retention.bytes` and 
`local.retention.ms` properties. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on pull request #10482: KAFKA-12499: add transaction timeout verification

2021-05-20 Thread GitBox


abbccdda commented on pull request #10482:
URL: https://github.com/apache/kafka/pull/10482#issuecomment-845296716


   Only an unrelated test fails; merging.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12801) High CPU load after restarting brokers subsequent to quorum loss

2021-05-20 Thread Colin McCabe (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17348632#comment-17348632
 ] 

Colin McCabe commented on KAFKA-12801:
--

[~gunnar.morling]: thanks for the bug report. Maybe try using 'zip' on the JFR 
files before attaching them to JIRA?

If that doesn't work, maybe try using the UNIX split command if you're hitting 
a file size limit? I don't remember what the file size limit is for this JIRA. 
I thought it was bigger than 5.2MB, though...

> High CPU load after restarting brokers subsequent to quorum loss
> 
>
> Key: KAFKA-12801
> URL: https://issues.apache.org/jira/browse/KAFKA-12801
> Project: Kafka
>  Issue Type: Bug
>  Components: core, KafkaConnect
>Affects Versions: 2.8.0
>Reporter: Gunnar Morling
>Priority: Major
>
> I'm testing Kafka in the new KRaft mode added in 2.8. I have a cluster of 
> three Kafka nodes (all combined nodes), and one Kafka Connect node. After 
> starting all components, I first stop the current controller of the Kafka 
> cluster, then I stop the then-current controller of the Kafka cluster. At this point, 
> only one Kafka node out of the original three and Connect is running.
> When now restarting the two stopped Kafka nodes, CPU load on the Connect node 
> and the two broker nodes goes up to 100% and remains at that level for an 
> indefinite amount of time.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-9296) Correlation id for response () does not match request ()

2021-05-20 Thread Lea (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17348627#comment-17348627
 ] 

Lea edited comment on KAFKA-9296 at 5/20/21, 4:57 PM:
--

I had the same problem using the Kafka producer again.
 Environment: K8S POD

client version: 2.4.1

At the same time, I found that the TCP retransmission error rate of the node 
increased to 2%. I don't know whether this is related.
{code:java}
// code placeholder
2021-05-19 12:39:34.308 [] [kafka-producer-network-thread | 
314c35d9-10fd-44e4-9f96-1a60805b250d] ERROR o.a.k.c.producer.internals.Sender 
run 246 - [Producer clientId=314c35d9-10fd-44e4-9f96-1a60805b250d] Uncaught 
error in kafka producer I/O thread: java.lang.IllegalStateException: 
Correlation id for response (80779915) does not match request (80779910), 
request header: RequestHeader(apiKey=PRODUCE, apiVersion=5, 
clientId=314c35d9-10fd-44e4-9f96-1a60805b250d, correlationId=80779910) at 
org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:937) at 
org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:720)
 at 
org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:833)
 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:556) at 
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:335) at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244) at 
java.lang.Thread.run(Thread.java:748)
{code}
 


was (Author: leaye):
I had the same problem using the Kafka producer again.
 Environment: K8S POD

At the same time, I found that the TCP retransmission error rate of the node 
increased to 2%. I don't know whether this is related.
{code:java}
// code placeholder
2021-05-19 12:39:34.308 [] [kafka-producer-network-thread | 
314c35d9-10fd-44e4-9f96-1a60805b250d] ERROR o.a.k.c.producer.internals.Sender 
run 246 - [Producer clientId=314c35d9-10fd-44e4-9f96-1a60805b250d] Uncaught 
error in kafka producer I/O thread: java.lang.IllegalStateException: 
Correlation id for response (80779915) does not match request (80779910), 
request header: RequestHeader(apiKey=PRODUCE, apiVersion=5, 
clientId=314c35d9-10fd-44e4-9f96-1a60805b250d, correlationId=80779910) at 
org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:937) at 
org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:720)
 at 
org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:833)
 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:556) at 
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:335) at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244) at 
java.lang.Thread.run(Thread.java:748)
{code}
 

> Correlation id for response () does not match request ()
> 
>
> Key: KAFKA-9296
> URL: https://issues.apache.org/jira/browse/KAFKA-9296
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 0.11.0.2
> Environment: Flink on  k8s
>Reporter: Enhon Bryant
>Priority: Blocker
>  Labels: kafka, producer
>
> The Kafka client and broker I use are both version 0.11.0.2.   I use Kafka's 
> producer to write data to broker. I encountered the following exceptions.
> 2019-12-12 18:12:46,821 ERROR 
> org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka 
> producer I/O thread: 
> java.lang.IllegalStateException: Correlation id for response (11715816) does 
> not match request (11715804), request header: 
> \{api_key=0,api_version=3,correlation_id=11715804,client_id=producer-3}
>  at org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:752)
>  at 
> org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:561)
>  at 
> org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:657)
>  at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:442)
>  at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:224)
>  at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
>  at java.lang.Thread.run(Thread.java:748)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9296) Correlation id for response () does not match request ()

2021-05-20 Thread Lea (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17348627#comment-17348627
 ] 

Lea commented on KAFKA-9296:


I had the same problem using the Kafka producer again.
Environment: K8S POD

At the same time, I found that the TCP retransmission error rate of the node 
increased to 2%. I don't know whether this is related.
{code:java}
// code placeholder
{code}
2021-05-19 12:39:34.308 [] [kafka-producer-network-thread | 
314c35d9-10fd-44e4-9f96-1a60805b250d] ERROR o.a.k.c.producer.internals.Sender 
run 246 - [Producer clientId=314c35d9-10fd-44e4-9f96-1a60805b250d] Uncaught 
error in kafka producer I/O thread: java.lang.IllegalStateException: 
Correlation id for response (80779915) does not match request (80779910), 
request header: RequestHeader(apiKey=PRODUCE, apiVersion=5, 
clientId=314c35d9-10fd-44e4-9f96-1a60805b250d, correlationId=80779910) at 
org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:937) at 
org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:720)
 at 
org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:833)
 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:556) at 
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:335) at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244) at 
java.lang.Thread.run(Thread.java:748)

> Correlation id for response () does not match request ()
> 
>
> Key: KAFKA-9296
> URL: https://issues.apache.org/jira/browse/KAFKA-9296
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 0.11.0.2
> Environment: Flink on  k8s
>Reporter: Enhon Bryant
>Priority: Blocker
>  Labels: kafka, producer
>
> The Kafka client and broker I use are both version 0.11.0.2.   I use Kafka's 
> producer to write data to broker. I encountered the following exceptions.
> 2019-12-12 18:12:46,821 ERROR 
> org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka 
> producer I/O thread: 
> java.lang.IllegalStateException: Correlation id for response (11715816) does 
> not match request (11715804), request header: 
> \{api_key=0,api_version=3,correlation_id=11715804,client_id=producer-3}
>  at org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:752)
>  at 
> org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:561)
>  at 
> org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:657)
>  at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:442)
>  at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:224)
>  at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
>  at java.lang.Thread.run(Thread.java:748)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-9296) Correlation id for response () does not match request ()

2021-05-20 Thread Lea (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17348627#comment-17348627
 ] 

Lea edited comment on KAFKA-9296 at 5/20/21, 4:56 PM:
--

I had the same problem using the Kafka producer again.
 Environment: K8S POD

At the same time, I found that the node's TCP retransmission error rate 
increased to 2%. I don't know whether that is related to this issue.
{code:java}
2021-05-19 12:39:34.308 [] [kafka-producer-network-thread | 
314c35d9-10fd-44e4-9f96-1a60805b250d] ERROR o.a.k.c.producer.internals.Sender 
run 246 - [Producer clientId=314c35d9-10fd-44e4-9f96-1a60805b250d] Uncaught 
error in kafka producer I/O thread: 
java.lang.IllegalStateException: Correlation id for response (80779915) does 
not match request (80779910), request header: RequestHeader(apiKey=PRODUCE, 
apiVersion=5, clientId=314c35d9-10fd-44e4-9f96-1a60805b250d, 
correlationId=80779910)
 at org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:937)
 at org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:720)
 at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:833)
 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:556)
 at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:335)
 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
 at java.lang.Thread.run(Thread.java:748)
{code}
 


was (Author: leaye):
I had the same problem using the Kafka producer again.
Environment: K8S POD

At the same time, I found that the node's TCP retransmission error rate 
increased to 2%. I don't know whether that is related to this issue.
{code:java}
// code placeholder
{code}
2021-05-19 12:39:34.308 [] [kafka-producer-network-thread | 
314c35d9-10fd-44e4-9f96-1a60805b250d] ERROR o.a.k.c.producer.internals.Sender 
run 246 - [Producer clientId=314c35d9-10fd-44e4-9f96-1a60805b250d] Uncaught 
error in kafka producer I/O thread: 
java.lang.IllegalStateException: Correlation id for response (80779915) does 
not match request (80779910), request header: RequestHeader(apiKey=PRODUCE, 
apiVersion=5, clientId=314c35d9-10fd-44e4-9f96-1a60805b250d, 
correlationId=80779910)
 at org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:937)
 at org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:720)
 at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:833)
 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:556)
 at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:335)
 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
 at java.lang.Thread.run(Thread.java:748)

> Correlation id for response () does not match request ()
> 
>
> Key: KAFKA-9296
> URL: https://issues.apache.org/jira/browse/KAFKA-9296
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 0.11.0.2
> Environment: Flink on  k8s
>Reporter: Enhon Bryant
>Priority: Blocker
>  Labels: kafka, producer
>
> The Kafka client and broker I use are both version 0.11.0.2. I use Kafka's 
> producer to write data to the broker. I encountered the following exception.
> 2019-12-12 18:12:46,821 ERROR 
> org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka 
> producer I/O thread: 
> java.lang.IllegalStateException: Correlation id for response (11715816) does 
> not match request (11715804), request header: 
> \{api_key=0,api_version=3,correlation_id=11715804,client_id=producer-3}
>  at org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:752)
>  at 
> org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:561)
>  at 
> org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:657)
>  at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:442)
>  at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:224)
>  at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
>  at java.lang.Thread.run(Thread.java:748)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12817) Make Task ID an Implementation Detail

2021-05-20 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17348619#comment-17348619
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12817:


Thanks for filing this, Bruno. I still have my own reservations about this, but 
I'm happy to discuss it further if/when a KIP is raised.

> Make Task ID an Implementation Detail
> -
>
> Key: KAFKA-12817
> URL: https://issues.apache.org/jira/browse/KAFKA-12817
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Bruno Cadonna
>Priority: Minor
>  Labels: needs-kip
>
> Each task has a task ID that is used to identify tasks within Kafka Streams. 
> The task ID is composed of the ID of the subtopology the task executes and the 
> number of the partition the task reads its input data from. This naming scheme 
> is an implementation detail, and it is not something users should need to rely 
> on to get metadata of a task. However, the task ID in this form is used to tag 
> metrics and appears in log files, and its representation in code, the {{TaskId}} 
> class, is part of the public API (a small sketch of what the ID encodes follows 
> below).
> This ticket proposes to make the task ID truly an implementation detail by:
> * removing {{TaskId}} from the public API
> * using the subtopology ID and the partition numbers in logs and metrics 
> instead of the task ID
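
For illustration, the ID users currently see is essentially the pair below 
rendered as subtopologyId_partition (a hypothetical sketch, not the actual 
{{TaskId}} class):
{code:java}
// Hypothetical value class showing what the current task id encodes.
final class TaskIdSketch {
    final int subtopologyId;  // which subtopology the task executes
    final int partition;      // which input partition it reads

    TaskIdSketch(int subtopologyId, int partition) {
        this.subtopologyId = subtopologyId;
        this.partition = partition;
    }

    @Override
    public String toString() {
        // The familiar "0_1" form seen in logs and metric tags today.
        return subtopologyId + "_" + partition;
    }
}
{code}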



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] izzyacademy opened a new pull request #10740: Kafka 8613 kip 633 drop default grace period streams

2021-05-20 Thread GitBox


izzyacademy opened a new pull request #10740:
URL: https://github.com/apache/kafka/pull/10740


   [KAFKA-8613] Set default grace period to 0
   KIP-633: Drop 24 hour default of grace period in Streams
   
   Added API implementations for specifying grace periods for TimeWindows, 
SessionWindows, JoinWindows and SlidingWindows (see the sketch below)
   Changed default grace period constant Windows.DEFAULT_GRACE_PERIOD_MS to 0 
milliseconds from 24 hours
   Updated corresponding unit tests 
   Added deprecation suppression annotations for test classes using the newly 
deprecated APIs
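   
   For reference, a quick sketch of the intended usage (method names as 
proposed in KIP-633 and this PR; the final merged API may differ):
```java
import java.time.Duration;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.TimeWindows;

class GracePeriodExamples {
    static void sketch() {
        // Tumbling windows with an explicit one-minute grace period.
        TimeWindows withGrace =
            TimeWindows.ofSizeAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(1));

        // Windows that close as soon as they end (grace = 0), replacing the
        // old implicit 24-hour default.
        TimeWindows noGrace = TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5));

        // Session windows follow the same pattern.
        SessionWindows sessions = SessionWindows.ofInactivityGapAndGrace(
            Duration.ofMinutes(5), Duration.ofSeconds(30));
    }
}
```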
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dongjinleekr opened a new pull request #10739: KAFKA-12820: Upgrade maven-artifact dependency to resolve CVE-2021-26291

2021-05-20 Thread GitBox


dongjinleekr opened a new pull request #10739:
URL: https://github.com/apache/kafka/pull/10739


   [CVE-2021-26291](https://nvd.nist.gov/vuln/detail/CVE-2021-26291), which 
makes a man-in-the-middle attack possible, was fixed in Maven 
[3.8.1](https://maven.apache.org/docs/3.8.1/release-notes.html).
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-12820) Upgrade maven-artifact dependency to resolve CVE-2021-26291

2021-05-20 Thread Dongjin Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12820?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjin Lee reassigned KAFKA-12820:
---

Assignee: Dongjin Lee

> Upgrade maven-artifact dependency to resolve CVE-2021-26291
> ---
>
> Key: KAFKA-12820
> URL: https://issues.apache.org/jira/browse/KAFKA-12820
> Project: Kafka
>  Issue Type: Task
>  Components: build
>Affects Versions: 2.6.1, 2.8.0, 2.7.1
>Reporter: Boojapho
>Assignee: Dongjin Lee
>Priority: Major
>
> Current Gradle builds of Kafka contain a dependency of `maven-artifact` 
> version 3.6.3, which contains CVE-2021-26291 
> ([https://nvd.nist.gov/vuln/detail/CVE-2021-26291]). This vulnerability has 
> been fixed in Maven 3.8.1 
> ([https://maven.apache.org/docs/3.8.1/release-notes.html]).  Apache Kafka 
> should update `dependencies.gradle` to use the latest `maven-artifact` 
> library to eliminate this vulnerability.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-12818) Memory leakage when kafka connect 2.7 uses directory config provider

2021-05-20 Thread Viktor Utkin (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17348595#comment-17348595
 ] 

Viktor Utkin edited comment on KAFKA-12818 at 5/20/21, 3:46 PM:


Hi [~tombentley], what memory usage diagnostics data do you need?

By the way, we ran the tests for two days under load and I showed you the 
result. It wasn't a five-minute test.


was (Author: vutkin):
Hi [~tombentley], what memory usage diagnostics data do you need?

By the way, we ran the tests for two days under load and I showed you the 
result. It's not a five-minute test.

> Memory leakage when kafka connect 2.7 uses directory config provider
> 
>
> Key: KAFKA-12818
> URL: https://issues.apache.org/jira/browse/KAFKA-12818
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.7.0
> Environment: Azure AKS / Kubernetes v1.20
>Reporter: Viktor Utkin
>Priority: Critical
> Attachments: Screenshot 2021-05-20 at 14.53.05.png
>
>
> Hi, we noticed a memory leak when Kafka Connect 2.7 uses the directory config 
> provider. We got an OOM in a Kubernetes environment: K8s kills the container 
> when the limit is reached. At the same time, we did not get any OOM in Java, 
> and a heap dump didn't show us anything interesting.
> JVM config:
> {code:java}
>  -XX:+HeapDumpOnOutOfMemoryError
>  -XX:HeapDumpPath=/tmp/
>  -XX:+UseContainerSupport
>  -XX:+OptimizeStringConcat
>  -XX:MaxRAMPercentage=75.0
>  -XX:InitialRAMPercentage=50.0
>  -XX:MaxMetaspaceSize=256M
>  -XX:MaxDirectMemorySize=256M
>  -XX:+UseStringDeduplication
>  -XX:+AlwaysActAsServerClassMachine{code}
>  
>  Kafka Connect config:
> {code:java}
> "config.providers": "directory"
>  "config.providers.directory.class": 
> "org.apache.kafka.common.config.provider.DirectoryConfigProvider"{code}
>  
>  Kubernetes pod resources limits:
> {code:java}
> resources:
>   requests:
> cpu: 1500m
> memory: 2Gi
>   limits:
> cpu: 3000m
> memory: 3Gi
> {code}
>  
> docker image used: confluentinc/cp-kafka-connect:6.1.1



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] junrao merged pull request #10732: MINOR: Eliminate redundant functions in LogTest suite

2021-05-20 Thread GitBox


junrao merged pull request #10732:
URL: https://github.com/apache/kafka/pull/10732


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jlprat edited a comment on pull request #10724: KAFKA-12808: Remove Deprecated Methods under StreamsMetrics

2021-05-20 Thread GitBox


jlprat edited a comment on pull request #10724:
URL: https://github.com/apache/kafka/pull/10724#issuecomment-845226161


   Fix pushed; `@SuppressWarnings("deprecation")` is now removed. Feel free 
to review, @cadonna 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jlprat commented on pull request #10724: KAFKA-12808: Remove Deprecated Methods under StreamsMetrics

2021-05-20 Thread GitBox


jlprat commented on pull request #10724:
URL: https://github.com/apache/kafka/pull/10724#issuecomment-845226161


   Fix pushed; `@SuppressWarnings("deprecation")` is now removed


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12818) Memory leakage when kafka connect 2.7 uses directory config provider

2021-05-20 Thread Viktor Utkin (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17348595#comment-17348595
 ] 

Viktor Utkin commented on KAFKA-12818:
--

Hi [~tombentley], what memory usage diagnostics data do you need?

By the way, we ran the tests for two days under load and I showed you the 
result. It's not a five-minute test.

> Memory leakage when kafka connect 2.7 uses directory config provider
> 
>
> Key: KAFKA-12818
> URL: https://issues.apache.org/jira/browse/KAFKA-12818
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.7.0
> Environment: Azure AKS / Kubernetes v1.20
>Reporter: Viktor Utkin
>Priority: Critical
> Attachments: Screenshot 2021-05-20 at 14.53.05.png
>
>
> Hi, we noticed a memory leak when Kafka Connect 2.7 uses the directory config 
> provider. We got an OOM in a Kubernetes environment: K8s kills the container 
> when the limit is reached. At the same time, we did not get any OOM in Java, 
> and a heap dump didn't show us anything interesting.
> JVM config:
> {code:java}
>  -XX:+HeapDumpOnOutOfMemoryError
>  -XX:HeapDumpPath=/tmp/
>  -XX:+UseContainerSupport
>  -XX:+OptimizeStringConcat
>  -XX:MaxRAMPercentage=75.0
>  -XX:InitialRAMPercentage=50.0
>  -XX:MaxMetaspaceSize=256M
>  -XX:MaxDirectMemorySize=256M
>  -XX:+UseStringDeduplication
>  -XX:+AlwaysActAsServerClassMachine{code}
>  
>  Kafka Connect config:
> {code:java}
> "config.providers": "directory"
>  "config.providers.directory.class": 
> "org.apache.kafka.common.config.provider.DirectoryConfigProvider"{code}
>  
>  Kubernetes pod resources limits:
> {code:java}
> resources:
>   requests:
> cpu: 1500m
> memory: 2Gi
>   limits:
> cpu: 3000m
> memory: 3Gi
> {code}
>  
> docker image used: confluentinc/cp-kafka-connect:6.1.1



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] lizthegrey closed pull request #10326: Avoid newly replicating brokers in RackAwareReplicaSelector

2021-05-20 Thread GitBox


lizthegrey closed pull request #10326:
URL: https://github.com/apache/kafka/pull/10326


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lizthegrey commented on pull request #10326: Avoid newly replicating brokers in RackAwareReplicaSelector

2021-05-20 Thread GitBox


lizthegrey commented on pull request #10326:
URL: https://github.com/apache/kafka/pull/10326#issuecomment-845210871


   As per private email thread, there is now a reproduction case for this and 
it is not present in open source Kafka, so closing this PR!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12818) Memory leakage when kafka connect 2.7 uses directory config provider

2021-05-20 Thread Tom Bentley (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17348590#comment-17348590
 ] 

Tom Bentley commented on KAFKA-12818:
-

Since your limit > request, you're going to be at risk of the OOMKiller anyway, 
whether or not there's a memory leak. You were exceeding your request even with 
the FileConfigProvider.

I find it hard to believe the DirectoryConfigProvider itself is responsible for 
somehow using _that much_ (~200m) memory above what we see in the first curve. 
It's normally only used once when the config file is parsed and the values 
substituted. It doesn't watch the files (even if they do change). So I think we 
need better memory usage diagnostics here than just a graph of RSS.
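
For a rough budget under the settings quoted below (assuming 
-XX:MaxRAMPercentage is applied against the 3Gi container limit, since 
-XX:+UseContainerSupport is on): max heap ≈ 0.75 × 3Gi ≈ 2.25Gi, plus up to 
256M of metaspace and 256M of direct memory ≈ 2.75Gi, before thread stacks, 
code cache, GC structures, and other native allocations. That already exceeds 
the 2Gi request, so the container can be OOM-killed by Kubernetes without the 
JVM ever throwing an OutOfMemoryError.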

> Memory leakage when kafka connect 2.7 uses directory config provider
> 
>
> Key: KAFKA-12818
> URL: https://issues.apache.org/jira/browse/KAFKA-12818
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.7.0
> Environment: Azure AKS / Kubernetes v1.20
>Reporter: Viktor Utkin
>Priority: Critical
> Attachments: Screenshot 2021-05-20 at 14.53.05.png
>
>
> Hi, we noticed a memory leak when Kafka Connect 2.7 uses the directory config 
> provider. We got an OOM in a Kubernetes environment: K8s kills the container 
> when the limit is reached. At the same time, we did not get any OOM in Java, 
> and a heap dump didn't show us anything interesting.
> JVM config:
> {code:java}
>  -XX:+HeapDumpOnOutOfMemoryError
>  -XX:HeapDumpPath=/tmp/
>  -XX:+UseContainerSupport
>  -XX:+OptimizeStringConcat
>  -XX:MaxRAMPercentage=75.0
>  -XX:InitialRAMPercentage=50.0
>  -XX:MaxMetaspaceSize=256M
>  -XX:MaxDirectMemorySize=256M
>  -XX:+UseStringDeduplication
>  -XX:+AlwaysActAsServerClassMachine{code}
>  
>  Kafka Connect config:
> {code:java}
> "config.providers": "directory"
>  "config.providers.directory.class": 
> "org.apache.kafka.common.config.provider.DirectoryConfigProvider"{code}
>  
>  Kubernetes pod resources limits:
> {code:java}
> resources:
>   requests:
> cpu: 1500m
> memory: 2Gi
>   limits:
> cpu: 3000m
> memory: 3Gi
> {code}
>  
> docker image used: confluentinc/cp-kafka-connect:6.1.1



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] chia7712 merged pull request #10723: MINOR: Remove unused maxProducerIdExpirationMs parameter in Log constructor

2021-05-20 Thread GitBox


chia7712 merged pull request #10723:
URL: https://github.com/apache/kafka/pull/10723


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-12818) Memory leakage when kafka connect 2.7 uses directory config provider

2021-05-20 Thread Viktor Utkin (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17348563#comment-17348563
 ] 

Viktor Utkin edited comment on KAFKA-12818 at 5/20/21, 3:03 PM:


Hi [~tombentley], I've used the metric container_memory_usage_bytes for this 
graph; in any case, I still don't understand why the file config provider works 
fine while the directory one does not.

>was the Secret or ConfigMap for the mount being changed?
 The Secret syncs from keyvault every minute in Kubernetes via an operator 
(https://akv2k8s.io/), but its content has been unchanged for a long time.


was (Author: vutkin):
Hi [~tombentley], I've used the metric container_memory_usage_bytes for this 
graph; in any case, I still don't understand why the file config provider works 
fine while the directory one does not.

>was the Secret or ConfigMap for the mount being changed?
 The Secret syncs from keyvault every minute in Kubernetes via an operator, but 
its content has been unchanged for a long time.

> Memory leakage when kafka connect 2.7 uses directory config provider
> 
>
> Key: KAFKA-12818
> URL: https://issues.apache.org/jira/browse/KAFKA-12818
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.7.0
> Environment: Azure AKS / Kubernetes v1.20
>Reporter: Viktor Utkin
>Priority: Critical
> Attachments: Screenshot 2021-05-20 at 14.53.05.png
>
>
> Hi, we noticed a memory leak when Kafka Connect 2.7 uses the directory config 
> provider. We got an OOM in a Kubernetes environment: K8s kills the container 
> when the limit is reached. At the same time, we did not get any OOM in Java, 
> and a heap dump didn't show us anything interesting.
> JVM config:
> {code:java}
>  -XX:+HeapDumpOnOutOfMemoryError
>  -XX:HeapDumpPath=/tmp/
>  -XX:+UseContainerSupport
>  -XX:+OptimizeStringConcat
>  -XX:MaxRAMPercentage=75.0
>  -XX:InitialRAMPercentage=50.0
>  -XX:MaxMetaspaceSize=256M
>  -XX:MaxDirectMemorySize=256M
>  -XX:+UseStringDeduplication
>  -XX:+AlwaysActAsServerClassMachine{code}
>  
>  Kafka Connect config:
> {code:java}
> "config.providers": "directory"
>  "config.providers.directory.class": 
> "org.apache.kafka.common.config.provider.DirectoryConfigProvider"{code}
>  
>  Kubernetes pod resources limits:
> {code:java}
> resources:
>   requests:
> cpu: 1500m
> memory: 2Gi
>   limits:
> cpu: 3000m
> memory: 3Gi
> {code}
>  
> docker image used: confluentinc/cp-kafka-connect:6.1.1



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-12818) Memory leakage when kafka connect 2.7 uses directory config provider

2021-05-20 Thread Viktor Utkin (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17348563#comment-17348563
 ] 

Viktor Utkin edited comment on KAFKA-12818 at 5/20/21, 2:59 PM:


Hi [~tombentley], I've used the metric container_memory_usage_bytes for this 
graph; in any case, I still don't understand why the file config provider works 
fine while the directory one does not.

>was the Secret or ConfigMap for the mount being changed?
 The Secret syncs from keyvault every minute in Kubernetes via an operator, but 
its content has been unchanged for a long time.


was (Author: vutkin):
Hi [~tombentley], I've used the metric container_memory_usage_bytes for this 
graph; in any case, I still don't understand why the file config provider works 
fine while the directory one does not.

>was the Secret or ConfigMap for the mount being changed?
 The Secret syncs from keyvault every minute in Kubernetes, but its content has 
been unchanged for a long time.

> Memory leakage when kafka connect 2.7 uses directory config provider
> 
>
> Key: KAFKA-12818
> URL: https://issues.apache.org/jira/browse/KAFKA-12818
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.7.0
> Environment: Azure AKS / Kubernetes v1.20
>Reporter: Viktor Utkin
>Priority: Critical
> Attachments: Screenshot 2021-05-20 at 14.53.05.png
>
>
> Hi, we noticed a memory leak when Kafka Connect 2.7 uses the directory config 
> provider. We got an OOM in a Kubernetes environment: K8s kills the container 
> when the limit is reached. At the same time, we did not get any OOM in Java, 
> and a heap dump didn't show us anything interesting.
> JVM config:
> {code:java}
>  -XX:+HeapDumpOnOutOfMemoryError
>  -XX:HeapDumpPath=/tmp/
>  -XX:+UseContainerSupport
>  -XX:+OptimizeStringConcat
>  -XX:MaxRAMPercentage=75.0
>  -XX:InitialRAMPercentage=50.0
>  -XX:MaxMetaspaceSize=256M
>  -XX:MaxDirectMemorySize=256M
>  -XX:+UseStringDeduplication
>  -XX:+AlwaysActAsServerClassMachine{code}
>  
>  Kafka Connect config:
> {code:java}
> "config.providers": "directory"
>  "config.providers.directory.class": 
> "org.apache.kafka.common.config.provider.DirectoryConfigProvider"{code}
>  
>  Kubernetes pod resources limits:
> {code:java}
> resources:
>   requests:
> cpu: 1500m
> memory: 2Gi
>   limits:
> cpu: 3000m
> memory: 3Gi
> {code}
>  
> docker image used: confluentinc/cp-kafka-connect:6.1.1



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-12818) Memory leakage when kafka connect 2.7 uses directory config provider

2021-05-20 Thread Viktor Utkin (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17348563#comment-17348563
 ] 

Viktor Utkin edited comment on KAFKA-12818 at 5/20/21, 2:59 PM:


Hi [~tombentley], I've used the metric container_memory_usage_bytes for this 
graph; in any case, I still don't understand why the file config provider works 
fine while the directory one does not.

>was the Secret or ConfigMap for the mount being changed?
 The Secret syncs from keyvault every minute in Kubernetes, but its content has 
been unchanged for a long time.


was (Author: vutkin):
Hi [~tombentley], I've used the metric container_memory_usage_bytes for this 
graph; in any case, I still don't understand why the file config provider works 
fine while the directory one does not.

>was the Secret or ConfigMap for the mount being changed?
The Secret syncs from keyvault every minute in Kubernetes, but its content is 
unchanged.

> Memory leakage when kafka connect 2.7 uses directory config provider
> 
>
> Key: KAFKA-12818
> URL: https://issues.apache.org/jira/browse/KAFKA-12818
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.7.0
> Environment: Azure AKS / Kubernetes v1.20
>Reporter: Viktor Utkin
>Priority: Critical
> Attachments: Screenshot 2021-05-20 at 14.53.05.png
>
>
> Hi, we noticed a memory leak when Kafka Connect 2.7 uses the directory config 
> provider. We got an OOM in a Kubernetes environment: K8s kills the container 
> when the limit is reached. At the same time, we did not get any OOM in Java, 
> and a heap dump didn't show us anything interesting.
> JVM config:
> {code:java}
>  -XX:+HeapDumpOnOutOfMemoryError
>  -XX:HeapDumpPath=/tmp/
>  -XX:+UseContainerSupport
>  -XX:+OptimizeStringConcat
>  -XX:MaxRAMPercentage=75.0
>  -XX:InitialRAMPercentage=50.0
>  -XX:MaxMetaspaceSize=256M
>  -XX:MaxDirectMemorySize=256M
>  -XX:+UseStringDeduplication
>  -XX:+AlwaysActAsServerClassMachine{code}
>  
>  Kafka Connect config:
> {code:java}
> "config.providers": "directory"
>  "config.providers.directory.class": 
> "org.apache.kafka.common.config.provider.DirectoryConfigProvider"{code}
>  
>  Kubernetes pod resources limits:
> {code:java}
> resources:
>   requests:
> cpu: 1500m
> memory: 2Gi
>   limits:
> cpu: 3000m
> memory: 3Gi
> {code}
>  
> docker image used: confluentinc/cp-kafka-connect:6.1.1



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-12818) Memory leakage when kafka connect 2.7 uses directory config provider

2021-05-20 Thread Viktor Utkin (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17348563#comment-17348563
 ] 

Viktor Utkin edited comment on KAFKA-12818 at 5/20/21, 2:58 PM:


Hi [~tombentley], I've used the metric container_memory_usage_bytes for this 
graph; in any case, I still don't understand why the file config provider works 
fine while the directory one does not.

>was the Secret or ConfigMap for the mount being changed?
The Secret syncs from keyvault every minute in Kubernetes, but its content is 
unchanged.


was (Author: vutkin):
Hi [~tombentley], I've used the metric container_memory_usage_bytes for this 
graph; in any case, I still don't understand why the file config provider works 
fine while the directory one does not.

> Memory leakage when kafka connect 2.7 uses directory config provider
> 
>
> Key: KAFKA-12818
> URL: https://issues.apache.org/jira/browse/KAFKA-12818
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.7.0
> Environment: Azure AKS / Kubernetes v1.20
>Reporter: Viktor Utkin
>Priority: Critical
> Attachments: Screenshot 2021-05-20 at 14.53.05.png
>
>
> Hi, we noticed a memory leak when Kafka Connect 2.7 uses the directory config 
> provider. We got an OOM in a Kubernetes environment: K8s kills the container 
> when the limit is reached. At the same time, we did not get any OOM in Java, 
> and a heap dump didn't show us anything interesting.
> JVM config:
> {code:java}
>  -XX:+HeapDumpOnOutOfMemoryError
>  -XX:HeapDumpPath=/tmp/
>  -XX:+UseContainerSupport
>  -XX:+OptimizeStringConcat
>  -XX:MaxRAMPercentage=75.0
>  -XX:InitialRAMPercentage=50.0
>  -XX:MaxMetaspaceSize=256M
>  -XX:MaxDirectMemorySize=256M
>  -XX:+UseStringDeduplication
>  -XX:+AlwaysActAsServerClassMachine{code}
>  
>  Kafka Connect config:
> {code:java}
> "config.providers": "directory"
>  "config.providers.directory.class": 
> "org.apache.kafka.common.config.provider.DirectoryConfigProvider"{code}
>  
>  Kubernetes pod resources limits:
> {code:java}
> resources:
>   requests:
> cpu: 1500m
> memory: 2Gi
>   limits:
> cpu: 3000m
> memory: 3Gi
> {code}
>  
> docker image used: confluentinc/cp-kafka-connect:6.1.1



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jlprat commented on pull request #10724: KAFKA-12808: Remove Deprecated Methods under StreamsMetrics

2021-05-20 Thread GitBox


jlprat commented on pull request #10724:
URL: https://github.com/apache/kafka/pull/10724#issuecomment-845187840


   You are right, I'll fix those later today


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on a change in pull request #10724: KAFKA-12808: Remove Deprecated Methods under StreamsMetrics

2021-05-20 Thread GitBox


cadonna commented on a change in pull request #10724:
URL: https://github.com/apache/kafka/pull/10724#discussion_r636167629



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
##
@@ -777,12 +777,12 @@ public void testLatencyMetrics() {
 final String entity = "entity";
 final String operation = "put";
 
-final Sensor sensor1 = 
streamsMetrics.addLatencyAndThroughputSensor(scope, entity, operation, 
RecordingLevel.DEBUG);

Review comment:
   You can also remove the `@SuppressWarnings("deprecation")` at the top of 
the method.

##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
##
@@ -797,11 +797,11 @@ public void testThroughputMetrics() {
 final String entity = "entity";
 final String operation = "put";
 
-final Sensor sensor1 = streamsMetrics.addThroughputSensor(scope, 
entity, operation, RecordingLevel.DEBUG);

Review comment:
   You can also remove the `@SuppressWarnings("deprecation")` at the top of 
the method.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12818) Memory leakage when kafka connect 2.7 uses directory config provider

2021-05-20 Thread Viktor Utkin (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17348563#comment-17348563
 ] 

Viktor Utkin commented on KAFKA-12818:
--

Hi [~tombentley], I've used the metric container_memory_usage_bytes for this 
graph; in any case, I still don't understand why the file config provider works 
fine while the directory one does not.

> Memory leakage when kafka connect 2.7 uses directory config provider
> 
>
> Key: KAFKA-12818
> URL: https://issues.apache.org/jira/browse/KAFKA-12818
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.7.0
> Environment: Azure AKS / Kubernetes v1.20
>Reporter: Viktor Utkin
>Priority: Critical
> Attachments: Screenshot 2021-05-20 at 14.53.05.png
>
>
> Hi, we noticed a memory leak when Kafka Connect 2.7 uses the directory config 
> provider. We got an OOM in a Kubernetes environment: K8s kills the container 
> when the limit is reached. At the same time, we did not get any OOM in Java, 
> and a heap dump didn't show us anything interesting.
> JVM config:
> {code:java}
>  -XX:+HeapDumpOnOutOfMemoryError
>  -XX:HeapDumpPath=/tmp/
>  -XX:+UseContainerSupport
>  -XX:+OptimizeStringConcat
>  -XX:MaxRAMPercentage=75.0
>  -XX:InitialRAMPercentage=50.0
>  -XX:MaxMetaspaceSize=256M
>  -XX:MaxDirectMemorySize=256M
>  -XX:+UseStringDeduplication
>  -XX:+AlwaysActAsServerClassMachine{code}
>  
>  Kafka Connect config:
> {code:java}
> "config.providers": "directory"
>  "config.providers.directory.class": 
> "org.apache.kafka.common.config.provider.DirectoryConfigProvider"{code}
>  
>  Kubernetes pod resources limits:
> {code:java}
> resources:
>   requests:
> cpu: 1500m
> memory: 2Gi
>   limits:
> cpu: 3000m
> memory: 3Gi
> {code}
>  
> docker image used: confluentinc/cp-kafka-connect:6.1.1



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ijuma commented on pull request #9724: MINOR: work in progress for 2.6

2021-05-20 Thread GitBox


ijuma commented on pull request #9724:
URL: https://github.com/apache/kafka/pull/9724#issuecomment-845172790


   @showuon I will close this PR until there is something to be reviewed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma closed pull request #9724: MINOR: work in progress for 2.6

2021-05-20 Thread GitBox


ijuma closed pull request #9724:
URL: https://github.com/apache/kafka/pull/9724


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12820) Upgrade maven-artifact dependency to resolve CVE-2021-26291

2021-05-20 Thread Boojapho (Jira)
Boojapho created KAFKA-12820:


 Summary: Upgrade maven-artifact dependency to resolve 
CVE-2021-26291
 Key: KAFKA-12820
 URL: https://issues.apache.org/jira/browse/KAFKA-12820
 Project: Kafka
  Issue Type: Task
  Components: build
Affects Versions: 2.7.1, 2.8.0, 2.6.1
Reporter: Boojapho


Current Gradle builds of Kafka contain a dependency of `maven-artifact` version 
3.6.3, which contains CVE-2021-26291 
([https://nvd.nist.gov/vuln/detail/CVE-2021-26291]). This vulnerability has 
been fixed in Maven 3.8.1 
([https://maven.apache.org/docs/3.8.1/release-notes.html]).  Apache Kafka 
should update `dependencies.gradle` to use the latest `maven-artifact` library 
to eliminate this vulnerability.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mumrah commented on a change in pull request #10504: KAFKA-12620 Allocate producer ids on the controller

2021-05-20 Thread GitBox


mumrah commented on a change in pull request #10504:
URL: https://github.com/apache/kafka/pull/10504#discussion_r636104630



##
File path: 
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
##
@@ -75,87 +61,179 @@ trait ProducerIdGenerator {
   def shutdown() : Unit = {}
 }
 
-class ProducerIdManager(val brokerId: Int, val zkClient: KafkaZkClient) 
extends ProducerIdGenerator with Logging {
-
-  this.logIdent = "[ProducerId Manager " + brokerId + "]: "
-
-  private var currentProducerIdBlock: ProducerIdBlock = null
-  private var nextProducerId: Long = -1L
-
-  // grab the first block of producerIds
-  this synchronized {
-getNewProducerIdBlock()
-nextProducerId = currentProducerIdBlock.blockStartId
-  }
-
-  private def getNewProducerIdBlock(): Unit = {
+object ZkProducerIdManager {
+  def getNewProducerIdBlock(brokerId: Int, zkClient: KafkaZkClient, logger: 
Logging): ProducerIdsBlock = {
+// Get or create the existing PID block from ZK and attempt to update it. 
We retry in a loop here since other
+// brokers may be generating PID blocks during a rolling upgrade
 var zkWriteComplete = false
 while (!zkWriteComplete) {
   // refresh current producerId block from zookeeper again
   val (dataOpt, zkVersion) = 
zkClient.getDataAndVersion(ProducerIdBlockZNode.path)
 
   // generate the new producerId block
-  currentProducerIdBlock = dataOpt match {
+  val newProducerIdBlock = dataOpt match {
 case Some(data) =>
-  val currProducerIdBlock = 
ProducerIdManager.parseProducerIdBlockData(data)
-  debug(s"Read current producerId block $currProducerIdBlock, Zk path 
version $zkVersion")
+  val currProducerIdBlock = 
ProducerIdBlockZNode.parseProducerIdBlockData(data)
+  logger.debug(s"Read current producerId block $currProducerIdBlock, 
Zk path version $zkVersion")
 
-  if (currProducerIdBlock.blockEndId > Long.MaxValue - 
ProducerIdManager.PidBlockSize) {
+  if (currProducerIdBlock.producerIdEnd > Long.MaxValue - 
ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE) {
 // we have exhausted all producerIds (wow!), treat it as a fatal 
error
-fatal(s"Exhausted all producerIds as the next block's end 
producerId is will has exceeded long type limit (current block end producerId 
is ${currProducerIdBlock.blockEndId})")
+logger.fatal(s"Exhausted all producerIds as the next block's end 
producerId is will has exceeded long type limit (current block end producerId 
is ${currProducerIdBlock.producerIdEnd})")
 throw new KafkaException("Have exhausted all producerIds.")
   }
 
-  ProducerIdBlock(brokerId, currProducerIdBlock.blockEndId + 1L, 
currProducerIdBlock.blockEndId + ProducerIdManager.PidBlockSize)
+  new ProducerIdsBlock(brokerId, currProducerIdBlock.producerIdEnd + 
1L, ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE)
 case None =>
-  debug(s"There is no producerId block yet (Zk path version 
$zkVersion), creating the first block")
-  ProducerIdBlock(brokerId, 0L, ProducerIdManager.PidBlockSize - 1)
+  logger.debug(s"There is no producerId block yet (Zk path version 
$zkVersion), creating the first block")
+  new ProducerIdsBlock(brokerId, 0L, 
ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE)
   }
 
-  val newProducerIdBlockData = 
ProducerIdManager.generateProducerIdBlockJson(currentProducerIdBlock)
+  val newProducerIdBlockData = 
ProducerIdBlockZNode.generateProducerIdBlockJson(newProducerIdBlock)
 
   // try to write the new producerId block into zookeeper
-  val (succeeded, version) = 
zkClient.conditionalUpdatePath(ProducerIdBlockZNode.path,
-newProducerIdBlockData, zkVersion, Some(checkProducerIdBlockZkData))
+  val (succeeded, version) = 
zkClient.conditionalUpdatePath(ProducerIdBlockZNode.path, 
newProducerIdBlockData, zkVersion, None)
   zkWriteComplete = succeeded
 
-  if (zkWriteComplete)
-info(s"Acquired new producerId block $currentProducerIdBlock by 
writing to Zk with path version $version")
+  if (zkWriteComplete) {
+logger.info(s"Acquired new producerId block $newProducerIdBlock by 
writing to Zk with path version $version")
+return newProducerIdBlock
+  }
 }
+throw new IllegalStateException()
   }
+}
 
-  private def checkProducerIdBlockZkData(zkClient: KafkaZkClient, path: 
String, expectedData: Array[Byte]): (Boolean, Int) = {
-try {
-  val expectedPidBlock = 
ProducerIdManager.parseProducerIdBlockData(expectedData)
-  zkClient.getDataAndVersion(ProducerIdBlockZNode.path) match {
-case (Some(data), zkVersion) =>
-  val currProducerIdBLock = 
ProducerIdManager.parseProducerIdBlockData(data)
-  (currProducerIdBLock == expectedPidBlock, zkVersion)
-case (None, _) => (false, -1)
-  }
-} catch {
-  case e: Exception =>
-warn(s"Error 

[jira] [Commented] (KAFKA-12818) Memory leakage when kafka connect 2.7 uses directory config provider

2021-05-20 Thread Tom Bentley (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17348517#comment-17348517
 ] 

Tom Bentley commented on KAFKA-12818:
-

As you can see from the code, DirectoryConfigProvider doesn't have any state, 
so this can't simply be objects being pinned in memory.

 

How is the memory usage in the graph being calculated (is it process RSS or 
some JVM metric)? Assuming the DirConfigProvider was looking at a mounted 
directory, was the Secret or ConfigMap for the mount being changed?
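
For reference, a minimal usage sketch of the provider's one-shot behaviour (the 
directory path and key below are made up for the example):
{code:java}
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.config.ConfigData;
import org.apache.kafka.common.config.provider.DirectoryConfigProvider;

public class DirectoryProviderSketch {
    public static void main(String[] args) throws Exception {
        try (DirectoryConfigProvider provider = new DirectoryConfigProvider()) {
            provider.configure(Map.of());
            // One-shot read: each file under the directory becomes a key and
            // its contents the value. The provider keeps no state afterwards
            // and does not watch the files for changes.
            ConfigData data = provider.get("/mnt/secrets", Set.of("db.password"));
            System.out.println(data.data().get("db.password"));
        }
    }
}
{code}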

> Memory leakage when kafka connect 2.7 uses directory config provider
> 
>
> Key: KAFKA-12818
> URL: https://issues.apache.org/jira/browse/KAFKA-12818
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.7.0
> Environment: Azure AKS / Kubernetes v1.20
>Reporter: Viktor Utkin
>Priority: Critical
> Attachments: Screenshot 2021-05-20 at 14.53.05.png
>
>
> Hi, we noticed a memory leak when Kafka Connect 2.7 uses the directory config 
> provider. We got an OOM in a Kubernetes environment: K8s kills the container 
> when the limit is reached. At the same time, we did not get any OOM in Java, 
> and a heap dump didn't show us anything interesting.
> JVM config:
> {code:java}
>  -XX:+HeapDumpOnOutOfMemoryError
>  -XX:HeapDumpPath=/tmp/
>  -XX:+UseContainerSupport
>  -XX:+OptimizeStringConcat
>  -XX:MaxRAMPercentage=75.0
>  -XX:InitialRAMPercentage=50.0
>  -XX:MaxMetaspaceSize=256M
>  -XX:MaxDirectMemorySize=256M
>  -XX:+UseStringDeduplication
>  -XX:+AlwaysActAsServerClassMachine{code}
>  
>  Kafka Connect config:
> {code:java}
> "config.providers": "directory"
>  "config.providers.directory.class": 
> "org.apache.kafka.common.config.provider.DirectoryConfigProvider"{code}
>  
>  Kubernetes pod resources limits:
> {code:java}
> resources:
>   requests:
> cpu: 1500m
> memory: 2Gi
>   limits:
> cpu: 3000m
> memory: 3Gi
> {code}
>  
> docker image used: confluentinc/cp-kafka-connect:6.1.1



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] viktorsomogyi opened a new pull request #10738: KAFKA-6945: KIP-373, allow users to create delegation token for others

2021-05-20 Thread GitBox


viktorsomogyi opened a new pull request #10738:
URL: https://github.com/apache/kafka/pull/10738


   This PR adds the capability for users to create delegation tokens for other 
users.
   
   KIP-373: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-373%3A+Allow+users+to+create+delegation+tokens+for+other+users
   
   In this use case, a superuser with username ‘superuser’ wants to run kafka 
clients on behalf of a user 'joe'. The 'superuser' has secure authentication 
credentials (kerberos, SSL, SCRAM) but user 'joe' doesn’t have any. The clients 
are required to run as user 'joe' and authorizations are required to be done as 
user 'joe.' In this case, 'superuser' can get a delegation token for user 
'joe', and use the generated token to run the Kafka clients. This will mimic 
the impersonation functionality. This will help the stream processing 
frameworks/libs (Apache Spark, Storm, Kafka Streams) to run the jobs (Kafka 
clients) as submitted users.
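   
   A sketch of the intended usage (the `owner(...)` option is what this PR 
introduces; names may differ from the final merged API, and the bootstrap 
address below is made up):
```java
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.CreateDelegationTokenOptions;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.token.delegation.DelegationToken;

// 'superuser' authenticates with its own credentials and requests a token
// whose owner is user 'joe'; clients then authenticate with that token and
// are authorized as 'joe'.
public class CreateTokenForOtherUser {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker:9092"); // assumed address
        try (Admin admin = Admin.create(props)) {
            CreateDelegationTokenOptions options = new CreateDelegationTokenOptions()
                .owner(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "joe"));
            DelegationToken token =
                admin.createDelegationToken(options).delegationToken().get();
            System.out.println("Token id: " + token.tokenInfo().tokenId());
        }
    }
}
```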
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12819) Quality of life improvements for tests

2021-05-20 Thread Matthew de Detrich (Jira)
Matthew de Detrich created KAFKA-12819:
--

 Summary: Quality of life improvements for tests
 Key: KAFKA-12819
 URL: https://issues.apache.org/jira/browse/KAFKA-12819
 Project: Kafka
  Issue Type: Improvement
Reporter: Matthew de Detrich
Assignee: Matthew de Detrich


Minor improvements to various tests, such as using assertObject instead of 
assertEquals (when comparing objects) and filling in missing messages in 
asserts.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12818) Memory leakage when kafka connect 2.7 uses directory config provider

2021-05-20 Thread Viktor Utkin (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12818?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Viktor Utkin updated KAFKA-12818:
-
Description: 
Hi, we noticed a memory leak when Kafka Connect 2.7 uses the directory config 
provider. We got an OOM in a Kubernetes environment: K8s kills the container 
when the limit is reached. At the same time, we did not get any OOM in Java, 
and a heap dump didn't show us anything interesting.

JVM config:
{code:java}
 -XX:+HeapDumpOnOutOfMemoryError
 -XX:HeapDumpPath=/tmp/
 -XX:+UseContainerSupport
 -XX:+OptimizeStringConcat
 -XX:MaxRAMPercentage=75.0
 -XX:InitialRAMPercentage=50.0
 -XX:MaxMetaspaceSize=256M
 -XX:MaxDirectMemorySize=256M
 -XX:+UseStringDeduplication
 -XX:+AlwaysActAsServerClassMachine{code}
 
 Kafka Connect config:
{code:java}
"config.providers": "directory"
 "config.providers.directory.class": 
"org.apache.kafka.common.config.provider.DirectoryConfigProvider"{code}
 
 Kubernetes pod resources limits:
{code:java}
resources:
  requests:
cpu: 1500m
memory: 2Gi
  limits:
cpu: 3000m
memory: 3Gi
{code}
 

docker image used: confluentinc/cp-kafka-connect:6.1.1

  was:
Hi, we noticed a memory leak when Kafka Connect 2.7 uses the directory config 
provider. We got an OOM in a Kubernetes environment: K8s kills the container 
when the limit is reached. At the same time, we did not get any OOM in Java, 
and a heap dump didn't show us anything interesting.

JVM config:
{code:java}
 -XX:+HeapDumpOnOutOfMemoryError
 -XX:HeapDumpPath=/tmp/
 -XX:+UseContainerSupport
 -XX:+OptimizeStringConcat
 -XX:MaxRAMPercentage=75.0
 -XX:InitialRAMPercentage=50.0
 -XX:MaxMetaspaceSize=256M
 -XX:MaxDirectMemorySize=256M
 -XX:+UseStringDeduplication
 -XX:+AlwaysActAsServerClassMachine{code}
 
 Kafka Connect config:
{code:java}
"config.providers": "directory"
 "config.providers.directory.class": 
"org.apache.kafka.common.config.provider.DirectoryConfigProvider"{code}
 
 Kubernetes pod resources limits:
{code:java}
resources:
  requests:
cpu: 1500m
memory: 2Gi
  limits:
cpu: 3000m
memory: 3Gi
{code}
 


> Memory leakage when kafka connect 2.7 uses directory config provider
> 
>
> Key: KAFKA-12818
> URL: https://issues.apache.org/jira/browse/KAFKA-12818
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.7.0
> Environment: Azure AKS / Kubernetes v1.20
>Reporter: Viktor Utkin
>Priority: Critical
> Attachments: Screenshot 2021-05-20 at 14.53.05.png
>
>
> Hi, we noticed a memory leak when Kafka Connect 2.7 uses the directory 
> config provider. We got an OOM in a Kubernetes environment: K8s kills the 
> container when the memory limit is reached. At the same time, we never got 
> an OOM in Java, and a heap dump didn't show us anything interesting.
> JVM config:
> {code:java}
>  -XX:+HeapDumpOnOutOfMemoryError
>  -XX:HeapDumpPath=/tmp/
>  -XX:+UseContainerSupport
>  -XX:+OptimizeStringConcat
>  -XX:MaxRAMPercentage=75.0
>  -XX:InitialRAMPercentage=50.0
>  -XX:MaxMetaspaceSize=256M
>  -XX:MaxDirectMemorySize=256M
>  -XX:+UseStringDeduplication
>  -XX:+AlwaysActAsServerClassMachine{code}
>  
>  Kafka Connect config:
> {code:java}
> "config.providers": "directory"
>  "config.providers.directory.class": 
> "org.apache.kafka.common.config.provider.DirectoryConfigProvider"{code}
>  
>  Kubernetes pod resources limits:
> {code:java}
> resources:
>   requests:
> cpu: 1500m
> memory: 2Gi
>   limits:
> cpu: 3000m
> memory: 3Gi
> {code}
>  
> docker image used: confluentinc/cp-kafka-connect:6.1.1



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12818) Memory leakage when kafka connect 2.7 uses directory config provider

2021-05-20 Thread Viktor Utkin (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12818?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Viktor Utkin updated KAFKA-12818:
-
Description: 
Hi, we noticed a memory leak when Kafka Connect 2.7 uses the directory 
config provider. We got an OOM in a Kubernetes environment: K8s kills the 
container when the memory limit is reached. At the same time, we never got an 
OOM in Java, and a heap dump didn't show us anything interesting.

JVM config:
{code:java}
 -XX:+HeapDumpOnOutOfMemoryError
 -XX:HeapDumpPath=/tmp/
 -XX:+UseContainerSupport
 -XX:+OptimizeStringConcat
 -XX:MaxRAMPercentage=75.0
 -XX:InitialRAMPercentage=50.0
 -XX:MaxMetaspaceSize=256M
 -XX:MaxDirectMemorySize=256M
 -XX:+UseStringDeduplication
 -XX:+AlwaysActAsServerClassMachine{code}
 
 Kafka Connect config:
{code:java}
"config.providers": "directory"
 "config.providers.directory.class": 
"org.apache.kafka.common.config.provider.DirectoryConfigProvider"{code}
 
 Kubernetes pod resources limits:
{code:java}
resources:
  requests:
cpu: 1500m
memory: 2Gi
  limits:
cpu: 3000m
memory: 3Gi
{code}
 

  was:
Hi, we noticed a memory leak when Kafka Connect 2.7 uses the directory 
config provider. We got an OOM in a Kubernetes environment. At the same time, 
we never got an OOM in Java, and a heap dump didn't show us anything interesting.

 


> Memory leakage when kafka connect 2.7 uses directory config provider
> 
>
> Key: KAFKA-12818
> URL: https://issues.apache.org/jira/browse/KAFKA-12818
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.7.0
> Environment: Azure AKS / Kubernetes v1.20
>Reporter: Viktor Utkin
>Priority: Critical
> Attachments: Screenshot 2021-05-20 at 14.53.05.png
>
>
> Hi, we noticed a memory leak when Kafka Connect 2.7 uses the directory 
> config provider. We got an OOM in a Kubernetes environment: K8s kills the 
> container when the memory limit is reached. At the same time, we never got 
> an OOM in Java, and a heap dump didn't show us anything interesting.
> JVM config:
> {code:java}
>  -XX:+HeapDumpOnOutOfMemoryError
>  -XX:HeapDumpPath=/tmp/
>  -XX:+UseContainerSupport
>  -XX:+OptimizeStringConcat
>  -XX:MaxRAMPercentage=75.0
>  -XX:InitialRAMPercentage=50.0
>  -XX:MaxMetaspaceSize=256M
>  -XX:MaxDirectMemorySize=256M
>  -XX:+UseStringDeduplication
>  -XX:+AlwaysActAsServerClassMachine{code}
>  
>  Kafka Connect config:
> {code:java}
> "config.providers": "directory"
>  "config.providers.directory.class": 
> "org.apache.kafka.common.config.provider.DirectoryConfigProvider"{code}
>  
>  Kubernetes pod resources limits:
> {code:java}
> resources:
>   requests:
> cpu: 1500m
> memory: 2Gi
>   limits:
> cpu: 3000m
> memory: 3Gi
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12818) Memory leakage when kafka connect 2.7 uses directory config provider

2021-05-20 Thread Viktor Utkin (Jira)
Viktor Utkin created KAFKA-12818:


 Summary: Memory leakage when kafka connect 2.7 uses directory 
config provider
 Key: KAFKA-12818
 URL: https://issues.apache.org/jira/browse/KAFKA-12818
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 2.7.0
 Environment: Azure AKS / Kubernetes v1.20
Reporter: Viktor Utkin
 Attachments: Screenshot 2021-05-20 at 14.53.05.png

Hi, we noticed a memory leak when Kafka Connect 2.7 uses the directory 
config provider. We got an OOM in a Kubernetes environment. At the same time, 
we never got an OOM in Java, and a heap dump didn't show us anything interesting.

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12818) Memory leakage when kafka connect 2.7 uses directory config provider

2021-05-20 Thread Viktor Utkin (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17348316#comment-17348316
 ] 

Viktor Utkin commented on KAFKA-12818:
--

https://github.com/apache/kafka/pull/9136/files
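
Since the heap dump was clean and -XX:MaxDirectMemorySize is capped at 256M, 
one hedged way to rule off-heap buffers in or out is to poll the JVM's 
buffer-pool MXBeans from inside the worker, e.g.:
{code:java}
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;

public class DirectPoolProbe {
    public static void main(String[] args) {
        // Prints the "direct" and "mapped" buffer pools; steady growth in the
        // "direct" pool would point at off-heap buffers rather than the heap.
        for (BufferPoolMXBean pool :
                ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            System.out.printf("%s: used=%d bytes, capacity=%d bytes%n",
                pool.getName(), pool.getMemoryUsed(), pool.getTotalCapacity());
        }
    }
}
{code}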

> Memory leakage when kafka connect 2.7 uses directory config provider
> 
>
> Key: KAFKA-12818
> URL: https://issues.apache.org/jira/browse/KAFKA-12818
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.7.0
> Environment: Azure AKS / Kubernetes v1.20
>Reporter: Viktor Utkin
>Priority: Critical
> Attachments: Screenshot 2021-05-20 at 14.53.05.png
>
>
> Hi, we noticed a memory leak when Kafka Connect 2.7 uses the directory 
> config provider. We got an OOM in a Kubernetes environment. At the same time, 
> we never got an OOM in Java, and a heap dump didn't show us anything interesting.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12818) Memory leakage when kafka connect 2.7 uses directory config provider

2021-05-20 Thread Viktor Utkin (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12818?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Viktor Utkin updated KAFKA-12818:
-
Attachment: Screenshot 2021-05-20 at 14.53.05.png

> Memory leakage when kafka connect 2.7 uses directory config provider
> 
>
> Key: KAFKA-12818
> URL: https://issues.apache.org/jira/browse/KAFKA-12818
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.7.0
> Environment: Azure AKS / Kubernetes v1.20
>Reporter: Viktor Utkin
>Priority: Critical
> Attachments: Screenshot 2021-05-20 at 14.53.05.png
>
>
> Hi, we noticed a memory leak when Kafka Connect 2.7 uses the directory 
> config provider. We got an OOM in a Kubernetes environment. At the same time, 
> we never got an OOM in Java, and a heap dump didn't show us anything interesting.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

