[GitHub] [kafka] michalxo commented on pull request #11691: KAFKA-13598: enable idempotence producer by default and validate the configs

2022-03-23 Thread GitBox


michalxo commented on pull request #11691:
URL: https://github.com/apache/kafka/pull/11691#issuecomment-1076012559


   @showuon (or `Yang Yu`) found the issue (is that you, @showuon?) 
   https://issues.apache.org/jira/browse/KAFKA-13761


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13761) KafkaLog4jAppender deadlocks when idempotence is enabled

2022-03-23 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17511079#comment-17511079
 ] 

Luke Chen commented on KAFKA-13761:
---

Nice find!

> KafkaLog4jAppender deadlocks when idempotence is enabled
> 
>
> Key: KAFKA-13761
> URL: https://issues.apache.org/jira/browse/KAFKA-13761
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 3.1.0, 3.0.0, 3.0.1
>Reporter: Yang Yu
>Priority: Major
>
> KafkaLog4jAppender instantiates a KafkaProducer to append log entries to a 
> Kafka topic. The producer.send operation may need to acquire locks during its 
> execution. This can result in deadlocks when a log entry from the producer 
> network thread is also at a log level that results in the entry being 
> appended to a Kafka topic (KAFKA-6415).
> [https://github.com/apache/kafka/pull/11691] enables idempotence by default, 
> and it introduced another place where the producer network thread can hit a 
> deadlock. When calling TransactionManager#maybeAddPartition, the producer 
> network thread will wait on the TransactionManager lock, and a deadlock can 
> happen if TransactionManager also logs at INFO level. This is causing system 
> test failures in log4j_appender_test.py
> Similar to KAFKA-6415, a workaround is to set the log level to WARN for 
> TransactionManager in VerifiableLog4jAppender.
>  
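
A minimal sketch of the workaround described above, assuming log4j 1.x and the 
standard package of TransactionManager 
(org.apache.kafka.clients.producer.internals); an illustration only, not the 
project's actual test change:

{code:java}
import org.apache.log4j.Level;
import org.apache.log4j.Logger;

public class AppenderWorkaround {
    public static void main(String[] args) {
        // Raise the TransactionManager logger to WARN so its INFO-level logs
        // are never routed back into the KafkaLog4jAppender from the producer
        // network thread, avoiding the re-entrant deadlock.
        Logger.getLogger("org.apache.kafka.clients.producer.internals.TransactionManager")
              .setLevel(Level.WARN);
    }
}
{code}

Equivalently, a log4j.properties line such as 
`log4j.logger.org.apache.kafka.clients.producer.internals.TransactionManager=WARN` 
achieves the same effect.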



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] showuon commented on pull request #11691: KAFKA-13598: enable idempotence producer by default and validate the configs

2022-03-23 Thread GitBox


showuon commented on pull request #11691:
URL: https://github.com/apache/kafka/pull/11691#issuecomment-1076036468


   No, I'm not Yang Yu :)
   Glad we found the root cause!






[GitHub] [kafka] dajac commented on pull request #11936: MINOR: Fix an incompatible bug in GetOffsetShell

2022-03-23 Thread GitBox


dajac commented on pull request #11936:
URL: https://github.com/apache/kafka/pull/11936#issuecomment-1076082992


   @dengziming Is this related to the failure we saw here: 
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-11900/4/tests?
 I will review the PR shortly.






[jira] [Commented] (KAFKA-13689) AbstractConfig log print information is incorrect

2022-03-23 Thread RivenSun (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17511136#comment-17511136
 ] 

RivenSun commented on KAFKA-13689:
--

Hi [~guozhang]  I thought about it carefully; maybe we are overcomplicating 
this, or we have been looking at the logUnused() method only from Kafka's 
perspective.

{*}The logUnused() method is user-oriented{*}: it should tell the user, for all 
the configurations they passed in, {*}whether those configurations are known or 
unknown{*}, which ones are used and which are unused.
In short, the logUnused() method should report the difference between 
`originals` and `used`.

So my few points are:

1. Keep the existing logic of the unused() method and just modify the log 
output of logUnused(). As you mentioned, printing "isn't a known config" in the 
logUnused() method is both incorrect and weird.
{code:java}
/**
 * Log warnings for any unused configurations
 */
public void logUnused() {
    Set<String> unusedkeys = unused();
    if (!unusedkeys.isEmpty()) {
        log.warn("These configurations '{}' were supplied but are not used.", unusedkeys);
    }
} {code}

2. For a known config that is not enabled, Kafka does not need to actively 
`ignore` it.
As in the example in this JIRA, the `enable.idempotence` configuration is not 
printed by logUnused(), because it has indeed been retrieved by Kafka.
But the `transaction.timeout.ms` configuration is printed: the user passed this 
configuration in, yet Kafka had not retrieved it by the time logUnused() was 
called, so it should be printed out.

3. For an unknown config (by the way, configurations Kafka has removed and 
deprecated configurations are equivalent to unknown configs), if the user has 
not retrieved these configurations in their own custom plugin module by the 
time logUnused() is called, then logUnused() will print these unretrieved 
custom configurations.

WDYT?
Thanks.

 

> AbstractConfig log print information is incorrect
> -
>
> Key: KAFKA-13689
> URL: https://issues.apache.org/jira/browse/KAFKA-13689
> Project: Kafka
>  Issue Type: Bug
>  Components: config
>Affects Versions: 3.0.0
>Reporter: RivenSun
>Assignee: RivenSun
>Priority: Major
> Fix For: 3.2.0
>
>
> h1. 1.Example
> KafkaClient version is 3.1.0, KafkaProducer init properties:
>  
> {code:java}
> Properties props = new Properties();
> props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false);
> props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60003);{code}
>  
>  
> Partial log of KafkaProducer initialization:
> {code:java}
>     ssl.truststore.location = C:\Personal 
> File\documents\KafkaSSL\client.truststore.jks
>     ssl.truststore.password = [hidden]
>     ssl.truststore.type = JKS
>     transaction.timeout.ms = 60003
>     transactional.id = null
>     value.serializer = class 
> org.apache.kafka.common.serialization.StringSerializer
> [main] INFO 
> org.apache.kafka.common.security.authenticator.AbstractLogin - Successfully 
> logged in.
> [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The 
> configuration 'transaction.timeout.ms' was supplied but isn't a known config.
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.1.0
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 
> 37edeed0777bacb3
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 
> 1645602332999 {code}
> From the above log, you can see that KafkaProducer has applied the user's 
> configuration, {*}transaction.timeout.ms=60003{*}; the default value of this 
> configuration is 60000.
> But we can see another line of log:
> [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The 
> configuration *'transaction.timeout.ms'* was supplied but isn't a 
> *{color:#ff}known{color}* config.
>  
> h1. 2.RootCause:
> 1) ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG is set to {*}false{*}.
> So the configurations related to the KafkaProducer transaction will not be 
> requested.
> See the source code: KafkaProducer#configureTransactionState(...) .
> 2) AbstractConfig#logUnused() -> AbstractConfig#unused()
> {code:java}
> public Set<String> unused() {
>     Set<String> keys = new HashSet<>(originals.keySet());
>     keys.removeAll(used);
>     return keys;
> } {code}
> If a configuration has not been requested, the configuration will not be put 
> into the used variable. SourceCode see as below:
> AbstractConfig#get(String key)
>  
> {code:java}
> protected Object get(String key) {
>     if (!values.containsKey(key))
>         throw new ConfigException(String.format("Unknown configuration '%s'", key));
>     used.add(key);
>     return values.get(key);
> } {code}
> h1. Solution:
> 1. AbstractConfig

[GitHub] [kafka] dajac commented on a change in pull request #11920: KAFKA-13672 Race condition in DynamicBrokerConfig

2022-03-23 Thread GitBox


dajac commented on a change in pull request #11920:
URL: https://github.com/apache/kafka/pull/11920#discussion_r833010111



##
File path: core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
##
@@ -201,7 +202,9 @@ class DynamicBrokerConfig(private val kafkaConfig: 
KafkaConfig) extends Logging
   private[server] val staticDefaultConfigs = 
ConfigDef.convertToStringMapWithPasswordValues(KafkaConfig.defaultValues.asJava).asScala
   private val dynamicBrokerConfigs = mutable.Map[String, String]()
   private val dynamicDefaultConfigs = mutable.Map[String, String]()
-  private val reconfigurables = mutable.Buffer[Reconfigurable]()
+
+  // Use COWArrayList to prevent concurrent modification exception when 
reconfigurable added while iteration over list occurring
+  private val reconfigurables = new CopyOnWriteArrayList[Reconfigurable]()
   private val brokerReconfigurables = mutable.Buffer[BrokerReconfigurable]()

Review comment:
   Is it worth doing the same for this one in order to avoid a similar 
issue in the future?

##
File path: core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
##
@@ -535,8 +538,8 @@ class DynamicBrokerConfig(private val kafkaConfig: 
KafkaConfig) extends Logging
 if (changeMap.nonEmpty || deletedKeySet.nonEmpty) {
   try {
 val customConfigs = new util.HashMap[String, 
Object](newConfig.originalsFromThisConfig) // non-Kafka configs
-newConfig.valuesFromThisConfig.keySet.forEach(customConfigs.remove(_))
-reconfigurables.foreach {
+newConfig.valuesFromThisConfig.keySet.forEach(k => 
customConfigs.remove(k))
+reconfigurables.asScala.foreach {

Review comment:
   nit: Could we use `forEach` instead of `asScala.foreach` here? I am not 
sure if it works though.








[GitHub] [kafka] LiamClarkeNZ commented on pull request #11920: KAFKA-13672 Race condition in DynamicBrokerConfig

2022-03-23 Thread GitBox


LiamClarkeNZ commented on pull request #11920:
URL: https://github.com/apache/kafka/pull/11920#issuecomment-1076165670


   > @LiamClarkeNZ Thanks for the patch! I left a few comments below. Could you 
please better explain the race condition in the description?
   
   Definitely :) Will do so now. 






[jira] [Comment Edited] (KAFKA-13689) AbstractConfig log print information is incorrect

2022-03-23 Thread RivenSun (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17511136#comment-17511136
 ] 

RivenSun edited comment on KAFKA-13689 at 3/23/22, 10:04 AM:
-

Hi [~guozhang]  I thought about it carefully; maybe we are overcomplicating 
this, or we have been looking at the logUnused() method only from Kafka's 
perspective.

{*}The logUnused() method is user-oriented{*}: it should tell the user, for all 
the configurations they passed in, {*}whether those configurations are known or 
unknown{*}, which ones are used and which are unused.
In short, the logUnused() method should report the difference between 
`originals` and `used`.

So my few points are:

1. Keep the existing logic of the unused() method and just modify the log 
output of logUnused(). As you mentioned, printing "isn't a known config" in the 
logUnused() method is both incorrect and weird.
{code:java}
/**
 * Log warnings for any unused configurations
 */
public void logUnused() {
    Set<String> unusedkeys = unused();
    if (!unusedkeys.isEmpty()) {
        log.warn("These configurations '{}' were supplied but are not used.", unusedkeys);
    }
} {code}
2. For a known config that is not enabled, Kafka does not need to actively 
`ignore` it.
As in the example in this JIRA, the `enable.idempotence` configuration is not 
printed by logUnused(), because it has indeed been retrieved by Kafka.
But the `transaction.timeout.ms` configuration is printed: the user passed this 
configuration in, yet Kafka had not retrieved it by the time logUnused() was 
called, so it should be printed out.

3. For an unknown config, if the user has not retrieved these configurations in 
their own custom plugin module by the time logUnused() is called, then 
logUnused() will print these unretrieved custom configurations.
By the way, configurations that Kafka has removed, and deprecated configurations 
(when the recommended replacement is configured at the same time), can be viewed 
as similar to unknown configs. Maybe users will use these two types of 
configuration in their custom plugins.

WDYT?
Thanks.

 



[GitHub] [kafka] LiamClarkeNZ commented on a change in pull request #11920: KAFKA-13672 Race condition in DynamicBrokerConfig

2022-03-23 Thread GitBox


LiamClarkeNZ commented on a change in pull request #11920:
URL: https://github.com/apache/kafka/pull/11920#discussion_r833080985



##
File path: core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
##
@@ -201,7 +202,9 @@ class DynamicBrokerConfig(private val kafkaConfig: 
KafkaConfig) extends Logging
   private[server] val staticDefaultConfigs = 
ConfigDef.convertToStringMapWithPasswordValues(KafkaConfig.defaultValues.asJava).asScala
   private val dynamicBrokerConfigs = mutable.Map[String, String]()
   private val dynamicDefaultConfigs = mutable.Map[String, String]()
-  private val reconfigurables = mutable.Buffer[Reconfigurable]()
+
+  // Use COWArrayList to prevent concurrent modification exception when 
reconfigurable added while iteration over list occurring
+  private val reconfigurables = new CopyOnWriteArrayList[Reconfigurable]()
   private val brokerReconfigurables = mutable.Buffer[BrokerReconfigurable]()

Review comment:
   That's a good question. I'm happy to do so; I don't have much experience in 
this area of the code, and I just wanted to help eliminate some flaky tests, 
but I can't see any downsides to doing so. 
   
   I chose a `CopyOnWriteArrayList` because iteration over the reconfigurables 
is a lot more common than adding items, so the overhead is cheap. 
   
   I also looked at a `ConcurrentQueue`, as that also avoids the concurrent 
modification exception and is cheaper to add items to.
   
   I chose the COWArrayList because iteration over a ConcurrentQueue is 
non-deterministic - i.e., a freshly added Reconfigurable could appear in an 
iteration that's currently occurring, and I could envisage a bunch of subtle 
ways that could maybe cause issues.
   
   Whereas the CopyOnWriteArrayList offers deterministic iteration, which feels 
safer, though there's some additional cost when adding items to the 
brokerReconfigurables.
   
   If that cost is acceptable, and people are happy for me to defensively 
extend the change to the brokerReconfigurables also, I'd be pleased to do so :) 
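
A minimal Java sketch of the deterministic-iteration property discussed above 
(hypothetical names, not the broker code itself):

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class CowIterationDemo {
    public static void main(String[] args) throws InterruptedException {
        List<String> reconfigurables = new CopyOnWriteArrayList<>(List.of("a", "b"));
        Thread adder = new Thread(() -> reconfigurables.add("c"));

        // The for-each iterator snapshots the backing array when it is created,
        // so a concurrent add() neither throws ConcurrentModificationException
        // nor shows up mid-iteration: iteration is deterministic.
        for (String r : reconfigurables) {
            if (r.equals("a")) {
                adder.start();
                adder.join(); // "c" is in the list now, but not in this snapshot
            }
            System.out.println(r); // prints a, then b; never c
        }
        System.out.println(reconfigurables); // [a, b, c]
    }
}
```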
   
   








[GitHub] [kafka] LiamClarkeNZ commented on a change in pull request #11920: KAFKA-13672 Race condition in DynamicBrokerConfig

2022-03-23 Thread GitBox


LiamClarkeNZ commented on a change in pull request #11920:
URL: https://github.com/apache/kafka/pull/11920#discussion_r833089404



##
File path: core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
##
@@ -535,8 +538,8 @@ class DynamicBrokerConfig(private val kafkaConfig: 
KafkaConfig) extends Logging
 if (changeMap.nonEmpty || deletedKeySet.nonEmpty) {
   try {
 val customConfigs = new util.HashMap[String, 
Object](newConfig.originalsFromThisConfig) // non-Kafka configs
-newConfig.valuesFromThisConfig.keySet.forEach(customConfigs.remove(_))
-reconfigurables.foreach {
+newConfig.valuesFromThisConfig.keySet.forEach(k => 
customConfigs.remove(k))
+reconfigurables.asScala.foreach {

Review comment:
   @dajac So I tried it, and it works nicely with only a couple more brackets; 
well done, Scala, on handling Java functions so nicely. I'll push the change 
now; please let me know what you think :) 








[GitHub] [kafka] LiamClarkeNZ commented on a change in pull request #11920: KAFKA-13672 Race condition in DynamicBrokerConfig

2022-03-23 Thread GitBox


LiamClarkeNZ commented on a change in pull request #11920:
URL: https://github.com/apache/kafka/pull/11920#discussion_r833094619



##
File path: core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
##
@@ -201,7 +202,9 @@ class DynamicBrokerConfig(private val kafkaConfig: 
KafkaConfig) extends Logging
   private[server] val staticDefaultConfigs = 
ConfigDef.convertToStringMapWithPasswordValues(KafkaConfig.defaultValues.asJava).asScala
   private val dynamicBrokerConfigs = mutable.Map[String, String]()
   private val dynamicDefaultConfigs = mutable.Map[String, String]()
-  private val reconfigurables = mutable.Buffer[Reconfigurable]()
+
+  // Use COWArrayList to prevent concurrent modification exception when 
reconfigurable added while iteration over list occurring
+  private val reconfigurables = new CopyOnWriteArrayList[Reconfigurable]()
   private val brokerReconfigurables = mutable.Buffer[BrokerReconfigurable]()

Review comment:
   @dajac I've pushed changes in this regard also; all unit tests and 
integration tests seem happy locally. We'll see what the CI brings :) 








[GitHub] [kafka] dajac opened a new pull request #11937: MINOR: A few cleanups in BrokerToControllerChannelManager

2022-03-23 Thread GitBox


dajac opened a new pull request #11937:
URL: https://github.com/apache/kafka/pull/11937


   I made a few edits to make the code style more consistent while reading that 
file.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[GitHub] [kafka] dajac commented on a change in pull request #11920: KAFKA-13672 Race condition in DynamicBrokerConfig

2022-03-23 Thread GitBox


dajac commented on a change in pull request #11920:
URL: https://github.com/apache/kafka/pull/11920#discussion_r833118468



##
File path: core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
##
@@ -35,6 +35,7 @@ import org.apache.kafka.common.network.{ListenerName, 
ListenerReconfigurable}
 import org.apache.kafka.common.security.authenticator.LoginManager
 import org.apache.kafka.common.utils.{ConfigUtils, Utils}
 
+import java.util.concurrent.CopyOnWriteArrayList

Review comment:
   nit: Could we either move this one next to the other `java.util.*` 
imports or move the others down here?

##
File path: core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
##
@@ -535,24 +539,24 @@ class DynamicBrokerConfig(private val kafkaConfig: 
KafkaConfig) extends Logging
 if (changeMap.nonEmpty || deletedKeySet.nonEmpty) {
   try {
 val customConfigs = new util.HashMap[String, 
Object](newConfig.originalsFromThisConfig) // non-Kafka configs
-newConfig.valuesFromThisConfig.keySet.forEach(customConfigs.remove(_))
-reconfigurables.foreach {
+newConfig.valuesFromThisConfig.keySet.forEach(k => 
customConfigs.remove(k))
+reconfigurables.forEach( {

Review comment:
   nit: You can remove the parenthesis. `forEach { ...` should work.

##
File path: core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
##
@@ -535,24 +539,24 @@ class DynamicBrokerConfig(private val kafkaConfig: 
KafkaConfig) extends Logging
 if (changeMap.nonEmpty || deletedKeySet.nonEmpty) {
   try {
 val customConfigs = new util.HashMap[String, 
Object](newConfig.originalsFromThisConfig) // non-Kafka configs
-newConfig.valuesFromThisConfig.keySet.forEach(customConfigs.remove(_))
-reconfigurables.foreach {
+newConfig.valuesFromThisConfig.keySet.forEach(k => 
customConfigs.remove(k))
+reconfigurables.forEach( {
   case listenerReconfigurable: ListenerReconfigurable =>
 processListenerReconfigurable(listenerReconfigurable, newConfig, 
customConfigs, validateOnly, reloadOnly = false)
   case reconfigurable =>
 if (needsReconfiguration(reconfigurable.reconfigurableConfigs, 
changeMap.keySet, deletedKeySet))
   processReconfigurable(reconfigurable, changeMap.keySet, 
newConfig.valuesFromThisConfig, customConfigs, validateOnly)
-}
+})
 
 // BrokerReconfigurable updates are processed after config is updated. 
Only do the validation here.
 val brokerReconfigurablesToUpdate = 
mutable.Buffer[BrokerReconfigurable]()
-brokerReconfigurables.foreach { reconfigurable =>
+brokerReconfigurables.forEach({ reconfigurable =>

Review comment:
   nit: ditto.

##
File path: core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
##
@@ -201,8 +202,11 @@ class DynamicBrokerConfig(private val kafkaConfig: 
KafkaConfig) extends Logging
   private[server] val staticDefaultConfigs = 
ConfigDef.convertToStringMapWithPasswordValues(KafkaConfig.defaultValues.asJava).asScala
   private val dynamicBrokerConfigs = mutable.Map[String, String]()
   private val dynamicDefaultConfigs = mutable.Map[String, String]()
-  private val reconfigurables = mutable.Buffer[Reconfigurable]()
-  private val brokerReconfigurables = mutable.Buffer[BrokerReconfigurable]()
+
+  // Use COWArrayList to prevent concurrent modification exception when an 
item is added by one thread to these
+  // collections, while another thread is iterating over them

Review comment:
   nit: Could we add `.` at the end of the sentence?








[GitHub] [kafka] cadonna commented on a change in pull request #11760: KAFKA-13600: Kafka Streams - Fall back to most caught up client if no caught up clients exist

2022-03-23 Thread GitBox


cadonna commented on a change in pull request #11760:
URL: https://github.com/apache/kafka/pull/11760#discussion_r833117940



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
@@ -102,45 +108,27 @@ static int assignActiveTaskMovements(final Map> tasksToC
 final int movementsNeeded = taskMovements.size();
 
 for (final TaskMovement movement : taskMovements) {
-final UUID standbySourceClient = caughtUpClientsByTaskLoad.poll(
-movement.task,
-c -> clientStates.get(c).hasStandbyTask(movement.task)
-);
-if (standbySourceClient == null) {
-// there's not a caught-up standby available to take over the 
task, so we'll schedule a warmup instead
-final UUID sourceClient = requireNonNull(
-caughtUpClientsByTaskLoad.poll(movement.task),
-"Tried to move task to caught-up client but none exist"
-);
-
-moveActiveAndTryToWarmUp(
-remainingWarmupReplicas,
-movement.task,
-clientStates.get(sourceClient),
-clientStates.get(movement.destination),
-warmups.computeIfAbsent(movement.destination, x -> new 
TreeSet<>())
-);
-caughtUpClientsByTaskLoad.offerAll(asList(sourceClient, 
movement.destination));
-} else {
-// we found a candidate to trade standby/active state with our 
destination, so we don't need a warmup
-swapStandbyAndActive(
-movement.task,
-clientStates.get(standbySourceClient),
-clientStates.get(movement.destination)
-);
-caughtUpClientsByTaskLoad.offerAll(asList(standbySourceClient, 
movement.destination));
+// Attempt to find a caught up standby, otherwise find any caught 
up client, failing that use the most
+// caught up client.
+final boolean moved = 
tryToSwapStandbyAndActiveOnCaughtUpClient(clientStates, 
caughtUpClientsByTaskLoad, movement) ||
+
tryToMoveActiveToCaughtUpClientAndTryToWarmUp(clientStates, warmups, 
remainingWarmupReplicas, caughtUpClientsByTaskLoad, movement) ||
+tryToMoveActiveToMostCaughtUpClient(tasksToClientByLag, 
clientStates, warmups, remainingWarmupReplicas, caughtUpClientsByTaskLoad, 
movement);
+
+if (!moved) {
+throw new IllegalStateException("Tried to move task to more 
caught-up client but none exist");

Review comment:
   ```suggestion
   throw new IllegalStateException("Tried to move task to more 
caught-up client as scheduled before but none exist");
   ```
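
The refactored loop shown above leans on short-circuit `||` evaluation to 
express the fallback chain (caught-up standby, then any caught-up client, then 
the most caught-up client). A standalone sketch of that idiom, with 
hypothetical method names:

```java
public class FallbackChainDemo {
    // Each attempt returns true iff it succeeded in placing the task.
    static boolean tryCaughtUpStandby()    { return false; }
    static boolean tryAnyCaughtUpClient()  { return false; }
    static boolean tryMostCaughtUpClient() { return true;  }

    public static void main(String[] args) {
        // Short-circuit ||: each fallback runs only if the previous attempt failed.
        boolean moved = tryCaughtUpStandby()
            || tryAnyCaughtUpClient()
            || tryMostCaughtUpClient();
        if (!moved) {
            throw new IllegalStateException("Tried to move task but no candidate exists");
        }
        System.out.println("moved via the most caught-up client");
    }
}
```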

[GitHub] [kafka] LiamClarkeNZ commented on a change in pull request #11920: KAFKA-13672: Race condition in DynamicBrokerConfig

2022-03-23 Thread GitBox


LiamClarkeNZ commented on a change in pull request #11920:
URL: https://github.com/apache/kafka/pull/11920#discussion_r833158112



##
File path: core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
##
@@ -201,8 +202,11 @@ class DynamicBrokerConfig(private val kafkaConfig: 
KafkaConfig) extends Logging
   private[server] val staticDefaultConfigs = 
ConfigDef.convertToStringMapWithPasswordValues(KafkaConfig.defaultValues.asJava).asScala
   private val dynamicBrokerConfigs = mutable.Map[String, String]()
   private val dynamicDefaultConfigs = mutable.Map[String, String]()
-  private val reconfigurables = mutable.Buffer[Reconfigurable]()
-  private val brokerReconfigurables = mutable.Buffer[BrokerReconfigurable]()
+
+  // Use COWArrayList to prevent concurrent modification exception when an 
item is added by one thread to these
+  // collections, while another thread is iterating over them

Review comment:
   Done. 








[GitHub] [kafka] LiamClarkeNZ commented on a change in pull request #11920: KAFKA-13672: Race condition in DynamicBrokerConfig

2022-03-23 Thread GitBox


LiamClarkeNZ commented on a change in pull request #11920:
URL: https://github.com/apache/kafka/pull/11920#discussion_r833158352



##
File path: core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
##
@@ -35,6 +35,7 @@ import org.apache.kafka.common.network.{ListenerName, 
ListenerReconfigurable}
 import org.apache.kafka.common.security.authenticator.LoginManager
 import org.apache.kafka.common.utils.{ConfigUtils, Utils}
 
+import java.util.concurrent.CopyOnWriteArrayList

Review comment:
   I moved it up to the others. hope that looks okay.








[GitHub] [kafka] LiamClarkeNZ commented on a change in pull request #11920: KAFKA-13672: Race condition in DynamicBrokerConfig

2022-03-23 Thread GitBox


LiamClarkeNZ commented on a change in pull request #11920:
URL: https://github.com/apache/kafka/pull/11920#discussion_r833158538



##
File path: core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
##
@@ -535,24 +539,24 @@ class DynamicBrokerConfig(private val kafkaConfig: 
KafkaConfig) extends Logging
 if (changeMap.nonEmpty || deletedKeySet.nonEmpty) {
   try {
 val customConfigs = new util.HashMap[String, 
Object](newConfig.originalsFromThisConfig) // non-Kafka configs
-newConfig.valuesFromThisConfig.keySet.forEach(customConfigs.remove(_))
-reconfigurables.foreach {
+newConfig.valuesFromThisConfig.keySet.forEach(k => 
customConfigs.remove(k))
+reconfigurables.forEach( {

Review comment:
   Cheers :) 








[jira] [Updated] (KAFKA-12399) Deprecate Log4J Appender

2022-03-23 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12399?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna updated KAFKA-12399:
--
Fix Version/s: 3.3.0
   (was: 3.2.0)

> Deprecate Log4J Appender
> 
>
> Key: KAFKA-12399
> URL: https://issues.apache.org/jira/browse/KAFKA-12399
> Project: Kafka
>  Issue Type: Improvement
>  Components: logging
>Reporter: Dongjin Lee
>Assignee: Dongjin Lee
>Priority: Major
>  Labels: needs-kip
> Fix For: 3.3.0
>
>
> As a follow-up job to KAFKA-9366, we have to entirely remove the log4j 1.2.7 
> dependency from the classpath by removing dependencies on log4j-appender.





[jira] [Updated] (KAFKA-9366) Upgrade log4j to log4j2

2022-03-23 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna updated KAFKA-9366:
-
Fix Version/s: 3.3.0
   (was: 3.2.0)

> Upgrade log4j to log4j2
> ---
>
> Key: KAFKA-9366
> URL: https://issues.apache.org/jira/browse/KAFKA-9366
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.2.0, 2.1.1, 2.3.0, 2.4.0
>Reporter: leibo
>Assignee: Dongjin Lee
>Priority: Critical
>  Labels: needs-kip
> Fix For: 3.3.0
>
>
> h2. CVE-2019-17571 Detail
> Included in Log4j 1.2 is a SocketServer class that is vulnerable to 
> deserialization of untrusted data which can be exploited to remotely execute 
> arbitrary code when combined with a deserialization gadget when listening to 
> untrusted network traffic for log data. This affects Log4j versions up to 1.2 
> up to 1.2.17.
>  
> [https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2019-17571]
>  





[jira] [Created] (KAFKA-13762) Kafka brokers are not coming up

2022-03-23 Thread Kamesh (Jira)
Kamesh created KAFKA-13762:
--

 Summary: Kafka brokers are not coming up 
 Key: KAFKA-13762
 URL: https://issues.apache.org/jira/browse/KAFKA-13762
 Project: Kafka
  Issue Type: Bug
Reporter: Kamesh


We are getting the below error:

 

Exception in thread "main" java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
sun.instrument.InstrumentationImpl.loadClassAndStartAgent(InstrumentationImpl.java:386)
        at 
sun.instrument.InstrumentationImpl.loadClassAndCallPremain(InstrumentationImpl.java:401)
Caused by: java.net.BindException: Address already in use
        at sun.nio.ch.Net.bind0(Native Method)
        at sun.nio.ch.Net.bind(Net.java:433)
        at sun.nio.ch.Net.bind(Net.java:425)
        at 
sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:223)
        at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
        at sun.net.httpserver.ServerImpl.<init>(ServerImpl.java:100)
        at sun.net.httpserver.HttpServerImpl.<init>(HttpServerImpl.java:50)
        at 
sun.net.httpserver.DefaultHttpServerProvider.createHttpServer(DefaultHttpServerProvider.java:35)
        at com.sun.net.httpserver.HttpServer.create(HttpServer.java:130)
        at 
io.prometheus.jmx.shaded.io.prometheus.client.exporter.HTTPServer.<init>(HTTPServer.java:179)
        at 
io.prometheus.jmx.shaded.io.prometheus.jmx.JavaAgent.premain(JavaAgent.java:31)
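
The trace shows the Prometheus JMX agent's embedded HTTP server failing to bind 
its port, which is what happens when two processes on the same host are 
configured with the same agent port. A minimal sketch reproducing that failure 
mode (hypothetical port, not the reporter's actual setup):

{code:java}
import com.sun.net.httpserver.HttpServer;
import java.net.InetSocketAddress;

public class BindDemo {
    public static void main(String[] args) throws Exception {
        // The first bind succeeds; the second bind to the same port fails with
        // java.net.BindException: Address already in use, as in the trace above.
        HttpServer first  = HttpServer.create(new InetSocketAddress(9404), 0);
        HttpServer second = HttpServer.create(new InetSocketAddress(9404), 0); // throws
    }
}
{code}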





[jira] [Updated] (KAFKA-13762) Kafka brokers are not coming up

2022-03-23 Thread Kamesh (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13762?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamesh updated KAFKA-13762:
---
Description: 
Out of 9 brokers, only 3 are coming up. There are 3 VMs in total, and each VM 
hosts 3 brokers.

We are getting the below error:

Exception in thread "main" java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
sun.instrument.InstrumentationImpl.loadClassAndStartAgent(InstrumentationImpl.java:386)
        at 
sun.instrument.InstrumentationImpl.loadClassAndCallPremain(InstrumentationImpl.java:401)
Caused by: java.net.BindException: Address already in use
        at sun.nio.ch.Net.bind0(Native Method)
        at sun.nio.ch.Net.bind(Net.java:433)
        at sun.nio.ch.Net.bind(Net.java:425)
        at 
sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:223)
        at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
        at sun.net.httpserver.ServerImpl.<init>(ServerImpl.java:100)
        at sun.net.httpserver.HttpServerImpl.<init>(HttpServerImpl.java:50)
        at 
sun.net.httpserver.DefaultHttpServerProvider.createHttpServer(DefaultHttpServerProvider.java:35)
        at com.sun.net.httpserver.HttpServer.create(HttpServer.java:130)
        at 
io.prometheus.jmx.shaded.io.prometheus.client.exporter.HTTPServer.<init>(HTTPServer.java:179)
        at 
io.prometheus.jmx.shaded.io.prometheus.jmx.JavaAgent.premain(JavaAgent.java:31)

  was:
We are getting below error 

 

Exception in thread "main" java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
sun.instrument.InstrumentationImpl.loadClassAndStartAgent(InstrumentationImpl.java:386)
        at 
sun.instrument.InstrumentationImpl.loadClassAndCallPremain(InstrumentationImpl.java:401)
Caused by: java.net.BindException: Address already in use
        at sun.nio.ch.Net.bind0(Native Method)
        at sun.nio.ch.Net.bind(Net.java:433)
        at sun.nio.ch.Net.bind(Net.java:425)
        at 
sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:223)
        at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
        at sun.net.httpserver.ServerImpl.<init>(ServerImpl.java:100)
        at sun.net.httpserver.HttpServerImpl.<init>(HttpServerImpl.java:50)
        at 
sun.net.httpserver.DefaultHttpServerProvider.createHttpServer(DefaultHttpServerProvider.java:35)
        at com.sun.net.httpserver.HttpServer.create(HttpServer.java:130)
        at 
io.prometheus.jmx.shaded.io.prometheus.client.exporter.HTTPServer.<init>(HTTPServer.java:179)
        at 
io.prometheus.jmx.shaded.io.prometheus.jmx.JavaAgent.premain(JavaAgent.java:31)


> Kafka brokers are not coming up 
> 
>
> Key: KAFKA-13762
> URL: https://issues.apache.org/jira/browse/KAFKA-13762
> Project: Kafka
>  Issue Type: Bug
>Reporter: Kamesh
>Priority: Blocker
>
> Out of 9 brokers, only 3 are coming up. There are 3 VMs in total, each running 
> 3 brokers.
> We are getting the error below:
> Exception in thread "main" java.lang.reflect.InvocationTargetException
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at 
> sun.instrument.InstrumentationImpl.loadClassAndStartAgent(InstrumentationImpl.java:386)
>         at 
> sun.instrument.InstrumentationImpl.loadClassAndCallPremain(InstrumentationImpl.java:401)
> Caused by: java.net.BindException: Address already in use
>         at sun.nio.ch.Net.bind0(Native Method)
>         at sun.nio.ch.Net.bind(Net.java:433)
>         at sun.nio.ch.Net.bind(Net.java:425)
>         at 
> sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:223)
>         at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
>         at sun.net.httpserver.ServerImpl.<init>(ServerImpl.java:100)
>         at sun.net.httpserver.HttpServerImpl.<init>(HttpServerImpl.java:50)
>         at 
> sun.net.httpserver.DefaultHttpServerProvider.createHttpServer(DefaultHttpServerProvider.java:35)
>         at com.sun.net.httpserver.HttpServer.create(HttpServer.java:130)
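
The trace originates in the Prometheus JMX exporter's embedded HTTP server, not 
in Kafka itself, so a plausible cause with 3 brokers per VM is that every 
broker's -javaagent argument points at the same exporter port. A minimal sketch 
of that failure mode (port 7071 is an illustrative assumption, not taken from 
the report):

{code:java}
import com.sun.net.httpserver.HttpServer;
import java.net.InetSocketAddress;

public class BindConflictDemo {
    public static void main(String[] args) throws Exception {
        // The first bind succeeds, just as the first broker's agent does.
        HttpServer first = HttpServer.create(new InetSocketAddress(7071), 0);
        first.start();
        try {
            // A second bind on the same port fails with the exact
            // "java.net.BindException: Address already in use" seen above.
            HttpServer.create(new InetSocketAddress(7071), 0);
        } catch (java.net.BindException e) {
            System.out.println("Reproduced: " + e.getMessage());
        } finally {
            first.stop(0);
        }
    }
}
{code}

If that is the cause, giving each broker on a VM its own exporter port in its 
-javaagent argument should let all 9 brokers come up.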

[jira] [Commented] (KAFKA-8575) Investigate removing EAGER protocol & cleaning up task suspension in Streams rebalancing

2022-03-23 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17511220#comment-17511220
 ] 

Bruno Cadonna commented on KAFKA-8575:
--

[~ableegoldman] Is this still a blocker for AK 3.2? 

> Investigate removing EAGER protocol &  cleaning up task suspension in Streams 
> rebalancing
> -
>
> Key: KAFKA-8575
> URL: https://issues.apache.org/jira/browse/KAFKA-8575
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.4.0
>Reporter: A. Sophie Blee-Goldman
>Priority: Blocker
> Fix For: 3.2.0
>
>
> With KIP-429 the suspend/resume of tasks may have minimal gains while adding 
> a lot of complexity and potential bugs. We should consider removing/cleaning 
> it up and going a step further to remove the EAGER protocol from Streams 
> entirely.
> Plan to remove this in 3.1/4.0, whichever comes after 3.0. This will make 3.0 
> a bridge release for users upgrading from any version below 2.4, but they 
> will still be able to do so in the usual two rolling bounces.
>  
> *The upgrade path from 2.3 and below, to any \{to_version} higher than 3.1 
> will be:*
> 1. During the first rolling bounce, upgrade the jars to a version between 2.4 
> - 3.1 and add the UPGRADE_FROM config for whichever version you are upgrading 
> from
> 2. During the second rolling bounce, upgrade the jars to the desired 
> \{to_version} and remove the UPGRADE_FROM config
>  
> EAGER will be effectively deprecated in 3.0 but not removed until the next 
> version.
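
A sketch of the two-bounce path described above, using the StreamsConfig 
constants (the from-version 2.3 is illustrative):

{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class UpgradePathSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Bounce 1: run jars from a 2.4 - 3.1 release with UPGRADE_FROM set
        // to the version being upgraded from.
        props.put(StreamsConfig.UPGRADE_FROM_CONFIG, StreamsConfig.UPGRADE_FROM_23);
        // Bounce 2: swap in the target jars and remove the config again.
        props.remove(StreamsConfig.UPGRADE_FROM_CONFIG);
    }
}
{code}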



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13542) Utilize the new Consumer#enforceRebalance(reason) API in Streams

2022-03-23 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17511223#comment-17511223
 ] 

Bruno Cadonna commented on KAFKA-13542:
---

[~lihaosky] and [~ableegoldman] do you plan to fix this for the 3.2 release?

> Utilize the new Consumer#enforceRebalance(reason) API in Streams
> 
>
> Key: KAFKA-13542
> URL: https://issues.apache.org/jira/browse/KAFKA-13542
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Hao Li
>Priority: Blocker
> Fix For: 3.2.0
>
>
> KIP-800 is adding a new "reason" parameter to the Consumer#enforceRebalance 
> API, which will be passed in to a new field of the JoinGroup protocol. We 
> invoke this API throughout Streams for various reasons, which are very useful 
> for debugging the cause of rebalancing. Passing in the reason to this new API 
> would make it possible to figure out why a Streams client triggered a 
> rebalance from the broker logs, which are often the only logs available when 
> the client logs cannot be retrieved for whatever reason
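
For reference, a sketch of the KIP-800 overload being discussed (the reason 
string is illustrative, not one Streams actually emits):

{code:java}
import org.apache.kafka.clients.consumer.Consumer;

final class RebalanceTrigger {
    // The reason travels in the JoinGroup request, so it shows up in broker
    // logs even when the client logs cannot be retrieved.
    static void requestRebalance(Consumer<byte[], byte[]> consumer) {
        consumer.enforceRebalance("stream thread requested a followup rebalance");
    }
}
{code}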



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] cadonna commented on pull request #11920: KAFKA-13672: Race condition in DynamicBrokerConfig

2022-03-23 Thread GitBox


cadonna commented on pull request #11920:
URL: https://github.com/apache/kafka/pull/11920#issuecomment-1076319050


   @dajac @showuon: Once this PR is merged, could one of you cherry-pick it to 
the 3.2 branch? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13761) KafkaLog4jAppender deadlocks when idempotence is enabled

2022-03-23 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17511236#comment-17511236
 ] 

Ismael Juma commented on KAFKA-13761:
-

I think we probably want to disable idempotence for the log4j appender.

> KafkaLog4jAppender deadlocks when idempotence is enabled
> 
>
> Key: KAFKA-13761
> URL: https://issues.apache.org/jira/browse/KAFKA-13761
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 3.1.0, 3.0.0, 3.0.1
>Reporter: Yang Yu
>Priority: Major
>
> KafkaLog4jAppender instantiates a KafkaProducer to append log entries to a 
> Kafka topic. The producer.send operation may need to acquire locks during its 
> execution. This can result in deadlocks when a log entry from the producer 
> network thread is also at a log level that results in the entry being 
> appended to a Kafka topic (KAFKA-6415).
> [https://github.com/apache/kafka/pull/11691] enables idempotence by default, 
> and it introduced another place where the producer network thread can hit a 
> deadlock. When calling TransactionManager#maybeAddPartition, the producer 
> network thread will wait on the TransactionManager lock, and a deadlock can 
> happen if TransactionManager also logs at INFO level. This is causing system 
> test failures in log4j_appender_test.py.
> Similar to KAFKA-6415, a workaround will be setting log level to WARN for 
> TransactionManager in VerifiableLog4jAppender.
>  
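
The cycle described in this ticket can be modeled as two locks taken in 
opposite orders: log4j's per-appender synchronization on one side and the 
TransactionManager lock on the other. A simplified sketch (plain locks standing 
in for the real Kafka classes):

{code:java}
import java.util.concurrent.locks.ReentrantLock;

public class AppenderDeadlockSketch {
    static final ReentrantLock txnManagerLock = new ReentrantLock();
    static final Object appenderLock = new Object(); // log4j serializes appends

    // Application thread: append a log line -> producer.send -> needs the
    // TransactionManager lock (maybeAddPartition with idempotence enabled).
    static void applicationAppend() {
        synchronized (appenderLock) {
            txnManagerLock.lock();
            try { /* producer.send(record) */ } finally { txnManagerLock.unlock(); }
        }
    }

    // Producer network thread: already holds the TransactionManager lock and
    // logs at INFO, which routes back into the same synchronized appender.
    static void networkThreadLog() {
        txnManagerLock.lock();
        try {
            synchronized (appenderLock) { /* log.info(...) re-enters the appender */ }
        } finally {
            txnManagerLock.unlock();
        }
    }

    public static void main(String[] args) {
        new Thread(AppenderDeadlockSketch::applicationAppend).start();
        new Thread(AppenderDeadlockSketch::networkThreadLog).start();
        // With unlucky timing each thread holds one lock and waits on the
        // other, which is the deadlock reported here.
    }
}
{code}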



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13761) KafkaLog4jAppender deadlocks when idempotence is enabled

2022-03-23 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17511246#comment-17511246
 ] 

Luke Chen commented on KAFKA-13761:
---

[~ijuma], I agree. We should also throw an exception to let users know that the 
log4j appender doesn't support idempotent producers until this bug is fixed. 
After all, the log4j appender was deprecated in KIP-719 and should be removed in 
the future. WDYT?

> KafkaLog4jAppender deadlocks when idempotence is enabled
> 
>
> Key: KAFKA-13761
> URL: https://issues.apache.org/jira/browse/KAFKA-13761
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 3.1.0, 3.0.0, 3.0.1
>Reporter: Yang Yu
>Priority: Major
>
> KafkaLog4jAppender instantiates a KafkaProducer to append log entries to a 
> Kafka topic. The producer.send operation may need to acquire locks during its 
> execution. This can result in deadlocks when a log entry from the producer 
> network thread is also at a log level that results in the entry being 
> appended to a Kafka topic (KAFKA-6415).
> [https://github.com/apache/kafka/pull/11691] enables idempotence by default, 
> and it introduced another place where the producer network thread can hit a 
> deadlock. When calling TransactionManager#maybeAddPartition, the producer 
> network thread will wait on the TransactionManager lock, and a deadlock can 
> happen if TransactionManager also logs at INFO level. This is causing system 
> test failures in log4j_appender_test.py.
> Similar to KAFKA-6415, a workaround will be setting log level to WARN for 
> TransactionManager in VerifiableLog4jAppender.
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] showuon commented on pull request #11923: KAFKA-6718 / Documentation

2022-03-23 Thread GitBox


showuon commented on pull request #11923:
URL: https://github.com/apache/kafka/pull/11923#issuecomment-1076362860


   cc @cadonna, do you want to have another look?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming commented on pull request #11936: MINOR: Fix an incompatible bug in GetOffsetShell

2022-03-23 Thread GitBox


dengziming commented on pull request #11936:
URL: https://github.com/apache/kafka/pull/11936#issuecomment-1076366161


   > Is this related to the failure we saw here
   
   Sadly not, the [failure]( 
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-11900/4/tests)
 can no longer be reproduced either locally or in any PR, so we can't get any 
logs from it. 😕
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming commented on pull request #11937: MINOR: A few cleanups in BrokerToControllerChannelManager

2022-03-23 Thread GitBox


dengziming commented on pull request #11937:
URL: https://github.com/apache/kafka/pull/11937#issuecomment-1076372788


   Maybe we can use the file in checkstyle/checkstyle.xml to enforce Scala code 
style as well, but this is a big change and should be done with the approval of 
at least 3 PMC members. 🤔


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9366) Upgrade log4j to log4j2

2022-03-23 Thread Brandon Kimbrough (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17511262#comment-17511262
 ] 

Brandon Kimbrough commented on KAFKA-9366:
--

[~cadonna] did I really just see that you kicked this out of the 3.3.0 
release?! We've all been waiting months for you to patch the Log4j CVEs; how 
can you just push it out to some undetermined time in the future?! The world 
has needed this for at least 2 months already!

 

CC [~dongjin] 

> Upgrade log4j to log4j2
> ---
>
> Key: KAFKA-9366
> URL: https://issues.apache.org/jira/browse/KAFKA-9366
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.2.0, 2.1.1, 2.3.0, 2.4.0
>Reporter: leibo
>Assignee: Dongjin Lee
>Priority: Critical
>  Labels: needs-kip
> Fix For: 3.3.0
>
>
> h2. CVE-2019-17571 Detail
> Included in Log4j 1.2 is a SocketServer class that is vulnerable to 
> deserialization of untrusted data which can be exploited to remotely execute 
> arbitrary code when combined with a deserialization gadget when listening to 
> untrusted network traffic for log data. This affects Log4j versions up to 1.2 
> up to 1.2.17.
>  
> [https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2019-17571]
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] dongjinleekr commented on pull request #7898: KAFKA-9366: Change log4j dependency into log4j2

2022-03-23 Thread GitBox


dongjinleekr commented on pull request #7898:
URL: https://github.com/apache/kafka/pull/7898#issuecomment-1076403226


   Rebased onto the latest trunk. cc/ @edoardocomar


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13761) KafkaLog4jAppender deadlocks when idempotence is enabled

2022-03-23 Thread Dongjin Lee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17511283#comment-17511283
 ] 

Dongjin Lee commented on KAFKA-13761:
-

Hi [~yyu1993] [~showuon],

It seems like this issue is a counterpart of LOG4J2-3256, which disables 
logging from {{org.apache.kafka.common}} and {{org.apache.kafka.clients}} 
packages. How about adding similar logic to the log4j-appender?

> KafkaLog4jAppender deadlocks when idempotence is enabled
> 
>
> Key: KAFKA-13761
> URL: https://issues.apache.org/jira/browse/KAFKA-13761
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 3.1.0, 3.0.0, 3.0.1
>Reporter: Yang Yu
>Priority: Major
>
> KafkaLog4jAppender instantiates a KafkaProducer to append log entries to a 
> Kafka topic. The producer.send operation may need to acquire locks during its 
> execution. This can result in deadlocks when a log entry from the producer 
> network thread is also at a log level that results in the entry being 
> appended to a Kafka topic (KAFKA-6415).
> [https://github.com/apache/kafka/pull/11691] enables idempotence by default, 
> and it introduced another place where the producer network thread can hit a 
> deadlock. When calling TransactionManager#maybeAddPartition, the producer 
> network thread will wait on the TransactionManager lock, and a deadlock can 
> happen if TransactionManager also logs at INFO level. This is causing system 
> test failures in log4j_appender_test.py.
> Similar to KAFKA-6415, a workaround will be setting log level to WARN for 
> TransactionManager in VerifiableLog4jAppender.
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] vvcephei opened a new pull request #11938: MINOR: Clarify how to publish specific projects to the local repo

2022-03-23 Thread GitBox


vvcephei opened a new pull request #11938:
URL: https://github.com/apache/kafka/pull/11938


   The current README instruction for local publishing boils the ocean by 
building and installing every jar in the project with both Scala 2.12 and 2.13. 
While that is sometimes what people want to do, they are often just trying to 
build a specific jar.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13761) KafkaLog4jAppender deadlocks when idempotence is enabled

2022-03-23 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17511372#comment-17511372
 ] 

Ismael Juma commented on KAFKA-13761:
-

I suggest we do the simplest change first (disable idempotence) and then follow 
up with the more complicated change.

> KafkaLog4jAppender deadlocks when idempotence is enabled
> 
>
> Key: KAFKA-13761
> URL: https://issues.apache.org/jira/browse/KAFKA-13761
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 3.1.0, 3.0.0, 3.0.1
>Reporter: Yang Yu
>Priority: Major
>
> KafkaLog4jAppender instantiates a KafkaProducer to append log entries to a 
> Kafka topic. The producer.send operation may need to acquire locks during its 
> execution. This can result in deadlocks when a log entry from the producer 
> network thread is also at a log level that results in the entry being 
> appended to a Kafka topic (KAFKA-6415).
> [https://github.com/apache/kafka/pull/11691] enables idempotence by default, 
> and it introduced another place where the producer network thread can hit a 
> deadlock. When calling TransactionManager#maybeAddPartition, the producer 
> network thread will wait on the TransactionManager lock, and a deadlock can 
> happen if TransactionManager also logs at INFO level. This is causing system 
> test failures in log4j_appender_test.py.
> Similar to KAFKA-6415, a workaround will be setting log level to WARN for 
> TransactionManager in VerifiableLog4jAppender.
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] yyu1993 opened a new pull request #11939: KAFKA-13761: KafkaLog4jAppender deadlocks when idempotence is enabled

2022-03-23 Thread GitBox


yyu1993 opened a new pull request #11939:
URL: https://github.com/apache/kafka/pull/11939


   When a log entry is appended to a Kafka topic using KafkaLog4jAppender, the 
producer.send operation may hit a deadlock if the producer network thread also 
tries to append a log at the same log level. This issue is triggered when 
idempotence is enabled for the KafkaLog4jAppender and the producer tries to 
acquire the TransactionManager lock.
   
   This is a temporary workaround to avoid deadlocks by disabling idempotence 
explicitly in KafkaLog4jAppender.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kkonstantine commented on pull request #11933: KAFKA-13759: Disable idempotence by default in producers instantiated by Connect

2022-03-23 Thread GitBox


kkonstantine commented on pull request #11933:
URL: https://github.com/apache/kafka/pull/11933#issuecomment-1076612664


   Tests passed on the latest run with a couple of failures that don't seem 
relevant. Yet, because one failure was on `SourceConnectorsIntegrationTest`, I'm 
rerunning these tests. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei merged pull request #11938: MINOR: Clarify how to publish specific projects to the local repo

2022-03-23 Thread GitBox


vvcephei merged pull request #11938:
URL: https://github.com/apache/kafka/pull/11938


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13758) Exclusive locking in kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:227

2022-03-23 Thread Sree Vaddi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17511447#comment-17511447
 ] 

Sree Vaddi commented on KAFKA-13758:


Workaround:
- Restart the broker node ({*}tried and worked{*}).

- Clear the lock file for the offset/partition/consumer/consumer group in the 
ZooKeeper data folder.

> Exclusive locking in 
> kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:227
> -
>
> Key: KAFKA-13758
> URL: https://issues.apache.org/jira/browse/KAFKA-13758
> Project: Kafka
>  Issue Type: Bug
> Environment: Azure
>Reporter: Sree Vaddi
>Priority: Major
>
> {quote} [2022-03-22 15:15:03,127] *ERROR* [GroupMetadataManager brokerId=#] 
> Appending metadata message for group ... failed due to unexpected *error:* 
> _org.apache.kafka.common.errors.UnknownServerException_ 
> (kafka.coordinator.group.GroupMetadataManager){quote}
> The issue is probably here: CoreUtils.scala inLock()
>   /**
>    * Execute the given function inside the lock
>    */
>   def inLock[T](lock: Lock)(fun: => T): T = {
>     lock.lock()
>     try {
>       fun
>     } finally {
>       lock.unlock()
>     }
>   }



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] rhauch commented on a change in pull request #11933: KAFKA-13759: Disable idempotence by default in producers instantiated by Connect

2022-03-23 Thread GitBox


rhauch commented on a change in pull request #11933:
URL: https://github.com/apache/kafka/pull/11933#discussion_r833637529



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##
@@ -648,6 +648,12 @@ private WorkerTask buildWorkerTask(ClusterConfigState 
configState,
 // These settings will execute infinite retries on retriable 
exceptions. They *may* be overridden via configs passed to the worker,
 // but this may compromise the delivery guarantees of Kafka Connect.
 producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 
Long.toString(Long.MAX_VALUE));
+// By default, producers that are instantiated and used by Connect 
have idempotence disabled even after idempotence became
+// default for Kafka producers. This ensures Connect continues to work 
with many Kafka broker versions, including older brokers that do not support

Review comment:
   Minor nit, but I think it's worth making it super clear that _Connect_ 
is disabling the idempotent behavior of all its producers, versus somehow the 
_producers_ doing something to disable idempotent behavior.
   ```suggestion
   // By default, Connect disables idempotent behavior for all 
producers, even though idempotence became
   // default for Kafka producers. This is to ensure Connect continues 
to work with many Kafka broker versions, including older brokers that do not 
support
   ```

##
File path: docs/upgrade.html
##
@@ -25,6 +25,10 @@ Notable changes in 3
 which meant that idempotence remained disabled unless the user had 
explicitly set enable.idempotence to true
(See <a href="https://issues.apache.org/jira/browse/KAFKA-13598">KAFKA-13598</a> for 
more details).
 This issue was fixed and the default is properly applied in 3.0.1, 
3.1.1, and 3.2.0.
+A notable exception is Connect that by default currently disables 
idempotence for its
+producers in order to support connecting to older versions of the 
Kafka broker without
+explicit changes. Enabling idempotence is still possible via 
configuration and this
+default might change in a major version upgrade in the future.

Review comment:
   I also think it's worth clarifying the same thing here. Maybe:
   ```suggestion
   A notable exception is Connect that by default disables 
idempotent behavior for all of its
   producers in order to uniformly support using a wide range of 
Kafka broker versions. 
   Users can change this behavior to enable idempotence for some or 
all producers
   via Connect worker and/or connector configuration. Connect may 
enable idempotent producers
   by default in a future major release.
   ```
   WDYT?
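
   As a usage note for the paragraph above: users who want idempotence back can 
opt in through worker or per-connector overrides. A sketch, assuming the 
standard `producer.` worker prefix and the KIP-458 `producer.override.` 
connector prefix:
   ```
   # Worker config: applies to every producer the worker creates
   producer.enable.idempotence=true

   # Or per connector (the worker must allow overrides, e.g.
   # connector.client.config.override.policy=All):
   producer.override.enable.idempotence=true
   ```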




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13761) KafkaLog4jAppender deadlocks when idempotence is enabled

2022-03-23 Thread Yang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17511452#comment-17511452
 ] 

Yang Yu commented on KAFKA-13761:
-

Hi [~ijuma], I have a PR for disabling idempotence, could you review it? 
https://github.com/apache/kafka/pull/11939

> KafkaLog4jAppender deadlocks when idempotence is enabled
> 
>
> Key: KAFKA-13761
> URL: https://issues.apache.org/jira/browse/KAFKA-13761
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 3.1.0, 3.0.0, 3.0.1
>Reporter: Yang Yu
>Priority: Major
>
> KafkaLog4jAppender instantiates a KafkaProducer to append log entries to a 
> Kafka topic. The producer.send operation may need to acquire locks during its 
> execution. This can result in deadlocks when a log entry from the producer 
> network thread is also at a log level that results in the entry being 
> appended to a Kafka topic (KAFKA-6415).
> [https://github.com/apache/kafka/pull/11691] enables idempotence by default, 
> and it introduced another place where the producer network thread can hit a 
> deadlock. When calling TransactionManager#maybeAddPartition, the producer 
> network thread will wait on the TransactionManager lock, and a deadlock can 
> happen if TransactionManager also logs at INFO level. This is causing system 
> test failures in log4j_appender_test.py.
> Similar to KAFKA-6415, a workaround will be setting log level to WARN for 
> TransactionManager in VerifiableLog4jAppender.
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] ijuma commented on a change in pull request #11939: KAFKA-13761: KafkaLog4jAppender deadlocks when idempotence is enabled

2022-03-23 Thread GitBox


ijuma commented on a change in pull request #11939:
URL: https://github.com/apache/kafka/pull/11939#discussion_r833658737



##
File path: 
log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java
##
@@ -290,6 +291,7 @@ public void activateOptions() {
 props.put(DELIVERY_TIMEOUT_MS_CONFIG, deliveryTimeoutMs);
 props.put(LINGER_MS_CONFIG, lingerMs);
 props.put(BATCH_SIZE_CONFIG, batchSize);
+props.put(ENABLE_IDEMPOTENCE_CONFIG, false);

Review comment:
   @yyu1993 Can you please add a comment explaining this?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] yyu1993 commented on a change in pull request #11939: KAFKA-13761: KafkaLog4jAppender deadlocks when idempotence is enabled

2022-03-23 Thread GitBox


yyu1993 commented on a change in pull request #11939:
URL: https://github.com/apache/kafka/pull/11939#discussion_r833676286



##
File path: 
log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java
##
@@ -290,6 +291,7 @@ public void activateOptions() {
 props.put(DELIVERY_TIMEOUT_MS_CONFIG, deliveryTimeoutMs);
 props.put(LINGER_MS_CONFIG, lingerMs);
 props.put(BATCH_SIZE_CONFIG, batchSize);
+props.put(ENABLE_IDEMPOTENCE_CONFIG, false);

Review comment:
   I have added the comment.
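
   For readers following along, the added comment might read roughly like this 
(a sketch of this PR's workaround, not the exact merged text):
   ```java
   props.put(BATCH_SIZE_CONFIG, batchSize);
   // Disable idempotence explicitly: with idempotence on (the 3.x default),
   // the producer network thread can log at INFO while holding the
   // TransactionManager lock, and that log line re-enters this appender and
   // deadlocks against producer.send (KAFKA-13761; see also KAFKA-6415).
   props.put(ENABLE_IDEMPOTENCE_CONFIG, false);
   ```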




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante commented on pull request #10367: KAFKA-12495: allow consecutive revoke in incremental cooperative assignor in connector

2022-03-23 Thread GitBox


C0urante commented on pull request #10367:
URL: https://github.com/apache/kafka/pull/10367#issuecomment-107650


   Thanks @showuon. In that case, I can file separate issues for a lot of the 
comments I've made here, and we can try to keep this PR as focused as possible 
for the sake of moving forward.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kkonstantine commented on pull request #11933: KAFKA-13759: Disable idempotence by default in producers instantiated by Connect

2022-03-23 Thread GitBox


kkonstantine commented on pull request #11933:
URL: https://github.com/apache/kafka/pull/11933#issuecomment-1076781304


   Thanks @rhauch. I've incorporated your suggestions in all places. The PR is 
ready for another pass.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante commented on pull request #11869: KAFKA-13719: fix connector restart cause duplicate tasks

2022-03-23 Thread GitBox


C0urante commented on pull request #11869:
URL: https://github.com/apache/kafka/pull/11869#issuecomment-1076799542


   @showuon @mimaison would either of you have time for this? Might be worth 
trying to include in the 3.2.0 release if we're still considering 
non-regressions.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13404) Kafka sink connectors do not commit offset correctly if messages are produced in transaction

2022-03-23 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13404?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17511487#comment-17511487
 ] 

Chris Egerton commented on KAFKA-13404:
---

Thanks [~yujhe.li]. Agreed that an ideal fix for this would prevent control 
records from distorting committed offsets.

One complication is that the connector API allows sink connectors to explicitly 
specify which offsets get committed for each topic partition via the 
{{SinkTask}} class's 
[preCommit|https://kafka.apache.org/31/javadoc/org/apache/kafka/connect/sink/SinkTask.html#preCommit(java.util.Map)]
 method. These offsets are usually derived by querying the 
[kafkaOffset|https://kafka.apache.org/31/javadoc/org/apache/kafka/connect/sink/SinkRecord.html#kafkaOffset()]
 method for each {{SinkRecord}} that the task receives in 
[put|https://kafka.apache.org/31/javadoc/org/apache/kafka/connect/sink/SinkTask.html#put(java.util.Collection)].
 This creates an issue because the contract for the {{kafkaOffset}} method is 
that it returns the offset for the consumer record that the {{SinkRecord}} was 
derived from–not the offset that should be committed to Kafka in order to 
signal that that record has been successfully processed by the connector and 
should not be redelivered to it in the future. Examples of this can be found in 
the Confluent HDFS connector 
[here|https://github.com/confluentinc/kafka-connect-hdfs/pull/425] and the 
WePay/Confluent BigQuery connector 
[here|https://github.com/confluentinc/kafka-connect-bigquery/blob/e7c19571ff94f3aa290a02d490b0049af2c51e0c/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/MergeBatches.java#L331-L334].

Unfortunately, we can't really modify the {{kafkaOffset}} method without 
running into some pretty ugly compatibility issues.

One thing that might be worth noting is KAFKA-13431, a work-in-progress effort 
to address a separate issue in sink task offset tracking logic.

Given that issue, here are two possible approaches we could take, which are not 
mutually exclusive:
 # Adjust the behavior of the Kafka Connect framework to fix this issue but 
only for sink connectors that do not manually manage their own offsets 
(probably using a similar strategy to what Kafka Streams does, but I haven't 
taken a long look at that yet)
 # Account for this bug while working on KAFKA-13431. One possible method could 
be to add an {{acknowledge}} or {{commit}} method to the {{SinkTask}} class, so 
that tasks can notify Kafka Connect that a record has been processed 
successfully without having to explicitly manage offsets. This would be a 
pretty major change to the connector API, but given that it would allow us to 
address this and another long-standing bug, and could also potentially be used 
to satisfy the use case for 
[KIP-767|https://cwiki.apache.org/confluence/display/KAFKA/KIP-767%3A+Connect+Latency+Metrics],
 there might be enough here to warrant it.
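
To make the {{preCommit}}/{{kafkaOffset}} interplay concrete, here is a minimal 
sketch of a task that commits "record offset + 1" in the usual way; under 
read_committed, the consumer's position can sit past that offset on a 
transaction control record, which is exactly the gap described above (the 
helper name is illustrative):

{code:java}
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

public abstract class OffsetTrackingSinkTask extends SinkTask {
    private final Map<TopicPartition, OffsetAndMetadata> processed = new HashMap<>();

    // Called from put() once a record is durably handled.
    protected void markProcessed(SinkRecord record) {
        processed.put(new TopicPartition(record.topic(), record.kafkaPartition()),
                      new OffsetAndMetadata(record.kafkaOffset() + 1));
    }

    @Override
    public Map<TopicPartition, OffsetAndMetadata> preCommit(
            Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
        // Commits derived this way lag the log-end offset whenever the last
        // entries in the partition are control records, since no SinkRecord
        // is ever delivered for them.
        return new HashMap<>(processed);
    }
}
{code}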

> Kafka sink connectors do not commit offset correctly if messages are produced 
> in transaction
> 
>
> Key: KAFKA-13404
> URL: https://issues.apache.org/jira/browse/KAFKA-13404
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.6.1
>Reporter: Yu-Jhe Li
>Priority: Major
> Attachments: Main.scala
>
>
> The Kafka sink connectors don't commit offset to the latest log-end offset if 
> the messages are produced in a transaction.
> From the code of 
> [WorkerSinkTask.java|https://github.com/apache/kafka/blob/2.6.1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L477]],
>  we found that the sink connector gets offset from messages and commits it to 
> Kafka after the messages are processed successfully. But for messages 
> produced in the transaction, there are additional record [control 
> batches|http://kafka.apache.org/documentation/#controlbatch] that are used to 
> indicate the transaction is successful or aborted.
>  
> You can reproduce it by running `connect-file-sink` with the following 
> properties:
> {noformat}
> /opt/kafka/bin/connect-standalone.sh /connect-standalone.properties 
> /connect-file-sink.properties{noformat}
> {code:java}
> # connect-standalone.properties
> bootstrap.servers=localhost:9092
> key.converter=org.apache.kafka.connect.storage.StringConverter
> value.converter=org.apache.kafka.connect.storage.StringConverter
> key.converter.schemas.enable=true
> value.converter.schemas.enable=true
> # for testing
> offset.flush.interval.ms=1
> consumer.isolation.level=read_committed
> consumer.auto.offset.reset=none
> {code}
> {code:java}
> # connect-file-sink.properties
> name=local-file-sink
> connector.class=FileStreamSink
> tasks.max=1
> file=/tmp/test.sink.txt
> topi

[GitHub] [kafka] vvcephei commented on a change in pull request #11926: KAFKA-13714: Fix cache flush position

2022-03-23 Thread GitBox


vvcephei commented on a change in pull request #11926:
URL: https://github.com/apache/kafka/pull/11926#discussion_r833701992



##
File path: build.gradle
##
@@ -207,7 +207,7 @@ if (file('.git').exists()) {
 } else {
   rat.enabled = false
 }
-println("Starting build with version $version (commit id ${commitId.take(8)}) 
using Gradle $gradleVersion, Java ${JavaVersion.current()} and Scala 
${versions.scala}")
+println("Starting build with version $version (commit id ${commitId == null ? 
"null" : commitId.take(8)}) using Gradle $gradleVersion, Java 
${JavaVersion.current()} and Scala ${versions.scala}")

Review comment:
   For some reason, I was getting an NPE here while running the tests 
locally (once). I could revert this change, but it also seems harmless to keep 
it.

##
File path: build.gradle
##
@@ -468,6 +474,12 @@ subprojects {
   maxRetries = userMaxTestRetries
   maxFailures = userMaxTestRetryFailures
 }
+
+// Allows devs to run tests in a loop to debug flaky tests
+// Eg: I=0; while ./gradlew :streams:test -Prerun-tests --tests 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest --fail-fast; do 
(( I=$I+1 )); echo
+if (project.hasProperty("rerun-tests")) {
+  outputs.upToDateWhen { false }
+}

Review comment:
   Same thing for integration tests.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
##
@@ -226,10 +226,9 @@ private void putAndMaybeForward(final 
ThreadCache.DirtyEntry entry,
 if (rawNewValue != null || rawOldValue != null) {
 // we need to get the old values if needed, and then put to 
store, and then flush
 final ProcessorRecordContext current = context.recordContext();
-context.setRecordContext(entry.entry().context());
-wrapped().put(entry.key(), entry.newValue());
-
 try {
+context.setRecordContext(entry.entry().context());
+wrapped().put(entry.key(), entry.newValue());

Review comment:
   we should be doing this inside the try block so that we will for sure 
set the context back no matter what happens.

##
File path: build.gradle
##
@@ -435,6 +435,12 @@ subprojects {
   maxRetries = userMaxTestRetries
   maxFailures = userMaxTestRetryFailures
 }
+
+// Allows devs to run tests in a loop to debug flaky tests
+// Eg: I=0; while ./gradlew :streams:test -Prerun-tests --tests 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest --fail-fast; do 
(( I=$I+1 )); echo
+if (project.hasProperty("rerun-tests")) {
+  outputs.upToDateWhen { false }
+}

Review comment:
   This is how I was able to repro the issue consistently, even though it 
only shows up roughly 1/500th of the time. I'll update the readme also to 
mention this approach instead of cleanTest, which causes us to recompile the 
tests on every iteration.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##
@@ -142,7 +142,9 @@ public static void updatePosition(
 
 if (stateStoreContext != null && 
stateStoreContext.recordMetadata().isPresent()) {
 final RecordMetadata meta = 
stateStoreContext.recordMetadata().get();
-position.withComponent(meta.topic(), meta.partition(), 
meta.offset());
+if (meta.topic() != null) {

Review comment:
   Fixes some NPEs from the unit tests. I figure it's fine to have this 
here, since there's technically no contract that says topic can't be null and 
we're already guarding against a null context and missing recordMetadata.

##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
##
@@ -760,45 +760,50 @@ public static void after() {
 
 @Test
 public void verifyStore() {
-if (storeToTest.global()) {
-// See KAFKA-13523
-globalShouldRejectAllQueries();
-} else {
-shouldRejectUnknownQuery();
-shouldCollectExecutionInfo();
-shouldCollectExecutionInfoUnderFailure();
-
-if (storeToTest.keyValue()) {
-if (storeToTest.timestamped()) {
-final Function, Integer> 
valueExtractor =
-ValueAndTimestamp::value;
-shouldHandleKeyQuery(2, valueExtractor, 2);
-shouldHandleRangeQueries(valueExtractor);
-} else {
-final Function valueExtractor = 
Function.identity();
-shouldHandleKeyQuery(2, valueExtractor, 2);
-shouldHandleRangeQueries(valueExtractor);
+try {
+if (storeToTest.global()) {
+// See KAFKA-13523
+globalShouldRejectAllQueries();
+} else {
+sho

[jira] [Comment Edited] (KAFKA-13404) Kafka sink connectors do not commit offset correctly if messages are produced in transaction

2022-03-23 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13404?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17511487#comment-17511487
 ] 

Chris Egerton edited comment on KAFKA-13404 at 3/23/22, 8:59 PM:
-

Thanks [~yujhe.li]. Agreed that an ideal fix for this would prevent control 
records from distorting committed offsets.

One complication is that the connector API allows sink connectors to explicitly 
specify which offsets get committed for each topic partition via the 
{{SinkTask}} class's 
[preCommit|https://kafka.apache.org/31/javadoc/org/apache/kafka/connect/sink/SinkTask.html#preCommit(java.util.Map)]
 method. These offsets are usually derived by querying the 
[kafkaOffset|https://kafka.apache.org/31/javadoc/org/apache/kafka/connect/sink/SinkRecord.html#kafkaOffset()]
 method for each {{SinkRecord}} that the task receives in 
[put|https://kafka.apache.org/31/javadoc/org/apache/kafka/connect/sink/SinkTask.html#put(java.util.Collection)].
 This creates an issue because the contract for the {{kafkaOffset}} method is 
that it returns the offset for the consumer record that the {{SinkRecord}} was 
derived from–not the offset that should be committed to Kafka in order to 
signal that that record has been successfully processed by the connector and 
should not be redelivered to it in the future. Examples of this can be found in 
the Confluent HDFS connector 
[here|https://github.com/confluentinc/kafka-connect-hdfs/pull/425] and the 
WePay/Confluent BigQuery connector 
[here|https://github.com/confluentinc/kafka-connect-bigquery/blob/e7c19571ff94f3aa290a02d490b0049af2c51e0c/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/MergeBatches.java#L331-L334].

Unfortunately, we can't really modify the {{kafkaOffset}} method without 
running into some pretty ugly compatibility issues.

One thing that might be worth noting is KAFKA-13431, a work-in-progress effort 
to address a separate issue in sink task offset tracking logic.

Given that issue, here are two possible approaches we could take, which are not 
mutually exclusive:
 # Adjust the behavior of the Kafka Connect framework to fix this issue but 
only for sink connectors that do not manually manage their own offsets 
(probably using a similar strategy to what Kafka Streams does, but I haven't 
taken a long look at that yet)
 # Account for this bug while working on KAFKA-13431. One possible method could 
be to add an {{acknowledge}} or {{commit}} method to the {{SinkRecord}} or 
{{SinkTaskContext}} class, so that tasks can notify Kafka Connect that a record 
has been processed successfully without having to explicitly manage offsets. 
This would be a pretty major change to the connector API, but given that it 
would allow us to address this and another long-standing bug, and could also 
potentially be used to satisfy the use case for 
[KIP-767|https://cwiki.apache.org/confluence/display/KAFKA/KIP-767%3A+Connect+Latency+Metrics],
 there might be enough here to warrant it.


was (Author: chrisegerton):
Thanks [~yujhe.li]. Agreed that an ideal fix for this would prevent control 
records from distorting committed offsets.

One complication is that the connector API allows sink connectors to explicitly 
specify which offsets get committed for each topic partition via the 
{{SinkTask}} class's 
[preCommit|https://kafka.apache.org/31/javadoc/org/apache/kafka/connect/sink/SinkTask.html#preCommit(java.util.Map)]
 method. These offsets are usually derived by querying the 
[kafkaOffset|https://kafka.apache.org/31/javadoc/org/apache/kafka/connect/sink/SinkRecord.html#kafkaOffset()]
 method for each {{SinkRecord}} that the task receives in 
[put|https://kafka.apache.org/31/javadoc/org/apache/kafka/connect/sink/SinkTask.html#put(java.util.Collection)].
 This creates an issue because the contract for the {{kafkaOffset}} method is 
that it returns the offset for the consumer record that the {{SinkRecord}} was 
derived from–not the offset that should be committed to Kafka in order to 
signal that that record has been successfully processed by the connector and 
should not be redelivered to it in the future. Examples of this can be found in 
the Confluent HDFS connector 
[here|https://github.com/confluentinc/kafka-connect-hdfs/pull/425] and the 
WePay/Confluent BigQuery connector 
[here|https://github.com/confluentinc/kafka-connect-bigquery/blob/e7c19571ff94f3aa290a02d490b0049af2c51e0c/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/MergeBatches.java#L331-L334].

Unfortunately, we can't really modify the {{kafkaOffset}} method without 
running into some pretty ugly compatibility issues.

One thing that might be worth noting is KAFKA-13431, a work-in-progress effort 
to address a separate issue in sink task offset tracking logic.

Given that issue, here are two possible approaches we could take, which are not 
mutually exclusiv

[GitHub] [kafka] C0urante commented on a change in pull request #10367: KAFKA-12495: allow consecutive revoke in incremental cooperative assignor in connector

2022-03-23 Thread GitBox


C0urante commented on a change in pull request #10367:
URL: https://github.com/apache/kafka/pull/10367#discussion_r833730504



##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java
##
@@ -180,6 +208,155 @@ public void testTaskAssignmentWhenWorkerJoins() {
 verify(coordinator, times(rebalanceNum)).lastCompletedGenerationId();
 }
 
+@Test
+public void testTaskAssignmentWhenWorkerJoinAfterRevocation() {
+when(coordinator.configSnapshot()).thenReturn(configState);
+
doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
+
+// First assignment with 1 worker and 2 connectors configured but not 
yet assigned
+//
+// note: the assigned/revoked Connectors/tasks might be different, but 
the amount should be the same
+// assignment after this phase:
+// W1: assignedConnectors:[C0, C1], assignedTasks:[T0-0, T0-1, T0-2, 
T0-3, T1-0, T1-1, T1-2, T1-3],
+// revokedConnectors:[] revokedTasks:[]
+//
+// Final distribution after this phase:
+// W1: connectors:[C0, C1], tasks:[T0-0, T0-1, T0-2, T0-3, T1-0, T1-1, 
T1-2, T1-3]
+expectGeneration();
+assignor.performTaskAssignment(leader, offset, memberConfigs, 
coordinator, protocolVersion);
+++rebalanceNum;
+returnedAssignments = assignmentsCapture.getValue();
+assertDelay(0, returnedAssignments);
+expectedMemberConfigs = memberConfigs(leader, offset, 
returnedAssignments);
+assertNoReassignments(memberConfigs, expectedMemberConfigs);
+assertAssignment(2, 8, 0, 0, "worker1");
+
+// Second assignment with a second worker joining and all connectors 
running on previous worker
+//
+// assignment after this phase:
+// W1: assignedConnectors:[], assignedTasks:[],
+// revokedConnectors:[C1], revokedTasks:[T1-0, T1-1, T1-2, T1-3]
+// W2: assignedConnectors:[], assignedTasks:[]
+// revokedConnectors:[] revokedTasks:[]
+//
+// Final distribution after this phase:
+// W1: connectors:[C0], tasks:[T0-0, T0-1, T0-2, T0-3]
+// W2: connectors:[], tasks:[]
+applyAssignments(returnedAssignments);
+memberConfigs = memberConfigs(leader, offset, assignments);
+memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, 
offset, null));
+expectGeneration();
+assignor.performTaskAssignment(leader, offset, memberConfigs, 
coordinator, protocolVersion);
+++rebalanceNum;
+returnedAssignments = assignmentsCapture.getValue();
+assertDelay(0, returnedAssignments);
+expectedMemberConfigs = memberConfigs(leader, offset, 
returnedAssignments);
+assertNoReassignments(memberConfigs, expectedMemberConfigs);
+assertAssignment(0, 0, 1, 4, "worker1", "worker2");
+
+// Third assignment after revocations, and a third worker joining
+//
+// assignment after this phase:
+// W1: assignedTasks:[], assignedTasks:[],
+// revokedConnectors:[], revokedTasks:[T0-3]
+// W2: assignedTasks:[C1], assignedTasks:[T1-0, T1-1]
+// revokedConnectors:[] revokedTasks:[]
+// W3: assignedTasks:[], assignedTasks:[T1-2, T1-3]
+// revokedConnectors:[] revokedTasks:[]
+//
+// Final distribution after this phase:
+// W1: connectors:[C0], tasks:[T0-0, T0-1, T0-2]
+// W2: connectors:[C1], tasks:[T1-0, T1-1]
+// W3: connectors:[], tasks:[T1-2, T1-3]
+applyAssignments(returnedAssignments);
+memberConfigs = memberConfigs(leader, offset, assignments);
+memberConfigs.put("worker3", new ExtendedWorkerState(leaderUrl, 
offset, null));
+expectGeneration();
+assignor.performTaskAssignment(leader, offset, memberConfigs, 
coordinator, protocolVersion);
+++rebalanceNum;
+returnedAssignments = assignmentsCapture.getValue();
+assertDelay(0, returnedAssignments);
+expectedMemberConfigs = memberConfigs(leader, offset, 
returnedAssignments);
+assertNoReassignments(memberConfigs, expectedMemberConfigs);
+assertAssignment(1, 4, 0, 1, "worker1", "worker2", "worker3");

Review comment:
   Covered by 
[KAFKA-13763](https://issues.apache.org/jira/browse/KAFKA-13763).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante commented on a change in pull request #10367: KAFKA-12495: allow consecutive revoke in incremental cooperative assignor in connector

2022-03-23 Thread GitBox


C0urante commented on a change in pull request #10367:
URL: https://github.com/apache/kafka/pull/10367#discussion_r833730613



##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java
##
@@ -180,6 +208,155 @@ public void testTaskAssignmentWhenWorkerJoins() {
 verify(coordinator, times(rebalanceNum)).lastCompletedGenerationId();
 }
 
+@Test
+public void testTaskAssignmentWhenWorkerJoinAfterRevocation() {
+when(coordinator.configSnapshot()).thenReturn(configState);
+
doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
+
+// First assignment with 1 worker and 2 connectors configured but not 
yet assigned
+//
+// note: the assigned/revoked Connectors/tasks might be different, but 
the amount should be the same
+// assignment after this phase:
+// W1: assignedConnectors:[C0, C1], assignedTasks:[T0-0, T0-1, T0-2, 
T0-3, T1-0, T1-1, T1-2, T1-3],
+// revokedConnectors:[] revokedTasks:[]
+//
+// Final distribution after this phase:
+// W1: connectors:[C0, C1], tasks:[T0-0, T0-1, T0-2, T0-3, T1-0, T1-1, 
T1-2, T1-3]
+expectGeneration();
+assignor.performTaskAssignment(leader, offset, memberConfigs, 
coordinator, protocolVersion);
+++rebalanceNum;
+returnedAssignments = assignmentsCapture.getValue();
+assertDelay(0, returnedAssignments);
+expectedMemberConfigs = memberConfigs(leader, offset, 
returnedAssignments);
+assertNoReassignments(memberConfigs, expectedMemberConfigs);
+assertAssignment(2, 8, 0, 0, "worker1");
+
+// Second assignment with a second worker joining and all connectors 
running on previous worker
+//
+// assignment after this phase:
+// W1: assignedConnectors:[], assignedTasks:[],
+// revokedConnectors:[C1], revokedTasks:[T1-0, T1-1, T1-2, T1-3]
+// W2: assignedConnectors:[], assignedTasks:[]
+// revokedConnectors:[] revokedTasks:[]
+//
+// Final distribution after this phase:
+// W1: connectors:[C0], tasks:[T0-0, T0-1, T0-2, T0-3]
+// W2: connectors:[], tasks:[]
+applyAssignments(returnedAssignments);
+memberConfigs = memberConfigs(leader, offset, assignments);
+memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, 
offset, null));
+expectGeneration();
+assignor.performTaskAssignment(leader, offset, memberConfigs, 
coordinator, protocolVersion);
+++rebalanceNum;
+returnedAssignments = assignmentsCapture.getValue();
+assertDelay(0, returnedAssignments);
+expectedMemberConfigs = memberConfigs(leader, offset, 
returnedAssignments);
+assertNoReassignments(memberConfigs, expectedMemberConfigs);
+assertAssignment(0, 0, 1, 4, "worker1", "worker2");
+
+// Third assignment after revocations, and a third worker joining
+//
+// assignment after this phase:
+// W1: assignedTasks:[], assignedTasks:[],
+// revokedConnectors:[], revokedTasks:[T0-3]
+// W2: assignedTasks:[C1], assignedTasks:[T1-0, T1-1]
+// revokedConnectors:[] revokedTasks:[]
+// W3: assignedTasks:[], assignedTasks:[T1-2, T1-3]
+// revokedConnectors:[] revokedTasks:[]
+//
+// Final distribution after this phase:
+// W1: connectors:[C0], tasks:[T0-0, T0-1, T0-2]
+// W2: connectors:[C1], tasks:[T1-0, T1-1]
+// W3: connectors:[], tasks:[T1-2, T1-3]
+applyAssignments(returnedAssignments);
+memberConfigs = memberConfigs(leader, offset, assignments);
+memberConfigs.put("worker3", new ExtendedWorkerState(leaderUrl, 
offset, null));
+expectGeneration();
+assignor.performTaskAssignment(leader, offset, memberConfigs, 
coordinator, protocolVersion);
+++rebalanceNum;
+returnedAssignments = assignmentsCapture.getValue();
+assertDelay(0, returnedAssignments);
+expectedMemberConfigs = memberConfigs(leader, offset, 
returnedAssignments);
+assertNoReassignments(memberConfigs, expectedMemberConfigs);
+assertAssignment(1, 4, 0, 1, "worker1", "worker2", "worker3");
+
+// Fourth assignment after revocations, and a fourth worker joining
+//
+// assignment after this phase:
+// W1: assignedTasks:[], assignedTasks:[],
+// revokedConnectors:[], revokedTasks:[T0-2]
+// W2: assignedTasks:[], assignedTasks:[]
+// revokedConnectors:[] revokedTasks:[]
+// W3: assignedTasks:[], assignedTasks:[]
+// revokedConnectors:[] revokedTasks:[]
+// W4: assignedTasks:[], assignedTasks:[T0-3]
+// revokedConnectors:[] revokedTasks:[]
+//
+// Final distribution after 

[jira] [Created] (KAFKA-13763) Improve unit testing coverage for IncrementalCooperativeAssignor

2022-03-23 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-13763:
-

 Summary: Improve unit testing coverage for 
IncrementalCooperativeAssignor
 Key: KAFKA-13763
 URL: https://issues.apache.org/jira/browse/KAFKA-13763
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: Chris Egerton
Assignee: Chris Egerton


The 
[tests|https://github.com/apache/kafka/blob/dcd09de1ed84b43f269eb32fc2baf589a791d468/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java]
 for the {{IncrementalCooperativeAssignor}} class provide a moderate level of 
coverage and cover some non-trivial cases, but there are some areas for 
improvement that will allow us to iterate on the assignment logic for Kafka 
Connect faster and with greater confidence.

These improvements include:
 * Adding reusable utility methods to assert that a cluster's assignment is 
*balanced* (the difference in the number of connectors and tasks assigned to 
any two workers is at most one) and *complete* (all connectors and tasks are 
assigned to a worker); see the sketch after this list
 * Removing the existing 
[assertAssignment|https://github.com/apache/kafka/blob/dcd09de1ed84b43f269eb32fc2baf589a791d468/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java#L1373-L1405]
 methods and replacing them with a more fine-grained alternative that allows 
for more granular assertions about the number of tasks/connectors 
assigned/revoked from each worker during a round of rebalance, instead of the 
total for the entire cluster
 * Adding a reusable utility method to assert the current distribution of 
connectors and tasks across the cluster
 * Decomposing large portions of repeated code for simulating a round of 
rebalancing into a reusable utility method
 * Renaming variable names to improve accuracy/readability (the 
{{expectedMemberConfigs}} field, for example, is pretty poorly named)
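
As a rough illustration of the first bullet, here is a minimal sketch of what such balance/completeness assertions could look like (class and method names, and the per-worker count representation, are made up for the example, not taken from the PR):

{code:java}
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.Collection;
import java.util.Map;

class AssignmentAssertions {
    // "Balanced": no two workers differ by more than one assigned connector/task.
    static void assertBalanced(final Map<String, Integer> unitsPerWorker) {
        final int max = unitsPerWorker.values().stream().mapToInt(Integer::intValue).max().orElse(0);
        final int min = unitsPerWorker.values().stream().mapToInt(Integer::intValue).min().orElse(0);
        assertTrue(max - min <= 1, "Imbalanced assignment: " + unitsPerWorker);
    }

    // "Complete": every known connector and task appears in some worker's assignment.
    static void assertComplete(final Collection<String> allUnits, final Collection<String> assignedUnits) {
        assertTrue(assignedUnits.containsAll(allUnits), "Unassigned connectors/tasks present");
    }
}
{code}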



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13763) Improve unit testing coverage for IncrementalCooperativeAssignor

2022-03-23 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13763?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17511497#comment-17511497
 ] 

Chris Egerton commented on KAFKA-13763:
---

Discussed earlier during review of https://github.com/apache/kafka/pull/10367

> Improve unit testing coverage for IncrementalCooperativeAssignor
> 
>
> Key: KAFKA-13763
> URL: https://issues.apache.org/jira/browse/KAFKA-13763
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Minor
>
> The 
> [tests|https://github.com/apache/kafka/blob/dcd09de1ed84b43f269eb32fc2baf589a791d468/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java]
>  for the {{IncrementalCooperativeAssignor}} class provide a moderate level of 
> coverage and cover some non-trivial cases, but there are some areas for 
> improvement that will allow us to iterate on the assignment logic for Kafka 
> Connect faster and with greater confidence.
> These improvements include:
>  * Adding reusable utility methods to assert that a cluster's assignment is 
> *balanced* (the difference in the number of connectors and tasks assigned to 
> any two workers is at most one) and *complete* (all connectors and tasks are 
> assigned to a worker)
>  * Removing the existing 
> [assertAssignment|https://github.com/apache/kafka/blob/dcd09de1ed84b43f269eb32fc2baf589a791d468/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java#L1373-L1405]
>  methods and replacing them with a more fine-grained alternative that allows 
> for more granular assertions about the number of tasks/connectors 
> assigned/revoked from each worker during a round of rebalance, instead of the 
> total for the entire cluster
>  * Adding a reusable utility method to assert the current distribution of 
> connectors and tasks across the cluster
>  * Decomposing large portions of repeated code for simulating a round of 
> rebalancing into a reusable utility method
>  * Renaming variable names to improve accuracy/readability (the 
> {{expectedMemberConfigs}} field, for example, is pretty poorly named)



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13764) Potential improvements for Connect incremental rebalancing logic

2022-03-23 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-13764:
-

 Summary: Potential improvements for Connect incremental 
rebalancing logic
 Key: KAFKA-13764
 URL: https://issues.apache.org/jira/browse/KAFKA-13764
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: Chris Egerton
Assignee: Chris Egerton


There are a few small changes that we might make to the incremental rebalancing 
logic for Kafka Connect to improve distribution of connectors and tasks across 
a cluster and address potential bugs:
 # During assignment, assign new connectors and tasks across the cluster before 
calculating revocations that may be necessary in order to balance the cluster. 
This way, we can potentially skip a round of revocation by using newly-created 
connectors and tasks to balance out the cluster.
 # Perform connector and task revocation in more cases, such as when one or 
more connectors are reconfigured to use fewer tasks, which can possibly lead to 
an imbalanced cluster.
 # Fix [this 
line|https://github.com/apache/kafka/blob/06ca4850c5b2b12e972f48e03fe4f9c1032f9a3e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java#L248]
 to use the same aggregation logic that's used 
[here|https://github.com/apache/kafka/blob/06ca4850c5b2b12e972f48e03fe4f9c1032f9a3e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java#L273-L281]
 in order to avoid overwriting map values when they should be combined instead.
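
To illustrate the last point with a toy example (plain JDK maps, not the actual assignor code): {{Map#put}} silently overwrites an existing value, while {{Map#merge}} lets the old and new values be combined.

{code:java}
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class MergeVsPut {
    public static void main(final String[] args) {
        final Map<String, Set<Integer>> tasksByWorker = new HashMap<>();
        tasksByWorker.put("worker1", new HashSet<>(Set.of(1)));

        // put() would replace worker1's existing tasks with [2];
        // merge() combines the old and new sets instead:
        tasksByWorker.merge("worker1", new HashSet<>(Set.of(2)), (existing, incoming) -> {
            existing.addAll(incoming);
            return existing;
        });

        System.out.println(tasksByWorker); // {worker1=[1, 2]}
    }
}
{code}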



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13764) Potential improvements for Connect incremental rebalancing logic

2022-03-23 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13764?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17511504#comment-17511504
 ] 

Chris Egerton commented on KAFKA-13764:
---

Discussed earlier during review of [https://github.com/apache/kafka/pull/10367]

> Potential improvements for Connect incremental rebalancing logic
> 
>
> Key: KAFKA-13764
> URL: https://issues.apache.org/jira/browse/KAFKA-13764
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Minor
>
> There are a few small changes that we might make to the incremental 
> rebalancing logic for Kafka Connect to improve distribution of connectors and 
> tasks across a cluster and address potential bugs:
>  # During assignment, assign new connectors and tasks across the cluster 
> before calculating revocations that may be necessary in order to balance the 
> cluster. This way, we can potentially skip a round of revocation by using 
> newly-created connectors and tasks to balance out the cluster.
>  # Perform connector and task revocation in more cases, such as when one or 
> more connectors are reconfigured to use fewer tasks, which can possibly lead 
> to an imbalanced cluster.
>  # Fix [this 
> line|https://github.com/apache/kafka/blob/06ca4850c5b2b12e972f48e03fe4f9c1032f9a3e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java#L248]
>  to use the same aggregation logic that's used 
> [here|https://github.com/apache/kafka/blob/06ca4850c5b2b12e972f48e03fe4f9c1032f9a3e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java#L273-L281]
>  in order to avoid overwriting map values when they should be combined 
> instead.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] ijuma commented on a change in pull request #11938: MINOR: Clarify how to publish specific projects to the local repo

2022-03-23 Thread GitBox


ijuma commented on a change in pull request #11938:
URL: https://github.com/apache/kafka/pull/11938#discussion_r833746787



##
File path: README.md
##
@@ -180,15 +180,21 @@ Please note for this to work you should create/update 
user maven settings (typic
  ...
 
 
-### Installing the jars to the local Maven repository ###
-The recommended command is:
+### Installing ALL the jars to the local Maven repository ###
+The recommended command to build for both Scala 2.12 and 2.13 is:
 
 ./gradlewAll publishToMavenLocal
 
 For backwards compatibility, the following also works:
 
 ./gradlewAll install
 
+### Installing specific projects to the local Maven repository ###
+
+./gradlew -PskipSigning :streams:publishToMavenLocal

Review comment:
   A bit odd that `skipSigning` is used here, but not in the `./gradlewAll` version. Is there a reason?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-13763) Improve unit testing coverage for IncrementalCooperativeAssignor

2022-03-23 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13763?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-13763:
--
Description: 
The 
[tests|https://github.com/apache/kafka/blob/dcd09de1ed84b43f269eb32fc2baf589a791d468/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java]
 for the {{IncrementalCooperativeAssignor}} class provide a moderate level of 
coverage and cover some non-trivial cases, but there are some areas for 
improvement that will allow us to iterate on the assignment logic for Kafka 
Connect faster and with greater confidence.

These improvements include:
 * Adding reusable utility methods to assert that a cluster's assignment is 
*balanced* (the difference in the number of connectors and tasks assigned to 
any two workers is at most one) and *complete* (all connectors and tasks are 
assigned to a worker)
 * Removing the existing 
[assertAssignment|https://github.com/apache/kafka/blob/dcd09de1ed84b43f269eb32fc2baf589a791d468/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java#L1373-L1405]
 methods and replacing them with a more fine-grained alternative that allows 
for more granular assertions about the number of tasks/connectors 
assigned/revoked from each worker during a round of rebalance, instead of the 
total for the entire cluster
 * Adding a reusable utility method to assert the current distribution of 
connectors and tasks across the cluster
 * Decomposing large portions of repeated code for simulating a round of 
rebalancing into a reusable utility method
 * Renaming variable names to improve accuracy/readability (the 
{{expectedMemberConfigs}} field, for example, is pretty poorly named)

But other improvements may be added in a pull request that addresses the above 
as they come up.

  was:
The 
[tests|https://github.com/apache/kafka/blob/dcd09de1ed84b43f269eb32fc2baf589a791d468/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java]
 for the {{IncrementalCooperativeAssignor}} class provide a moderate level of 
coverage and cover some non-trivial cases, but there are some areas for 
improvement that will allow us to iterate on the assignment logic for Kafka 
Connect faster and with greater confidence.

These improvements include:
 * Adding reusable utility methods to assert that a cluster's assignment is 
*balanced* (the difference in the number of connectors and tasks assigned to 
any two workers is at most one) and *complete* (all connectors and tasks are 
assigned to a worker)
 * Removing the existing 
[assertAssignment|https://github.com/apache/kafka/blob/dcd09de1ed84b43f269eb32fc2baf589a791d468/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java#L1373-L1405]
 methods and replacing them with a more fine-grained alternative that allows 
for more granular assertions about the number of tasks/connectors 
assigned/revoked from each worker during a round of rebalance, instead of the 
total for the entire cluster
 * Adding a reusable utility method to assert the current distribution of 
connectors and tasks across the cluster
 * Decomposing large portions of repeated code for simulating a round of 
rebalancing into a reusable utility method
 * Renaming variable names to improve accuracy/readability (the 
{{expectedMemberConfigs}} field, for example, is pretty poorly named)


> Improve unit testing coverage for IncrementalCooperativeAssignor
> 
>
> Key: KAFKA-13763
> URL: https://issues.apache.org/jira/browse/KAFKA-13763
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Minor
>
> The 
> [tests|https://github.com/apache/kafka/blob/dcd09de1ed84b43f269eb32fc2baf589a791d468/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java]
>  for the {{IncrementalCooperativeAssignor}} class provide a moderate level of 
> coverage and cover some non-trivial cases, but there are some areas for 
> improvement that will allow us to iterate on the assignment logic for Kafka 
> Connect faster and with greater confidence.
> These improvements include:
>  * Adding reusable utility methods to assert that a cluster's assignment is 
> *balanced* (the difference in the number of connectors and tasks assigned to 
> any two workers is at most one) and *complete* (all connectors and tasks are 
> assigned to a worker)
>  * Removing the existing 
> [assertAssignment|https://github.com/apache/kafka/blob/dcd09de1ed84b43f269eb32fc2baf589a791d468/connect/runtime/src/test/java/org/apache/kafka/c

[GitHub] [kafka] kkonstantine merged pull request #11933: KAFKA-13759: Disable idempotence by default in producers instantiated by Connect

2022-03-23 Thread GitBox


kkonstantine merged pull request #11933:
URL: https://github.com/apache/kafka/pull/11933


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-13759) Disable producer idempotence by default in producers instantiated by Connect

2022-03-23 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13759?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis updated KAFKA-13759:
---
Summary: Disable producer idempotence by default in producers instantiated 
by Connect  (was: Disable producer idempotency by default in producers 
instantiated by Connect)

> Disable producer idempotence by default in producers instantiated by Connect
> 
>
> Key: KAFKA-13759
> URL: https://issues.apache.org/jira/browse/KAFKA-13759
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Konstantine Karantasis
>Assignee: Konstantine Karantasis
>Priority: Major
>
> https://issues.apache.org/jira/browse/KAFKA-7077 was merged recently 
> referring to KIP-318. Before that in AK 3.0 idempotency was enabled by 
> default across Kafka producers. 
> However, some compatibility implications were missed in both cases. 
> If idempotency is enabled by default Connect won't be able to communicate via 
> its producers with Kafka brokers older than version 0.11. Perhaps more 
> importantly, for brokers older than version 2.8 the {{IDEMPOTENT_WRITE}} ACL 
> is required to be granted to the principal of the Connect worker. 
> Given the above caveats, this ticket proposes to explicitly disable producer 
> idempotency in Connect by default. This feature, as it happens today, can be 
> enabled by setting worker and/or connector properties. However, enabling it 
> by default should be considered in a major version upgrade and after KIP-318 
> is updated to mention the compatibility requirements and gets officially 
> approved. 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13759) Disable producer idempotence by default in producers instantiated by Connect

2022-03-23 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13759?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis updated KAFKA-13759:
---
Fix Version/s: 3.2.0
   3.1.1
   3.0.2

> Disable producer idempotence by default in producers instantiated by Connect
> 
>
> Key: KAFKA-13759
> URL: https://issues.apache.org/jira/browse/KAFKA-13759
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Konstantine Karantasis
>Assignee: Konstantine Karantasis
>Priority: Major
> Fix For: 3.2.0, 3.1.1, 3.0.2
>
>
> https://issues.apache.org/jira/browse/KAFKA-7077 was merged recently 
> referring to KIP-318. Before that in AK 3.0 idempotence was enabled by 
> default across Kafka producers. 
> However, some compatibility implications were missed in both cases. 
> If idempotence is enabled by default Connect won't be able to communicate via 
> its producers with Kafka brokers older than version 0.11. Perhaps more 
> importantly, for brokers older than version 2.8 the {{IDEMPOTENT_WRITE}} ACL 
> is required to be granted to the principal of the Connect worker. 
> Given the above caveats, this ticket proposes to explicitly disable producer 
> idempotence in Connect by default. This feature, as it happens today, can be 
> enabled by setting worker and/or connector properties. However, enabling it 
> by default should be considered in a major version upgrade and after KIP-318 
> is updated to mention the compatibility requirements and gets officially 
> approved. 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13759) Disable producer idempotence by default in producers instantiated by Connect

2022-03-23 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13759?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis updated KAFKA-13759:
---
Description: 
https://issues.apache.org/jira/browse/KAFKA-7077 was merged recently referring 
to KIP-318. Before that in AK 3.0 idempotence was enabled by default across 
Kafka producers. 

However, some compatibility implications were missed in both cases. 

If idempotence is enabled by default Connect won't be able to communicate via 
its producers with Kafka brokers older than version 0.11. Perhaps more 
importantly, for brokers older than version 2.8 the {{IDEMPOTENT_WRITE}} ACL is 
required to be granted to the principal of the Connect worker. 

Given the above caveats, this ticket proposes to explicitly disable producer 
idempotence in Connect by default. This feature, as it happens today, can be 
enabled by setting worker and/or connector properties. However, enabling it by 
default should be considered in a major version upgrade and after KIP-318 is 
updated to mention the compatibility requirements and gets officially approved. 

  was:
https://issues.apache.org/jira/browse/KAFKA-7077 was merged recently referring 
to KIP-318. Before that in AK 3.0 idempotency was enabled by default across 
Kafka producers. 

However, some compatibility implications were missed in both cases. 

If idempotency is enabled by default Connect won't be able to communicate via 
its producers with Kafka brokers older than version 0.11. Perhaps more 
importantly, for brokers older than version 2.8 the {{IDEMPOTENT_WRITE}} ACL is 
required to be granted to the principal of the Connect worker. 

Given the above caveats, this ticket proposes to explicitly disable producer 
idempotency in Connect by default. This feature, as it happens today, can be 
enabled by setting worker and/or connector properties. However, enabling it by 
default should be considered in a major version upgrade and after KIP-318 is 
updated to mention the compatibility requirements and gets officially approved. 
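
For reference, a minimal sketch (not from the ticket) of setting the flag explicitly on a producer; the `producer.`-prefixed form is Connect's usual convention for overriding the worker's producer configs:

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

class IdempotenceConfigSketch {
    static Properties producerProps() {
        final Properties props = new Properties();
        // Explicitly disable idempotence, matching the Connect default after this fix.
        // In a Connect worker properties file the equivalent override would be:
        //   producer.enable.idempotence=false
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false);
        return props;
    }
}
{code}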


> Disable producer idempotence by default in producers instantiated by Connect
> 
>
> Key: KAFKA-13759
> URL: https://issues.apache.org/jira/browse/KAFKA-13759
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Konstantine Karantasis
>Assignee: Konstantine Karantasis
>Priority: Major
>
> https://issues.apache.org/jira/browse/KAFKA-7077 was merged recently 
> referring to KIP-318. Before that in AK 3.0 idempotence was enabled by 
> default across Kafka producers. 
> However, some compatibility implications were missed in both cases. 
> If idempotence is enabled by default Connect won't be able to communicate via 
> its producers with Kafka brokers older than version 0.11. Perhaps more 
> importantly, for brokers older than version 2.8 the {{IDEMPOTENT_WRITE}} ACL 
> is required to be granted to the principal of the Connect worker. 
> Given the above caveats, this ticket proposes to explicitly disable producer 
> idempotence in Connect by default. This feature, as it happens today, can be 
> enabled by setting worker and/or connector properties. However, enabling it 
> by default should be considered in a major version upgrade and after KIP-318 
> is updated to mention the compatibility requirements and gets officially 
> approved. 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13759) Disable producer idempotence by default in producers instantiated by Connect

2022-03-23 Thread Konstantine Karantasis (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13759?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17511516#comment-17511516
 ] 

Konstantine Karantasis commented on KAFKA-13759:


This issue has now been merged on the 3.2 and 3.1 branches to avoid a breaking 
change in Connect. 
[~cadonna] [~tombentley] fyi. 
Hopefully this fix makes it into the upcoming releases, but please let me know 
if the targeted versions need to be adjusted. 

> Disable producer idempotence by default in producers instantiated by Connect
> 
>
> Key: KAFKA-13759
> URL: https://issues.apache.org/jira/browse/KAFKA-13759
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Konstantine Karantasis
>Assignee: Konstantine Karantasis
>Priority: Major
> Fix For: 3.2.0, 3.1.1, 3.0.2
>
>
> https://issues.apache.org/jira/browse/KAFKA-7077 was merged recently 
> referring to KIP-318. Before that in AK 3.0 idempotence was enabled by 
> default across Kafka producers. 
> However, some compatibility implications were missed in both cases. 
> If idempotence is enabled by default Connect won't be able to communicate via 
> its producers with Kafka brokers older than version 0.11. Perhaps more 
> importantly, for brokers older than version 2.8 the {{IDEMPOTENT_WRITE}} ACL 
> is required to be granted to the principal of the Connect worker. 
> Given the above caveats, this ticket proposes to explicitly disable producer 
> idempotence in Connect by default. This feature, as it happens today, can be 
> enabled by setting worker and/or connector properties. However, enabling it 
> by default should be considered in a major version upgrade and after KIP-318 
> is updated to mention the compatibility requirements and gets officially 
> approved. 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Resolved] (KAFKA-13759) Disable producer idempotence by default in producers instantiated by Connect

2022-03-23 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13759?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis resolved KAFKA-13759.

Resolution: Fixed

> Disable producer idempotence by default in producers instantiated by Connect
> 
>
> Key: KAFKA-13759
> URL: https://issues.apache.org/jira/browse/KAFKA-13759
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Konstantine Karantasis
>Assignee: Konstantine Karantasis
>Priority: Major
> Fix For: 3.2.0, 3.1.1, 3.0.2
>
>
> https://issues.apache.org/jira/browse/KAFKA-7077 was merged recently 
> referring to KIP-318. Before that in AK 3.0 idempotence was enabled by 
> default across Kafka producers. 
> However, some compatibility implications were missed in both cases. 
> If idempotence is enabled by default Connect won't be able to communicate via 
> its producers with Kafka brokers older than version 0.11. Perhaps more 
> importantly, for brokers older than version 2.8 the {{IDEMPOTENT_WRITE}} ACL 
> is required to be granted to the principal of the Connect worker. 
> Given the above caveats, this ticket proposes to explicitly disable producer 
> idempotence in Connect by default. This feature, as it happens today, can be 
> enabled by setting worker and/or connector properties. However, enabling it 
> by default should be considered in a major version upgrade and after KIP-318 
> is updated to mention the compatibility requirements and gets officially 
> approved. 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] kkonstantine commented on pull request #11933: KAFKA-13759: Disable idempotence by default in producers instantiated by Connect

2022-03-23 Thread GitBox


kkonstantine commented on pull request #11933:
URL: https://github.com/apache/kafka/pull/11933#issuecomment-1076884660


   Thanks @rhauch. The fix has now been merged to trunk and cherry picked to 
3.2, 3.1 and 3.0 with the appropriate adjustments to the upgrade notes. cc 
@cadonna @tombentley 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13689) AbstractConfig log print information is incorrect

2022-03-23 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17511522#comment-17511522
 ] 

Guozhang Wang commented on KAFKA-13689:
---

Hi [~RivenSun], I think I agree with you on the general note. My concern is that 
since we do not control when `logUnused` is called, if it runs before all 
provided values have been retrieved we would log `config ... is not used.`, 
which would be misleading since those configs are likely going to be used later. 
But on second thought, the semantics are fine if we just say 
`config ... is not yet used.` at the time `logUnused` is called; it is then 
really the responsibility of the caller to decide when to call this function to 
check which configs are not yet used.

So, as you suggested, we can log it like this (just adding `yet` at the end of 
the sentence):

{code}
public void logUnused() {
    Set<String> unusedKeys = unused();
    if (!unusedKeys.isEmpty()) {
        log.warn("These configurations '{}' were supplied but are not used yet.", unusedKeys);
    }
}
{code}

Since we always call `logUnused` at the end of the constructor of 
producer/consumer/admin, then it's very likely that those unknown configs are 
not retrieved yet and hence would be logged. For that, I'd say we update our 
web docs indicating this effect exactly to clear any possible confusions.

> AbstractConfig log print information is incorrect
> -
>
> Key: KAFKA-13689
> URL: https://issues.apache.org/jira/browse/KAFKA-13689
> Project: Kafka
>  Issue Type: Bug
>  Components: config
>Affects Versions: 3.0.0
>Reporter: RivenSun
>Assignee: RivenSun
>Priority: Major
> Fix For: 3.2.0
>
>
> h1. 1.Example
> KafkaClient version is 3.1.0, KafkaProducer init properties:
>  
> {code:java}
> Properties props = new Properties();
> props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false);
> props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60003);{code}
>  
>  
> Partial log of KafkaProducer initialization:
> {code:java}
>     ssl.truststore.location = C:\Personal 
> File\documents\KafkaSSL\client.truststore.jks
>     ssl.truststore.password = [hidden]
>     ssl.truststore.type = JKS
>     transaction.timeout.ms = 60003
>     transactional.id = null
>     value.serializer = class 
> org.apache.kafka.common.serialization.StringSerializer[main] INFO 
> org.apache.kafka.common.security.authenticator.AbstractLogin - Successfully 
> logged in.
> [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The 
> configuration 'transaction.timeout.ms' was supplied but isn't a known config.
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.1.0
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 
> 37edeed0777bacb3
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 
> 1645602332999 {code}
> From the above log, you can see that KafkaProducer has applied the user's 
> configuration, {*}transaction.timeout.ms=60003{*}, the default value of this 
> configuration is 60000.
> But we can see another line of log:
> [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The 
> configuration *'transaction.timeout.ms'* was supplied but isn't a 
> *{color:#ff}known{color}* config.
>  
> h1. 2.RootCause:
> 1) ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG is set to {*}false{*}.
> So the configurations related to the KafkaProducer transaction will not be 
> requested.
> See the source code: KafkaProducer#configureTransactionState(...) .
> 2) AbstractConfig#logUnused() -> AbstractConfig#unused()
> {code:java}
> public Set<String> unused() {
>     Set<String> keys = new HashSet<>(originals.keySet());
>     keys.removeAll(used);
>     return keys;
> } {code}
> If a configuration has not been requested, the configuration will not be put 
> into the used variable. SourceCode see as below:
> AbstractConfig#get(String key)
>  
> {code:java}
> protected Object get(String key) {
>     if (!values.containsKey(key))
>         throw new ConfigException(String.format("Unknown configuration '%s'", key));
>     used.add(key);
>     return values.get(key);
> } {code}
> h1. Solution:
> 1. AbstractConfig#logUnused() method
> Modify the log printing information of this method,and the unused 
> configuration log print level can be changed to {*}INFO{*}, what do you think?
> {code:java}
> /**
>  * Log infos for any unused configurations
>  */
> public void logUnused() {
>     for (String key : unused())
>         log.info("The configuration '{}' was supplied but isn't a used config.", key);
> }{code}
>  
>  
> 2. AbstractConfig provides two new methods: logUnknown() and unknown()
> {code:java}
> /**
>  * Log warnings for any unknown configurations
>  */
> public void log

[GitHub] [kafka] kkonstantine commented on a change in pull request #11908: KAFKA-13748: Do not include file stream connectors in Connect's CLASSPATH and plugin.path by default

2022-03-23 Thread GitBox


kkonstantine commented on a change in pull request #11908:
URL: https://github.com/apache/kafka/pull/11908#discussion_r833823045



##
File path: docs/connect.html
##
@@ -74,6 +74,7 @@ Running Kafka Connect
 config.storage.topic (default connect-configs) - topic to use for storing connector and task configurations; note that this should be a single partition, highly replicated, compacted topic. You may need to manually create the topic to ensure the correct configuration as auto created topics may have multiple partitions or be automatically configured for deletion rather than compaction
 offset.storage.topic (default connect-offsets) - topic to use for storing offsets; this topic should have many partitions, be replicated, and be configured for compaction
 status.storage.topic (default connect-status) - topic to use for storing statuses; this topic can have multiple partitions, and should be replicated and configured for compaction
+plugin.path (default empty) - a list of paths that contain plugins (connectors, converters, transformations). For the purpose of quick starts users will have to add the path that contains the FileStreamSourceConnector and FileStreamSinkConnector packaged in connect-file-"version".jar, because these connectors are not included by default to the CLASSPATH or the plugin.path of the Connect worker

Review comment:
   Thanks for the comment @mimaison 
   I've moved this bullet further up where we include properties for both 
standalone and distributed. 
   
   Regarding the quickstart, the reason I didn't change this file is that it's 
not shown anymore. Which might be something to fix, but possibly not in this 
PR. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kkonstantine commented on pull request #11908: KAFKA-13748: Do not include file stream connectors in Connect's CLASSPATH and plugin.path by default

2022-03-23 Thread GitBox


kkonstantine commented on pull request #11908:
URL: https://github.com/apache/kafka/pull/11908#issuecomment-1076948497


   I rebased to get the changes from https://github.com/apache/kafka/pull/11933 
and get a green run of system tests 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-13765) Describe-consumer admin should not return unstable membership information

2022-03-23 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-13765:
-

 Summary: Describe-consumer admin should not return unstable 
membership information
 Key: KAFKA-13765
 URL: https://issues.apache.org/jira/browse/KAFKA-13765
 Project: Kafka
  Issue Type: Bug
  Components: admin
Reporter: Guozhang Wang


When a consumer group is in the “prepare-rebalance” phase, it's unclear whether 
all of its currently registered members will re-join in the new generation. In 
this case, simply returning the current members map to the describe-consumer 
request may be misleading: users would get spurious results that may contain 
dropping or even zombie consumers.

So I think during the prepare-rebalance phase we should either return only the 
members whose join-group requests have already been received, OR return the 
response with no members and indicate via the prepare-rebalance state that the 
membership info is unstable and hence won't be returned.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] RivenSun2 opened a new pull request #11940: KAFKA-13689: optimize the log output of logUnused method

2022-03-23 Thread GitBox


RivenSun2 opened a new pull request #11940:
URL: https://github.com/apache/kafka/pull/11940


optimize the log output of logUnused method.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] RivenSun2 commented on pull request #11940: KAFKA-13689: optimize the log output of logUnused method

2022-03-23 Thread GitBox


RivenSun2 commented on pull request #11940:
URL: https://github.com/apache/kafka/pull/11940#issuecomment-1076963317


   Hi @guozhangwang 
   please help to review PR when available.
   Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13689) AbstractConfig log print information is incorrect

2022-03-23 Thread RivenSun (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17511533#comment-17511533
 ] 

RivenSun commented on KAFKA-13689:
--

[~guozhang] Thank you for your reply.
I will create a PR(11940) to optimize the log output of logUnused().
Please help to review PR when available.
Thanks.

> AbstractConfig log print information is incorrect
> -
>
> Key: KAFKA-13689
> URL: https://issues.apache.org/jira/browse/KAFKA-13689
> Project: Kafka
>  Issue Type: Bug
>  Components: config
>Affects Versions: 3.0.0
>Reporter: RivenSun
>Assignee: RivenSun
>Priority: Major
> Fix For: 3.2.0
>
>
> h1. 1.Example
> KafkaClient version is 3.1.0, KafkaProducer init properties:
>  
> {code:java}
> Properties props = new Properties();
> props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false);
> props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60003);{code}
>  
>  
> Partial log of KafkaProducer initialization:
> {code:java}
>     ssl.truststore.location = C:\Personal 
> File\documents\KafkaSSL\client.truststore.jks
>     ssl.truststore.password = [hidden]
>     ssl.truststore.type = JKS
>     transaction.timeout.ms = 60003
>     transactional.id = null
>     value.serializer = class 
> org.apache.kafka.common.serialization.StringSerializer[main] INFO 
> org.apache.kafka.common.security.authenticator.AbstractLogin - Successfully 
> logged in.
> [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The 
> configuration 'transaction.timeout.ms' was supplied but isn't a known config.
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.1.0
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 
> 37edeed0777bacb3
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 
> 1645602332999 {code}
> From the above log, you can see that KafkaProducer has applied the user's 
> configuration, {*}transaction.timeout.ms=60003{*}, the default value of this 
> configuration is 60000.
> But we can see another line of log:
> [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The 
> configuration *'transaction.timeout.ms'* was supplied but isn't a 
> *{color:#ff}known{color}* config.
>  
> h1. 2.RootCause:
> 1) ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG is set to {*}false{*}.
> So the configurations related to the KafkaProducer transaction will not be 
> requested.
> See the source code: KafkaProducer#configureTransactionState(...) .
> 2) AbstractConfig#logUnused() -> AbstractConfig#unused()
> {code:java}
> public Set<String> unused() {
>     Set<String> keys = new HashSet<>(originals.keySet());
>     keys.removeAll(used);
>     return keys;
> } {code}
> If a configuration has not been requested, the configuration will not be put 
> into the used variable. SourceCode see as below:
> AbstractConfig#get(String key)
>  
> {code:java}
> protected Object get(String key) {
>     if (!values.containsKey(key))
>         throw new ConfigException(String.format("Unknown configuration '%s'", key));
>     used.add(key);
>     return values.get(key);
> } {code}
> h1. Solution:
> 1. AbstractConfig#logUnused() method
> Modify the log printing information of this method,and the unused 
> configuration log print level can be changed to {*}INFO{*}, what do you think?
> {code:java}
> /**
>  * Log infos for any unused configurations
>  */
> public void logUnused() {
>     for (String key : unused())
>         log.info("The configuration '{}' was supplied but isn't a used config.", key);
> }{code}
>  
>  
> 2. AbstractConfig provides two new methods: logUnknown() and unknown()
> {code:java}
> /**
>  * Log warnings for any unknown configurations
>  */
> public void logUnknown() {
>     for (String key : unknown())
>         log.warn("The configuration '{}' was supplied but isn't a known config.", key);
> } {code}
>  
> {code:java}
> public Set<String> unknown() {
>     Set<String> keys = new HashSet<>(originals.keySet());
>     keys.removeAll(values.keySet());
>     return keys;
> } {code}
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13766) Use `max.poll.interval.ms` as the timeout during complete-rebalance phase

2022-03-23 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-13766:
-

 Summary: Use `max.poll.interval.ms` as the timeout during 
complete-rebalance phase
 Key: KAFKA-13766
 URL: https://issues.apache.org/jira/browse/KAFKA-13766
 Project: Kafka
  Issue Type: Bug
  Components: core
Reporter: Guozhang Wang


The lifetime of a consumer can be categorized in three phases:

1) During normal processing, the broker expects a heartbeat request 
periodically from the consumer, and that is timed by the `session.timeout.ms`.

2) During the prepare_rebalance, the broker would expect a join-group request 
to be received within the rebalance.timeout, which is piggy-backed as the 
`max.poll.interval.ms`.

3) During the complete_rebalance, the broker would expect a sync-group request 
to be received again within the `session.timeout.ms`.

So during different phases of the life of the consumer, different timeout would 
be used to bound the timer.

Nowadays with cooperative rebalance protocol, we can still return records and 
process them in the middle of a rebalance from {{consumer.poll}}. In that case, 
for phase 3) we should also use the `max.poll.interval.ms` to bound the timer, 
which is in practice larger than `session.timeout.ms`.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13766) Use `max.poll.interval.ms` as the timeout during complete-rebalance phase

2022-03-23 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13766?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17511534#comment-17511534
 ] 

Guozhang Wang commented on KAFKA-13766:
---

cc [~dajac]

> Use `max.poll.interval.ms` as the timeout during complete-rebalance phase
> -
>
> Key: KAFKA-13766
> URL: https://issues.apache.org/jira/browse/KAFKA-13766
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: new-rebalance-should-fix
>
> The lifetime of a consumer can be categorized in three phases:
> 1) During normal processing, the broker expects a heartbeat request 
> periodically from the consumer, and that is timed by the `session.timeout.ms`.
> 2) During the prepare_rebalance, the broker would expect a join-group request 
> to be received within the rebalance.timeout, which is piggy-backed as the 
> `max.poll.interval.ms`.
> 3) During the complete_rebalance, the broker would expect a sync-group 
> request to be received again within the `session.timeout.ms`.
> So during different phases of the life of the consumer, different timeout 
> would be used to bound the timer.
> Nowadays with cooperative rebalance protocol, we can still return records and 
> process them in the middle of a rebalance from {{consumer.poll}}. In that 
> case, for phase 3) we should also use the `max.poll.interval.ms` to bound the 
> timer, which is in practice larger than `session.timeout.ms`.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13542) Utilize the new Consumer#enforceRebalance(reason) API in Streams

2022-03-23 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17511537#comment-17511537
 ] 

Matthias J. Sax commented on KAFKA-13542:
-

Curious: did we figure out why the patch resulted in the perf regression?

> Utilize the new Consumer#enforceRebalance(reason) API in Streams
> 
>
> Key: KAFKA-13542
> URL: https://issues.apache.org/jira/browse/KAFKA-13542
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Hao Li
>Priority: Blocker
> Fix For: 3.2.0
>
>
> KIP-800 is adding a new "reason" parameter to the Consumer#enforceRebalance 
> API, which will be passed in to a new field of the JoinGroup protocol. We 
> invoke this API throughout Streams for various reasons, which are very useful 
> for debugging the cause of rebalancing. Passing in the reason to this new API 
> would make it possible to figure out why a Streams client triggered a 
> rebalance from the broker logs, which are often the only logs available when 
> the client logs cannot be retrieved for whatever reason
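
For illustration, a minimal sketch of the KIP-800 API described above; the consumer setup, topic, and reason string are made up for the example:

{code:java}
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

class EnforceRebalanceSketch {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "streams-app");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("input-topic"));
            consumer.poll(Duration.ofMillis(100)); // join the group first
            // The reason is forwarded in the JoinGroup request, so the trigger
            // of the rebalance shows up in the broker logs.
            consumer.enforceRebalance("probing rebalance triggered by the Streams assignor");
        }
    }
}
{code}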



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13765) Describe-consumer admin should not return unstable membership information

2022-03-23 Thread Ryan Leslie (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17511546#comment-17511546
 ] 

Ryan Leslie commented on KAFKA-13765:
-

[~guozhang] Are you referring to KafkaAdminClient.describeConsumerGroups(), or 
just something like the kafka-consumer-groups.sh wrapper? If it's the admin 
client, we may need to be extra careful not to make a backward incompatible API 
change, like in KAFKA-12879. For example, if someone is relying on the admin 
client to monitor consumer group members then this change might affect such an 
application by causing an unexpected dip in the results during rebalances. 
While I agree it's unknown if all members will rejoin, most will in most cases, 
and subsequent calls to describe the consumer group would eventually reveal 
this. If we do want to support a different behavior, perhaps consider making it 
optional in DescribeConsumerGroupsOptions. Also, a user today can simply 
disregard the result themselves by checking ConsumerGroupDescription.state(). 
They can choose to retry later if they are only interested in seeing the group 
in a stable state.
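
For illustration, a minimal sketch of that caller-side check (group id and bootstrap servers are made up):

{code:java}
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.common.ConsumerGroupState;

class DescribeGroupSketch {
    public static void main(final String[] args) throws Exception {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            final ConsumerGroupDescription description =
                admin.describeConsumerGroups(List.of("my-group"))
                     .describedGroups().get("my-group").get();
            if (description.state() == ConsumerGroupState.STABLE) {
                description.members().forEach(m -> System.out.println(m.consumerId()));
            } else {
                // Membership may be in flux (e.g. PREPARING_REBALANCE); retry later.
            }
        }
    }
}
{code}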

> Describe-consumer admin should not return unstable membership information
> -
>
> Key: KAFKA-13765
> URL: https://issues.apache.org/jira/browse/KAFKA-13765
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Reporter: Guozhang Wang
>Priority: Major
>
> When a consumer group is in the “prepare-rebalance” phase, it's unclear 
> whether all of its currently registered members will re-join in the new 
> generation. In this case, simply returning the current members map to the 
> describe-consumer request may be misleading: users would get spurious results 
> that may contain dropping or even zombie consumers.
> So I think during the prepare-rebalance phase we should either return only 
> the members whose join-group requests have already been received, OR return 
> the response with no members and indicate via the prepare-rebalance state 
> that the membership info is unstable and hence won't be returned.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-9366) Upgrade log4j to log4j2

2022-03-23 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17511547#comment-17511547
 ] 

Luke Chen commented on KAFKA-9366:
--

[~brandonk] and all, we are aware that the log4j CVE issue is impacting many 
users. Currently, we are discussing the compatibility issues for the log4j2 
upgrade, and are considering temporarily replacing log4j with reload4j in 
v3.2.0. The discussion thread is here: 
https://lists.apache.org/thread/qo1y3249xldt4cpg6r8zkcq5m1q32bf1 

 

Comments and thoughts are welcome. Thanks.

 

> Upgrade log4j to log4j2
> ---
>
> Key: KAFKA-9366
> URL: https://issues.apache.org/jira/browse/KAFKA-9366
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.2.0, 2.1.1, 2.3.0, 2.4.0
>Reporter: leibo
>Assignee: Dongjin Lee
>Priority: Critical
>  Labels: needs-kip
> Fix For: 3.3.0
>
>
> h2. CVE-2019-17571 Detail
> Included in Log4j 1.2 is a SocketServer class that is vulnerable to 
> deserialization of untrusted data which can be exploited to remotely execute 
> arbitrary code when combined with a deserialization gadget when listening to 
> untrusted network traffic for log data. This affects Log4j versions up to 1.2 
> up to 1.2.17.
>  
> [https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2019-17571]
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] showuon commented on a change in pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…

2022-03-23 Thread GitBox


showuon commented on a change in pull request #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r833872517



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -262,6 +262,42 @@ private void closeDirtyAndRevive(final Collection<Task> taskWithChangelogs, fina
 }
 }
 
+private void commitActiveTasks(final Set<Task> activeTasksNeedCommit,
+                               final AtomicReference<RuntimeException> activeTasksCommitException) {
+
+    final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsPerTask = new HashMap<>();
+    prepareCommitAndAddOffsetsToMap(activeTasksNeedCommit, consumedOffsetsPerTask);
+
+    final Set<Task> dirtyTasks = new HashSet<>();
+    try {
+        taskExecutor.commitOffsetsOrTransaction(consumedOffsetsPerTask);
+    } catch (final TaskCorruptedException e) {
+        log.warn("Some tasks were corrupted when trying to commit offsets, these will be cleaned and revived: {}",
+            e.corruptedTasks());
+
+        // If we hit a TaskCorruptedException it must be EOS, just handle the cleanup for those corrupted tasks right here
+        dirtyTasks.addAll(tasks.tasks(e.corruptedTasks()));
+        closeDirtyAndRevive(dirtyTasks, true);
+    } catch (final RuntimeException e) {
+        log.error("Exception caught while committing active tasks: " + consumedOffsetsPerTask.keySet(), e);
+        activeTasksCommitException.compareAndSet(null, e);
+        dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
+    }
+
+    // for non-revoking active tasks, we should not enforce checkpoint
+    // as it's EOS enabled in which case no checkpoint should be written while
+    // the task is in RUNNING state
+    for (final Task task : activeTasksNeedCommit) {
+        if (!dirtyTasks.contains(task)) {
+            try {
+                task.postCommit(false);

Review comment:
   Honestly, I'm not quite sure if we should commit the checkpoint here or 
not. When entering this phase, the task might be in the `RESTORING` or the 
`RUNNING` state. We can checkpoint for `RESTORING`, but for `RUNNING` we might 
not have to. So I was thinking of something like this:
   ```
   task.postCommit(task.state() != Task.State.RUNNING);
   ```
   But checking the discussion thread in the JIRA again, @ableegoldman suggested 
we checkpoint after committing. So I'm wondering if we should just force this to 
`true` here?
   
https://issues.apache.org/jira/browse/KAFKA-13295?focusedCommentId=17429067&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17429067
   
   cc @guozhangwang @ableegoldman 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…

2022-03-23 Thread GitBox


showuon commented on a change in pull request #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r833872517



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -262,6 +262,42 @@ private void closeDirtyAndRevive(final Collection<Task> taskWithChangelogs, fina
 }
 }
 
+private void commitActiveTasks(final Set<Task> activeTasksNeedCommit,
+                               final AtomicReference<RuntimeException> activeTasksCommitException) {
+
+    final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsPerTask = new HashMap<>();
+    prepareCommitAndAddOffsetsToMap(activeTasksNeedCommit, consumedOffsetsPerTask);
+
+    final Set<Task> dirtyTasks = new HashSet<>();
+    try {
+        taskExecutor.commitOffsetsOrTransaction(consumedOffsetsPerTask);
+    } catch (final TaskCorruptedException e) {
+        log.warn("Some tasks were corrupted when trying to commit offsets, these will be cleaned and revived: {}",
+            e.corruptedTasks());
+
+        // If we hit a TaskCorruptedException it must be EOS, just handle the cleanup for those corrupted tasks right here
+        dirtyTasks.addAll(tasks.tasks(e.corruptedTasks()));
+        closeDirtyAndRevive(dirtyTasks, true);
+    } catch (final RuntimeException e) {
+        log.error("Exception caught while committing active tasks: " + consumedOffsetsPerTask.keySet(), e);
+        activeTasksCommitException.compareAndSet(null, e);
+        dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
+    }
+
+    // for non-revoking active tasks, we should not enforce checkpoint
+    // as it's EOS enabled in which case no checkpoint should be written while
+    // the task is in RUNNING state
+    for (final Task task : activeTasksNeedCommit) {
+        if (!dirtyTasks.contains(task)) {
+            try {
+                task.postCommit(false);

Review comment:
   Honestly, I'm not quite sure if we should commit the checkpoint here or 
not. When entering this phase, the task might be in the `RESTORING` or 
`RUNNING` state, or possibly others. We can checkpoint for `RESTORING`, but for 
`RUNNING` we might not have to. So I was thinking of something like this:
   ```
   task.postCommit(task.state() != Task.State.RUNNING);
   ```
   But checking the discussion thread in the JIRA again, @ableegoldman suggested 
we checkpoint after committing. So I'm wondering if we should just force this to 
`true` here?
   
https://issues.apache.org/jira/browse/KAFKA-13295?focusedCommentId=17429067&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17429067
   
   cc @guozhangwang @ableegoldman 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #11926: KAFKA-13714: Fix cache flush position

2022-03-23 Thread GitBox


vvcephei commented on pull request #11926:
URL: https://github.com/apache/kafka/pull/11926#issuecomment-1077021766


   Test failures unrelated:
   
   ```
   Build / JDK 8 and Scala 2.12 / org.apache.kafka.common.network.SslTransportLayerTest.[1] tlsProtocol=TLSv1.2, useInlinePem=false (0.38 sec)
   https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11926/5/testReport/org.apache.kafka.common.network/SslTransportLayerTest/Build___JDK_8_and_Scala_2_121__tlsProtocol_TLSv1_2__useInlinePem_false/

   Build / JDK 8 and Scala 2.12 / integration.kafka.server.FetchRequestBetweenDifferentIbpTest.testControllerNewIBP()
   https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11926/5/testReport/integration.kafka.server/FetchRequestBetweenDifferentIbpTest/Build___JDK_8_and_Scala_2_12___testControllerNewIBP__/
   ```

The Java 17 build is still queued after 5.5 hours.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei merged pull request #11926: KAFKA-13714: Fix cache flush position

2022-03-23 Thread GitBox


vvcephei merged pull request #11926:
URL: https://github.com/apache/kafka/pull/11926


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #11936: MINOR: Fix an incompatible bug in GetOffsetShell

2022-03-23 Thread GitBox


showuon commented on a change in pull request #11936:
URL: https://github.com/apache/kafka/pull/11936#discussion_r833881280



##
File path: core/src/test/scala/kafka/tools/GetOffsetShellTest.scala
##
@@ -166,6 +166,13 @@ class GetOffsetShellTest extends KafkaServerTestHarness 
with Logging {
 )
   }
 
+  @Test
+  def testNoOffsetIfTimestampGreaterThanLatestRecord(): Unit = {

Review comment:
   Checking the code, it looks like `UNKNOWN_OFFSET` could mean either an error 
state or, as your test shows, that the timestamp was not found. Could we also 
add a test verifying that when the request gets an error response we also get 
an empty offset (due to the `UNKNOWN_OFFSET` response)? E.g.: the topic doesn't exist.

##
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##
@@ -135,7 +135,11 @@ object GetOffsetShell {
     val partitionOffsets = partitionInfos.flatMap { tp =>
       try {
         val partitionInfo = listOffsetsResult.partitionResult(tp).get
-        Some((tp, partitionInfo.offset))
+        if (partitionInfo.offset != ListOffsetsResponse.UNKNOWN_OFFSET) {
+          Some((tp, partitionInfo.offset))
+        } else {
+          None

Review comment:
   Could we output something to the user, like the consumer group command does: 
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala#L680-L681
   
   ex: "Warn: the offset for Partition $tp is unknown." WDYT?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org





[GitHub] [kafka] guozhangwang commented on pull request #11940: KAFKA-13689: optimize the log output of logUnused method

2022-03-23 Thread GitBox


guozhangwang commented on pull request #11940:
URL: https://github.com/apache/kafka/pull/11940#issuecomment-1077031424


   Hi @RivenSun2, I think we also want to call `ignore` when certain producer features are disabled, right?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #11926: KAFKA-13714: Fix cache flush position

2022-03-23 Thread GitBox


vvcephei commented on pull request #11926:
URL: https://github.com/apache/kafka/pull/11926#issuecomment-1077034307


   Cherry-picked to 3.2


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-13714) Flaky test IQv2StoreIntegrationTest

2022-03-23 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13714?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-13714.
--
Fix Version/s: 3.2.0
 Assignee: John Roesler
   Resolution: Fixed

> Flaky test IQv2StoreIntegrationTest
> ---
>
> Key: KAFKA-13714
> URL: https://issues.apache.org/jira/browse/KAFKA-13714
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.2.0
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Blocker
> Fix For: 3.2.0
>
>
> I have observed multiple consistency violations in the 
> IQv2StoreIntegrationTest. Since this is the first release of IQv2, and it's 
> apparently a major flaw in the feature, we should not release with this bug 
> outstanding. Depending on the time-table, we may want to block the release or 
> pull the feature until the next release.
>  
> The first observation I have is from 23 Feb 2022. So far all observations 
> point to the range query in particular, and all observations have been for 
> RocksDB stores, including RocksDBStore, TimestampedRocksDBStore, and the 
> windowed store built on RocksDB segments.
> For reference, range queries were implemented on 16 Feb 2022: 
> [https://github.com/apache/kafka/commit/b38f6ba5cc989702180f5d5f8e55ba20444ea884]
> The window-specific range query test has also failed once that I have seen. 
> That feature was implemented on 2 Jan 2022: 
> [https://github.com/apache/kafka/commit/b8f1cf14c396ab04b8968a8fa04d8cf67dd3254c]
>  
> Here are some stack traces I have seen:
> {code:java}
> verifyStore[cache=true, log=true, supplier=ROCKS_KV, kind=PAPI]
> java.lang.AssertionError: 
> Expected: is <[1, 2, 3]>
>  but: was <[1, 2]>
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
>   at 
> org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQuery(IQv2StoreIntegrationTest.java:1125)
>   at 
> org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQueries(IQv2StoreIntegrationTest.java:803)
>   at 
> org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore(IQv2StoreIntegrationTest.java:776)
>  {code}
> {code:java}
> verifyStore[cache=true, log=true, supplier=TIME_ROCKS_KV, kind=PAPI]
> java.lang.AssertionError: 
> Expected: is <[1, 2, 3]>
>  but: was <[1, 3]>
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
>   at 
> org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQuery(IQv2StoreIntegrationTest.java:1131)
>   at 
> org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQueries(IQv2StoreIntegrationTest.java:809)
>   at 
> org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore(IQv2StoreIntegrationTest.java:778)
>{code}
> {code:java}
> verifyStore[cache=true, log=true, supplier=ROCKS_KV, kind=PAPI]
> java.lang.AssertionError: 
> Result:StateQueryResult{partitionResults={0=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueTimestampedIterator@35025a0a,
>  executionInfo=[], position=Position{position={input-topic={0=1, 
> 1=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueTimestampedIterator@38732364,
>  executionInfo=[], position=Position{position={input-topic={1=1}, 
> globalResult=null}
> Expected: is <[1, 2, 3]>
>  but: was <[1, 2]>
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>   at 
> org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQuery(IQv2StoreIntegrationTest.java:1129)
>   at 
> org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQueries(IQv2StoreIntegrationTest.java:807)
>   at 
> org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore(IQv2StoreIntegrationTest.java:780)
>  {code}
> {code:java}
> verifyStore[cache=true, log=false, supplier=ROCKS_WINDOW, kind=DSL] 
>     java.lang.AssertionError: 
> Result:StateQueryResult{partitionResults={0=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredWindowedKeyValueIterator@2a32fb6,
>  executionInfo=[], position=Position{position={input-topic={0=1, 
> 1=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredWindowedKeyValueIterator@6107165,
>  executionInfo=[], position=Position{position={input-topic={1=1}, 
> globalResult=null}
>     Expected: is <[0, 1, 2, 3]> 
>          but: was <[0, 2, 3]>
>         at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>        
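
For context, here is a minimal sketch of the IQv2 range query these assertions exercise (API as of 3.2). The store name, the Integer key/value types, and the already-started KafkaStreams instance are illustrative assumptions, not taken from the test:

{code:java}
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.query.RangeQuery;
import org.apache.kafka.streams.query.StateQueryRequest;
import org.apache.kafka.streams.query.StateQueryResult;
import org.apache.kafka.streams.state.KeyValueIterator;

public class RangeQuerySketch {

    // "kv-store" must match the name the store was materialized with.
    static void printRange(final KafkaStreams kafkaStreams) {
        final StateQueryRequest<KeyValueIterator<Integer, Integer>> request =
            StateQueryRequest.inStore("kv-store")
                .withQuery(RangeQuery.<Integer, Integer>withRange(1, 3));

        final StateQueryResult<KeyValueIterator<Integer, Integer>> result =
            kafkaStreams.query(request);

        // Each partition answers independently; the flaky assertions merge
        // these per-partition iterators and expect the full range [1, 2, 3].
        result.getPartitionResults().forEach((partition, queryResult) -> {
            try (KeyValueIterator<Integer, Integer> iterator = queryResult.getResult()) {
                iterator.forEachRemaining(kv -> System.out.println(partition + ": " + kv.key));
            }
        });
    }
}
{code}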

[GitHub] [kafka] showuon commented on pull request #10367: KAFKA-12495: allow consecutive revoke in incremental cooperative assignor in connector

2022-03-23 Thread GitBox


showuon commented on pull request #10367:
URL: https://github.com/apache/kafka/pull/10367#issuecomment-1077035346


   @C0urante , sure, please file another PR for the other comments, and thanks for them. However, I'm still concerned that @kkonstantine doesn't like the current solution and would like to see another proposal, as mentioned 
[here](https://issues.apache.org/jira/browse/KAFKA-12495?focusedCommentId=17380872&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17380872). So I think we still need to get his approval before we can continue. WDYT?
   
   cc @kkonstantine  , we need your suggestions here, please!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #11920: KAFKA-13672: Race condition in DynamicBrokerConfig

2022-03-23 Thread GitBox


showuon commented on pull request #11920:
URL: https://github.com/apache/kafka/pull/11920#issuecomment-1077041046


   Failed tests are unrelated.
   ```
   Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.shouldRestoreState
   Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.shouldRestoreState
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon merged pull request #11920: KAFKA-13672: Race condition in DynamicBrokerConfig

2022-03-23 Thread GitBox


showuon merged pull request #11920:
URL: https://github.com/apache/kafka/pull/11920


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9826) Log cleaning repeatedly picks same segment with no effect when first dirty offset is past start of active segment

2022-03-23 Thread zhangzhisheng (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17511576#comment-17511576
 ] 

zhangzhisheng commented on KAFKA-9826:
--

[~junrao] After upgrading Kafka from 2.4.1 to 2.8.1, the issue has been solved. Thanks.

> Log cleaning repeatedly picks same segment with no effect when first dirty 
> offset is past start of active segment
> -
>
> Key: KAFKA-9826
> URL: https://issues.apache.org/jira/browse/KAFKA-9826
> Project: Kafka
>  Issue Type: Bug
>  Components: log cleaner
>Affects Versions: 2.4.1
>Reporter: Steve Rodrigues
>Assignee: Steve Rodrigues
>Priority: Major
> Fix For: 2.6.0, 2.4.2, 2.5.1
>
>
> Seen on a system where a given partition had a single segment, and for 
> whatever reason (deleteRecords?) the logStartOffset was greater than the 
> base offset of the log's segment, there was a continuous series of 
> ```
> [2020-03-03 16:56:31,374] WARN Resetting first dirty offset of  FOO-3 to log 
> start offset 55649 since the checkpointed offset 0 is invalid. 
> (kafka.log.LogCleanerManager$)
> ```
> messages (partition name changed, it wasn't really FOO). This was expected to 
> be resolved by KAFKA-6266 but clearly wasn't. 
> Further investigation revealed that a few segments were continuously being 
> cleaned, generating messages in the `log-cleaner.log` of the form:
> ```
> [2020-03-31 13:34:50,924] INFO Cleaner 1: Beginning cleaning of log FOO-3 
> (kafka.log.LogCleaner)
> [2020-03-31 13:34:50,924] INFO Cleaner 1: Building offset map for FOO-3... 
> (kafka.log.LogCleaner)
> [2020-03-31 13:34:50,927] INFO Cleaner 1: Building offset map for log FOO-3 
> for 0 segments in offset range [55287, 54237). (kafka.log.LogCleaner)
> [2020-03-31 13:34:50,927] INFO Cleaner 1: Offset map for log FOO-3 complete. 
> (kafka.log.LogCleaner)
> [2020-03-31 13:34:50,927] INFO Cleaner 1: Cleaning log FOO-3 (cleaning prior 
> to Wed Dec 31 19:00:00 EST 1969, discarding tombstones prior to Tue Dec 10 
> 13:39:08 EST 2019)... (kafka.log.LogCleaner)
> [2020-03-31 13:34:50,927] INFO [kafka-log-cleaner-thread-1]: Log cleaner 
> thread 1 cleaned log FOO-3 (dirty section = [55287, 55287])
> 0.0 MB of log processed in 0.0 seconds (0.0 MB/sec).
> Indexed 0.0 MB in 0.0 seconds (0.0 Mb/sec, 100.0% of total time)
> Buffer utilization: 0.0%
> Cleaned 0.0 MB in 0.0 seconds (NaN Mb/sec, 0.0% of total time)
> Start size: 0.0 MB (0 messages)
> End size: 0.0 MB (0 messages) NaN% size reduction (NaN% fewer messages) 
> (kafka.log.LogCleaner)
> ```
> What seems to have happened here (data determined for a different partition) 
> is:
> There exist a number of partitions here which get relatively low traffic, 
> including our friend FOO-5. For whatever reason, LogStartOffset of this 
> partition has moved beyond the baseOffset of the active segment. (Notes in 
> other issues indicate that this is a reasonable scenario.) So there’s one 
> segment, starting at 166266, and a log start of 166301.
> grabFilthiestCompactedLog runs and reads the checkpoint file. We see that 
> this topicpartition needs to be cleaned, and call cleanableOffsets on it 
> which returns an OffsetsToClean with firstDirtyOffset == logStartOffset == 
> 166301 and firstUncleanableOffset = max(logStart, activeSegment.baseOffset) = 
> 166301, and forceCheckpoint = true.
> The checkpoint file is updated in grabFilthiestCompactedLog (this is the fix 
> for KAFKA-6266). We then create a LogToClean object based on the 
> firstDirtyOffset and firstUncleanableOffset of 166301 (past the active 
> segment’s base offset).
> The LogToClean object has cleanBytes = logSegments(-1, 
> firstDirtyOffset).map(_.size).sum → the size of this segment. It has 
> firstUncleanableOffset and cleanableBytes determined by 
> calculateCleanableBytes. calculateCleanableBytes returns:
> ```
> val firstUncleanableSegment = log.nonActiveLogSegmentsFrom(uncleanableOffset).headOption.getOrElse(log.activeSegment)
> val firstUncleanableOffset = firstUncleanableSegment.baseOffset
> val cleanableBytes = log.logSegments(firstDirtyOffset, math.max(firstDirtyOffset, firstUncleanableOffset)).map(_.size.toLong).sum
> (firstUncleanableOffset, cleanableBytes)
> ```
> firstUncleanableSegment is activeSegment. firstUncleanableOffset is the base 
> offset, 166266. cleanableBytes is looking for logSegments(166301, max(166301, 166266)) → which _is the active segment_.
> So there are “cleanableBytes” > 0.
> We then filter out segments with totalbytes (clean + cleanable) > 0. This 
> segment has totalBytes > 0, and it has cleanablebytes, so great! It’s 
> filthiest.
> The cleaner picks it, calls cleanLog on it, which then does cleaner.clean, 
> which returns nextDirtyOffset and cleaner sta

[jira] [Commented] (KAFKA-13542) Utilize the new Consumer#enforceRebalance(reason) API in Streams

2022-03-23 Thread Hao Li (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17511577#comment-17511577
 ] 

Hao Li commented on KAFKA-13542:


[~mjsax], [~cadonna], I haven't had time to figure out why this caused the perf regression, and hence to fix it again. When do we plan to release 3.2?

> Utilize the new Consumer#enforceRebalance(reason) API in Streams
> 
>
> Key: KAFKA-13542
> URL: https://issues.apache.org/jira/browse/KAFKA-13542
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Hao Li
>Priority: Blocker
> Fix For: 3.2.0
>
>
> KIP-800 is adding a new "reason" parameter to the Consumer#enforceRebalance 
> API, which will be passed in to a new field of the JoinGroup protocol. We 
> invoke this API throughout Streams for various reasons, which are very useful 
> for debugging the cause of rebalancing. Passing in the reason to this new API 
> would make it possible to figure out why a Streams client triggered a 
> rebalance from the broker logs, which are often the only logs available when 
> the client logs cannot be retrieved for whatever reason
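
As a concrete illustration of the consumer-side API this refers to (a sketch only: the group id, topic, and reason string below are placeholders):

{code:java}
import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class EnforceRebalanceSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("input-topic"));
            consumer.poll(Duration.ofMillis(100));

            // KIP-800: the reason is forwarded in the JoinGroup request, so the
            // broker logs record why this member forced a rebalance.
            consumer.enforceRebalance("host metadata changed");
            consumer.poll(Duration.ofMillis(100)); // the rejoin happens on the next poll
        }
    }
}
{code}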



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] showuon commented on pull request #11920: KAFKA-13672: Race condition in DynamicBrokerConfig

2022-03-23 Thread GitBox


showuon commented on pull request #11920:
URL: https://github.com/apache/kafka/pull/11920#issuecomment-1077046585


   @cadonna , also cherry-picked back to the 3.2 branch. Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-13672) Race condition in DynamicBrokerConfig

2022-03-23 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-13672.
---
Fix Version/s: 3.3.0
   Resolution: Fixed

> Race condition in DynamicBrokerConfig
> -
>
> Key: KAFKA-13672
> URL: https://issues.apache.org/jira/browse/KAFKA-13672
> Project: Kafka
>  Issue Type: Bug
>Reporter: Bruno Cadonna
>Assignee: Liam Clarke-Hutchinson
>Priority: Blocker
> Fix For: 3.2.0, 3.3.0
>
>
> Stacktrace:
> {code:java}
> org.opentest4j.AssertionFailedError: expected: <2> but was: <1>
>   at 
> app//org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55)
>   at 
> app//org.junit.jupiter.api.AssertionUtils.failNotEqual(AssertionUtils.java:62)
>   at 
> app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182)
>   at 
> app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:177)
>   at 
> app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1141)
>   at 
> app//kafka.server.DynamicBrokerReconfigurationTest.$anonfun$waitForConfigOnServer$1(DynamicBrokerReconfigurationTest.scala:1500)
>   at 
> app//kafka.server.DynamicBrokerReconfigurationTest.waitForConfigOnServer(DynamicBrokerReconfigurationTest.scala:1500)
>   at 
> app//kafka.server.DynamicBrokerReconfigurationTest.$anonfun$waitForConfig$1(DynamicBrokerReconfigurationTest.scala:1495)
>   at 
> app//kafka.server.DynamicBrokerReconfigurationTest.$anonfun$waitForConfig$1$adapted(DynamicBrokerReconfigurationTest.scala:1495)
>   at app//scala.collection.IterableOnceOps.foreach(IterableOnce.scala:563)
>   at 
> app//scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:561)
>   at app//scala.collection.AbstractIterable.foreach(Iterable.scala:926)
>   at 
> app//kafka.server.DynamicBrokerReconfigurationTest.waitForConfig(DynamicBrokerReconfigurationTest.scala:1495)
>   at 
> app//kafka.server.DynamicBrokerReconfigurationTest.reconfigureServers(DynamicBrokerReconfigurationTest.scala:1440)
>   at 
> app//kafka.server.DynamicBrokerReconfigurationTest.resizeThreadPool$1(DynamicBrokerReconfigurationTest.scala:775)
>   at 
> app//kafka.server.DynamicBrokerReconfigurationTest.$anonfun$testThreadPoolResize$3(DynamicBrokerReconfigurationTest.scala:768)
>   at app//scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:190)
>   at 
> app//kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize(DynamicBrokerReconfigurationTest.scala:784)
> {code}
> Job: 
> https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11751/5/testReport/



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] RivenSun2 commented on pull request #11940: KAFKA-13689: optimize the log output of logUnused method

2022-03-23 Thread GitBox


RivenSun2 commented on pull request #11940:
URL: https://github.com/apache/kafka/pull/11940#issuecomment-1077069535


   @guozhangwang 
   Of course. I briefly sorted out where the Kafka client calls the `AbstractConfig.getBoolean` method: only the `KafkaProducer.configureTransactionState` method needs to be handled; `transactional.id` and `retry.backoff.ms` are retrieved elsewhere.
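   
   To make the symptom concrete, here is a minimal sketch reproducing the warning this PR suppresses (the broker address is a placeholder, and the producer never needs to reach a broker for the warning to appear; the quoted log wording is approximate):
   
   ```
   import java.util.Properties;
   
   import org.apache.kafka.clients.producer.KafkaProducer;
   import org.apache.kafka.clients.producer.ProducerConfig;
   import org.apache.kafka.common.serialization.StringSerializer;
   
   public class UnusedConfigDemo {
       public static void main(String[] args) {
           Properties props = new Properties();
           props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
           props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
           props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
           props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");
   
           // Never retrieved when idempotence and transactions are both off, so
           // before this PR the constructor warns, roughly:
           //   "The configuration 'transaction.timeout.ms' was supplied but isn't a known config."
           props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "60000");
   
           try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
               // no send needed; the warning (or its absence) shows up at construction
           }
       }
   }
   ```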
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vamossagar12 commented on a change in pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…

2022-03-23 Thread GitBox


vamossagar12 commented on a change in pull request #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r833912049



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -262,6 +262,42 @@ private void closeDirtyAndRevive(final Collection<Task> taskWithChangelogs, final boolean markAsCorrupted) {
         }
     }
 
+    private void commitActiveTasks(final Set<Task> activeTasksNeedCommit,
+                                   final AtomicReference<RuntimeException> activeTasksCommitException) {
+
+        final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsPerTask = new HashMap<>();
+        prepareCommitAndAddOffsetsToMap(activeTasksNeedCommit, consumedOffsetsPerTask);
+
+        final Set<Task> dirtyTasks = new HashSet<>();
+        try {
+            taskExecutor.commitOffsetsOrTransaction(consumedOffsetsPerTask);
+        } catch (final TaskCorruptedException e) {
+            log.warn("Some tasks were corrupted when trying to commit offsets, these will be cleaned and revived: {}",
+                e.corruptedTasks());
+
+            // If we hit a TaskCorruptedException it must be EOS, just handle the cleanup for those corrupted tasks right here
+            dirtyTasks.addAll(tasks.tasks(e.corruptedTasks()));
+            closeDirtyAndRevive(dirtyTasks, true);
+        } catch (final RuntimeException e) {
+            log.error("Exception caught while committing active tasks: " + consumedOffsetsPerTask.keySet(), e);
+            activeTasksCommitException.compareAndSet(null, e);
+            dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
+        }
+
+        // for non-revoking active tasks, we should not enforce checkpoint
+        // as it's EOS enabled in which case no checkpoint should be written
+        // while the task is in RUNNING state
+        for (final Task task : activeTasksNeedCommit) {
+            if (!dirtyTasks.contains(task)) {
+                try {
+                    task.postCommit(false);

Review comment:
   Thanks @showuon. Actually, this was `true` before; in the last review you had suggested setting it to `false`, which is why I toggled it. Going by that JIRA, it could be left as `true`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org





[GitHub] [kafka] guozhangwang commented on a change in pull request #11940: KAFKA-13689: optimize the log output of logUnused method

2022-03-23 Thread GitBox


guozhangwang commented on a change in pull request #11940:
URL: https://github.com/apache/kafka/pull/11940#discussion_r833912567



##
File path: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
##
@@ -530,6 +530,9 @@ private TransactionManager configureTransactionState(ProducerConfig config,
                 log.info("Instantiated a transactional producer.");
             else
                 log.info("Instantiated an idempotent producer.");
+        } else {
+            // ignore unretrieved configurations related to producer transaction
+            config.ignore(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG);

Review comment:
   Should we ignore `TRANSACTIONAL_ID_CONFIG` as well?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



