[GitHub] [kafka] jeffkbkim commented on a change in pull request #11566: KAFKA-13495: add reason to JoinGroupRequest

2021-12-15 Thread GitBox


jeffkbkim commented on a change in pull request #11566:
URL: https://github.com/apache/kafka/pull/11566#discussion_r770279783



##
File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
##
@@ -559,8 +606,10 @@ private void expectJoinGroup(
                 return false;
             }
             JoinGroupRequestData joinGroupRequest = ((JoinGroupRequest) body).data();
+            boolean isReasonMatching = expectedReason == null || joinGroupRequest.reason().equals(expectedReason);

Review comment:
   I added a comment; the reason is never set to null in AbstractCoordinator.
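
   For reference, a minimal sketch of the invariant this relies on (field and method names here are illustrative, not the exact AbstractCoordinator code):

       // Sketch: the rejoin reason starts as "" and is only ever assigned from
       // non-null arguments, so reason() can never observe null.
       private String rejoinReason = "";

       public synchronized void requestRejoin(final String reason) {
           this.rejoinReason = reason; // callers always pass a non-null string
       }

       public synchronized String rejoinReason() {
           return rejoinReason;
       }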








[GitHub] [kafka] jeffkbkim commented on a change in pull request #11566: KAFKA-13495: add reason to JoinGroupRequest

2021-12-15 Thread GitBox


jeffkbkim commented on a change in pull request #11566:
URL: https://github.com/apache/kafka/pull/11566#discussion_r770278660



##
File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
##
@@ -486,6 +487,43 @@ public void testRetainMemberIdAfterSyncGroupDisconnect() {
         ensureActiveGroup(rejoinedGeneration, memberId);
     }
 
+    @Test
+    public void testRejoinReason() {
+        setupCoordinator();
+
+        String memberId = "memberId";
+        int generation = 5;
+
+        // test initial reason
+        mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        expectJoinGroup("", "initialized abstract coordinator", generation, memberId);
+
+        // successful sync group response should reset reason
+        expectSyncGroup(generation, memberId);
+        ensureActiveGroup(generation, memberId);
+        assertEquals("", coordinator.rejoinReason());
+
+        // Force a rebalance
+        expectJoinGroup(memberId, "Manual test trigger", generation, memberId);
+        expectSyncGroup(generation, memberId);
+        coordinator.requestRejoin("Manual test trigger");
+        ensureActiveGroup(generation, memberId);
+        assertEquals("", coordinator.rejoinReason());
+
+        // max group size reached
+        mockClient.prepareResponse(joinGroupFollowerResponse(defaultGeneration, memberId, JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.GROUP_MAX_SIZE_REACHED));
+        coordinator.requestRejoin("Manual test trigger 2");
+        try {
+            coordinator.joinGroupIfNeeded(mockTime.timer(100L));
+        } catch (GroupMaxSizeReachedException e) {

Review comment:
   We do not want to stop when an exception is thrown; we need to verify that the reason is updated to the appropriate exception message afterwards.
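
   A hedged sketch of how the same intent could be expressed with `assertThrows`, which also returns control to the test so the follow-up check still runs (the expected reason check here is hypothetical):

       GroupMaxSizeReachedException e = assertThrows(
           GroupMaxSizeReachedException.class,
           () -> coordinator.joinGroupIfNeeded(mockTime.timer(100L)));
       // Hypothetical follow-up: verify the rejoin reason picked up the failure.
       assertTrue(coordinator.rejoinReason().contains(e.getMessage()));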








[GitHub] [kafka] jeffkbkim commented on a change in pull request #11566: KAFKA-13495: add reason to JoinGroupRequest

2021-12-15 Thread GitBox


jeffkbkim commented on a change in pull request #11566:
URL: https://github.com/apache/kafka/pull/11566#discussion_r770278248



##
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##
@@ -181,6 +182,8 @@ class GroupCoordinator(val brokerId: Int,
           responseCallback(JoinGroupResult(memberId, Errors.UNKNOWN_MEMBER_ID))
         case Some(group) =>
           group.inLock {
+            if (reason != null)
+              info(s"memberId=$memberId with groupInstanceId=$groupInstanceId is attempting to join groupId=$groupId due to: $reason")

Review comment:
   Yeah, that was my original thought as well. I believe you are referring to `maybePrepareRebalance(group: GroupMetadata, reason: String)`.
   Methods that reference it are:
   - `addMemberAndRebalance`
   - `doSyncGroup` (N/A)
   - `removeMemberAndUpdateGroup` (N/A)
   - `updateMemberAndRebalance`
   - `updateStaticMemberAndRebalance`
   
   I've passed the reason into all parent methods. Also, I've updated `updateStaticMemberAndRebalance`'s direct call to `prepareRebalance()`.








[GitHub] [kafka] jeffkbkim commented on a change in pull request #11566: KAFKA-13495: add reason to JoinGroupRequest

2021-12-15 Thread GitBox


jeffkbkim commented on a change in pull request #11566:
URL: https://github.com/apache/kafka/pull/11566#discussion_r770277817



##
File path: clients/src/main/resources/common/message/JoinGroupRequest.json
##
@@ -30,7 +30,9 @@
   // Version 6 is the first flexible version.
   //
   // Version 7 is the same as version 6.
-  "validVersions": "0-7",
+  //
+  // Version 8 adds the Reason field (KIP-800).
+  "validVersions": "0-8",

Review comment:
   bumped the response version








[GitHub] [kafka] showuon commented on a change in pull request #11566: KAFKA-13495: add reason to JoinGroupRequest

2021-12-15 Thread GitBox


showuon commented on a change in pull request #11566:
URL: https://github.com/apache/kafka/pull/11566#discussion_r770265543



##
File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
##
@@ -486,6 +487,43 @@ public void testRetainMemberIdAfterSyncGroupDisconnect() {
         ensureActiveGroup(rejoinedGeneration, memberId);
     }
 
+    @Test
+    public void testRejoinReason() {
+        setupCoordinator();
+
+        String memberId = "memberId";
+        int generation = 5;
+
+        // test initial reason
+        mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        expectJoinGroup("", "initialized abstract coordinator", generation, memberId);
+
+        // successful sync group response should reset reason
+        expectSyncGroup(generation, memberId);
+        ensureActiveGroup(generation, memberId);
+        assertEquals("", coordinator.rejoinReason());
+
+        // Force a rebalance
+        expectJoinGroup(memberId, "Manual test trigger", generation, memberId);
+        expectSyncGroup(generation, memberId);
+        coordinator.requestRejoin("Manual test trigger");
+        ensureActiveGroup(generation, memberId);
+        assertEquals("", coordinator.rejoinReason());
+
+        // max group size reached
+        mockClient.prepareResponse(joinGroupFollowerResponse(defaultGeneration, memberId, JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.GROUP_MAX_SIZE_REACHED));
+        coordinator.requestRejoin("Manual test trigger 2");
+        try {
+            coordinator.joinGroupIfNeeded(mockTime.timer(100L));
+        } catch (GroupMaxSizeReachedException e) {

Review comment:
   any reason why we don't use `assertThrows` here?
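
   For reference, a minimal `assertThrows` form of the same call (sketch only):

       assertThrows(GroupMaxSizeReachedException.class,
           () -> coordinator.joinGroupIfNeeded(mockTime.timer(100L)));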

##
File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
##
@@ -559,8 +606,10 @@ private void expectJoinGroup(
                 return false;
             }
             JoinGroupRequestData joinGroupRequest = ((JoinGroupRequest) body).data();
+            boolean isReasonMatching = expectedReason == null || joinGroupRequest.reason().equals(expectedReason);

Review comment:
   I don't think the logic here is correct.
   If the `expectedReason` is null, but the `joinGroupRequest.reason()` is not null, should it pass?
   That also implies you need to add tests to cover this situation.
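
   One hedged way to make the check strict in both directions (sketch, using `java.util.Objects`):

       // Matches only when both are null or both are equal strings.
       boolean isReasonMatching = Objects.equals(joinGroupRequest.reason(), expectedReason);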








[GitHub] [kafka] showuon commented on a change in pull request #11571: KAFKA-13496: add reason to LeaveGroupRequest

2021-12-15 Thread GitBox


showuon commented on a change in pull request #11571:
URL: https://github.com/apache/kafka/pull/11571#discussion_r770255842



##
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -3731,6 +3731,11 @@ public RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroup(Strin
         } else {
             members = options.members().stream().map(MemberToRemove::toMemberIdentity).collect(Collectors.toList());
         }
+
+        String baseReason = "member was removed by an admin";
+        String reason = options.reason() == null ? baseReason : baseReason + ": " + options.reason();

Review comment:
   I think `baseReason` can be declared as a `static` variable.
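
   i.e., roughly (sketch; the constant name is just a placeholder):

       private static final String BASE_REMOVE_REASON = "member was removed by an admin";

       // ...
       String reason = options.reason() == null
           ? BASE_REMOVE_REASON
           : BASE_REMOVE_REASON + ": " + options.reason();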

##
File path: clients/src/main/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupOptions.java
##
@@ -45,10 +46,21 @@ public RemoveMembersFromConsumerGroupOptions() {
         this.members = Collections.emptySet();
     }
 
+    /**
+     * Sets an optional reason.
+     */
+    public void reason(final String reason) {
+        this.reason = reason;

Review comment:
   We have a `reason` setter and getter in this class. I think we should name them differently so that other developers can better distinguish between them, instead of overloading the same name for different purposes. How about `setReason` and `reason`?
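
   i.e., the suggested split would look like (sketch):

       // Setter: named distinctly from the getter.
       public void setReason(final String reason) {
           this.reason = reason;
       }

       // Getter: keeps the fluent accessor style.
       public String reason() {
           return reason;
       }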

##
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -3731,6 +3731,11 @@ public RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroup(Strin
         } else {
             members = options.members().stream().map(MemberToRemove::toMemberIdentity).collect(Collectors.toList());
         }
+
+        String baseReason = "member was removed by an admin";
+        String reason = options.reason() == null ? baseReason : baseReason + ": " + options.reason();

Review comment:
   And maybe rename it to `baseLeaveGroupReason` or another name that is more specific.








[GitHub] [kafka] dengziming commented on pull request #11522: KAFKA-13242: Ensure UpdateFeatures is properly handled in KRaft

2021-12-15 Thread GitBox


dengziming commented on pull request #11522:
URL: https://github.com/apache/kafka/pull/11522#issuecomment-995458447


   Thank you @mumrah, I am checking KIP-778 about this.






[jira] [Created] (KAFKA-13550) Jointly develop Kafka-upstream function support to fully integrate APISIX and Kafka

2021-12-15 Thread Wang Yeliang (Jira)
Wang Yeliang created KAFKA-13550:


 Summary: Jointly develop Kafka-upstream function support to fully 
integrate APISIX and Kafka
 Key: KAFKA-13550
 URL: https://issues.apache.org/jira/browse/KAFKA-13550
 Project: Kafka
  Issue Type: Improvement
Reporter: Wang Yeliang


Hi, community,
 
My name is Yeliang Wang, and I am an Apache APISIX committer.
 
Some time ago, Apache APISIX implemented the kafka-logger plugin 
( [https://apisix.apache.org/docs/apisix/plugins/kafka-logger/] ), which can be used 
as an NGX Kafka client driver for the Lua Nginx module, and many users are using 
this feature.
 
Here, I would like to invite the Apache Kafka community to jointly develop 
Kafka-upstream function support to fully integrate APISIX and Kafka.
 
I believe that doing so can not only meet the diversified needs of users, but 
also enrich the surrounding ecosystem of Apache Kafka and Apache APISIX.
 
Looking forward to more discussion.





[jira] [Commented] (KAFKA-9366) Upgrade log4j to log4j2

2021-12-15 Thread Tarun Goswami (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-9366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17460408#comment-17460408 ]

Tarun Goswami commented on KAFKA-9366:
--

What is the ETA for this issue's resolution?

> Upgrade log4j to log4j2
> ---
>
> Key: KAFKA-9366
> URL: https://issues.apache.org/jira/browse/KAFKA-9366
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.2.0, 2.1.1, 2.3.0, 2.4.0
>Reporter: leibo
>Assignee: Dongjin Lee
>Priority: Critical
>  Labels: needs-kip
> Fix For: 3.2.0
>
>
> h2. CVE-2019-17571 Detail
> Included in Log4j 1.2 is a SocketServer class that is vulnerable to 
> deserialization of untrusted data which can be exploited to remotely execute 
> arbitrary code when combined with a deserialization gadget when listening to 
> untrusted network traffic for log data. This affects Log4j versions 1.2 up to
> 1.2.17.
>  
> [https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2019-17571]
>  





[GitHub] [kafka] ableegoldman commented on pull request #11604: MINOR: retry when deleting offsets for named topologies

2021-12-15 Thread GitBox


ableegoldman commented on pull request #11604:
URL: https://github.com/apache/kafka/pull/11604#issuecomment-995420047


   Hm, guess we should have seen this coming, but it looks like this fix is significantly increasing the flakiness of `NamedTopologyIntegrationTest.shouldRemoveNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets()`, since retrying makes us more likely to hit the `GroupIdNotFoundException: The group id does not exist.` failure. So I guess we'll need to address them in the same PR after all. :/
   
   The good news is, I'm pretty sure I understand why this is happening, and _hopefully_ it will be a simple fix. I'm going to ping David in the core team slack to verify my theory and make sure we can do the "simple fix"; I'll tag you as well for the full story.






[jira] [Comment Edited] (KAFKA-13538) Unexpected TopicExistsException related to Admin#createTopics after broker crash

2021-12-15 Thread joecqupt (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-13538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17459974#comment-17459974 ]

joecqupt edited comment on KAFKA-13538 at 12/16/21, 3:43 AM:
-

It seems that this is caused by the KafkaAdminClient retry mechanism; you can set 
the "retries" param to 0 to avoid this problem, and you will get a TimeoutException instead.


was (Author: joecqupt):
it seems that it because KafkaAdminClient retry mechanism, you can unset 
"retries" param to avoid this problem, you will get a TimeoutException

> Unexpected TopicExistsException related to Admin#createTopics after broker 
> crash
> 
>
> Key: KAFKA-13538
> URL: https://issues.apache.org/jira/browse/KAFKA-13538
> Project: Kafka
>  Issue Type: Bug
>  Components: build
>Affects Versions: 2.8.0
>Reporter: Haoze Wu
>Priority: Major
> Attachments: Screenshot from 2021-12-12 21-16-56.png, Screenshot from 
> 2021-12-12 21-17-04.png
>
>
> We were using the official Kafka Java API to create a topic in a Kafka broker 
> cluster (3 brokers):
> {code:java}
> CreateTopicsResult result = admin.createTopics(...);
> ... = result.all().get(); {code}
> The topic we create always has replication factor = 2, and partition = 2. If 
> one of the brokers crashes for some reason and the client tries to create a 
> topic exactly in this crashed broker, we usually observe that the client may 
> suffer from a delay of a few seconds due to the disconnection issue, and then 
> the client automatically connects to another broker and creates the topic in 
> this broker. Everything is done automatically in the client, under the code 
> of `admin.createTopics(...)` and `result.all().get()`.
> However, we found that sometimes we got `TopicExistsException` from 
> `result.all().get()`, but we had never created this topic beforehand.
> After some investigation on the source code of client, we found that this 
> issue happens in this way:
>  # The client connects to a broker (say, broker X) and then sends the topic 
> creation request.
>  # This topic has replication factor = 2 and partition = 2, so broker X may 
> inform another broker of this information.
>  # Broker X suddenly crashes for some reason, and the response for the topic 
> creation request has not been sent back to the client.
>  # The client eventually learns that broker X crashes, but never gets the 
> response for the topic creation request. Thus the client thinks the topic 
> creation request fails, and thus connects to another broker (say, broker Y) 
> and then sends the topic creation request again.
>  # This topic creation request (with replication factor = 2 and partition = 
> 2) had been partially executed before broker X crashes, so broker Y may have 
> done something required by broker X. For example, broker Y has some metadata 
> about this topic. Therefore, when Broker Y does some sanity check with the 
> metadata, it will find this topic exists, so broker Y directly returns 
> `TopicExistsException` as the response.
>  # The client receives `TopicExistsException`, and directly believes that 
> this topic has been created, so it is thrown back to the user with the API 
> `result.all().get()`.
> There are 2 diagrams illustrating these 6 steps:
> !Screenshot from 2021-12-12 21-16-56.png!
> !Screenshot from 2021-12-12 21-17-04.png!
> Now the core question is whether this workflow violates the semantic & design 
> of the Kafka Client API. We read the “Create Topics Response” section in 
> KIP-4 
> ([https://cwiki.apache.org/confluence/display/kafka/kip-4+-+command+line+and+centralized+administrative+operations]).
>  We found that the description in KIP-4 focuses on the batch request of topic 
> creations and how they work independently. It does not talk about how the 
> client should deal with the aforementioned buggy scenario.
> According to “common sense”, we think the client should be able to know that 
> the metadata existing in broker Y is actually created by the client via the 
> crashed broker X. Also, the client should not throw `TopicExistsException` to 
> the user.





[GitHub] [kafka] ableegoldman commented on pull request #11589: MINOR: update log and method name

2021-12-15 Thread GitBox


ableegoldman commented on pull request #11589:
URL: https://github.com/apache/kafka/pull/11589#issuecomment-995412482


   Merged to trunk






[GitHub] [kafka] ableegoldman merged pull request #11589: MINOR: update log and method name

2021-12-15 Thread GitBox


ableegoldman merged pull request #11589:
URL: https://github.com/apache/kafka/pull/11589


   






[GitHub] [kafka] ableegoldman commented on pull request #11589: MINOR: update log and method name

2021-12-15 Thread GitBox


ableegoldman commented on pull request #11589:
URL: https://github.com/apache/kafka/pull/11589#issuecomment-995410900


   Test failures are unrelated:
   kafka.server.ReplicaManagerTest.[1] usesTopicIds=true
   kafka.api.PlaintextConsumerTest.testPerPartitionLagMetricsCleanUpWithSubscribe()






[jira] [Commented] (KAFKA-13537) Will kafka_2.12-2.3.0 version be impacted by new zero-day exploit going on since last friday?

2021-12-15 Thread Dongjin Lee (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-13537?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17460381#comment-17460381 ]

Dongjin Lee commented on KAFKA-13537:
-

[~rajnaik] [~showuon] Plus: for the trogdor and tools modules, please refer to [KAFKA-12399|https://issues.apache.org/jira/browse/KAFKA-12399].

> Will kafka_2.12-2.3.0 version be impacted by new zero-day exploit going on 
> since last friday?
> -
>
> Key: KAFKA-13537
> URL: https://issues.apache.org/jira/browse/KAFKA-13537
> Project: Kafka
>  Issue Type: Bug
> Environment: All
>Reporter: Rajendra
>Priority: Major
>
> h3. new zero-day exploit has been reported against the popular Log4J2 library 
> which can allow an attacker to remotely execute code.
> h3. Affected Software
> A significant number of Java-based applications are using log4j as their 
> logging utility and are vulnerable to this CVE. To the best of our knowledge, 
> at least the following software may be impacted:
>  * Apache Struts
>  * Apache Solr
>  * Apache Druid
>  * Apache Flink
>  * ElasticSearch
>  * Flume
>  * Apache Dubbo
>  * Logstash
>  * Kafka
>  * Spring-Boot-starter-log4j2
> Wondering if kafka_2.12-2.3.0 is impacted. I see below libraries.
> kafka-log4j-appender-2.3.0.jar  log4j-1.2.17.jar  
> scala-logging_2.12-3.9.0.jar  slf4j-log4j12-1.7.26.jar
>  
>  
>  
>  





[jira] [Resolved] (KAFKA-13547) Kafka - 1.0.0 | Remove log4j.jar

2021-12-15 Thread Dongjin Lee (Jira)


 [ https://issues.apache.org/jira/browse/KAFKA-13547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dongjin Lee resolved KAFKA-13547.
-
Resolution: Duplicate

It seems like you are trying to use VerifiableProducer in tools, right? Removing log4j 
1.x from the tools module is work in progress in KAFKA-12399.

> Kafka - 1.0.0 | Remove log4j.jar
> 
>
> Key: KAFKA-13547
> URL: https://issues.apache.org/jira/browse/KAFKA-13547
> Project: Kafka
>  Issue Type: Bug
>Reporter: masood
>Priority: Blocker
>
> We wanted to remove the log4j.jar but ended up with a dependency on the 
> kafka.producer.ProducerConfig.
> Caused by: java.lang.NoClassDefFoundError: org/apache/log4j/Logger
>     at kafka.utils.Logging.logger(Logging.scala:24)
>     at kafka.utils.Logging.logger$(Logging.scala:24)
>     at 
> kafka.utils.VerifiableProperties.logger$lzycompute(VerifiableProperties.scala:27)
>     at kafka.utils.VerifiableProperties.logger(VerifiableProperties.scala:27)
>     at kafka.utils.Logging.info(Logging.scala:71)
>     at kafka.utils.Logging.info$(Logging.scala:70)
>     at kafka.utils.VerifiableProperties.info(VerifiableProperties.scala:27)
>     at kafka.utils.VerifiableProperties.verify(VerifiableProperties.scala:218)
>     at kafka.producer.ProducerConfig.<init>(ProducerConfig.scala:61)
> Is there any configuration available which can resolve this error?
> Please note we are not using log4j.properties or any other log4j logging 
> mechanism for Kafka connection in the application.





[jira] [Commented] (KAFKA-9366) Upgrade log4j to log4j2

2021-12-15 Thread Dongjin Lee (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-9366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17460377#comment-17460377 ]

Dongjin Lee commented on KAFKA-9366:


[~showuon] Sure. The PR is now updated to handle log4j2 2.16.0.

> Upgrade log4j to log4j2
> ---
>
> Key: KAFKA-9366
> URL: https://issues.apache.org/jira/browse/KAFKA-9366
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.2.0, 2.1.1, 2.3.0, 2.4.0
>Reporter: leibo
>Assignee: Dongjin Lee
>Priority: Critical
>  Labels: needs-kip
> Fix For: 3.2.0
>
>
> h2. CVE-2019-17571 Detail
> Included in Log4j 1.2 is a SocketServer class that is vulnerable to 
> deserialization of untrusted data which can be exploited to remotely execute 
> arbitrary code when combined with a deserialization gadget when listening to 
> untrusted network traffic for log data. This affects Log4j versions 1.2 up to
> 1.2.17.
>  
> [https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2019-17571]
>  





[jira] [Updated] (KAFKA-12256) auto commit causes delays due to retriable UNKNOWN_TOPIC_OR_PARTITION

2021-12-15 Thread Ryan Leslie (Jira)


 [ https://issues.apache.org/jira/browse/KAFKA-12256?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ryan Leslie updated KAFKA-12256:

Summary: auto commit causes delays due to retriable 
UNKNOWN_TOPIC_OR_PARTITION  (was: auto commit causes delays due to retribale 
UNKNOWN_TOPIC_OR_PARTITION)

> auto commit causes delays due to retriable UNKNOWN_TOPIC_OR_PARTITION
> -
>
> Key: KAFKA-12256
> URL: https://issues.apache.org/jira/browse/KAFKA-12256
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.0.0
>Reporter: Ryan Leslie
>Priority: Minor
>
> In KAFKA-6829 a change was made to the consumer to internally retry commits 
> upon receiving UNKNOWN_TOPIC_OR_PARTITION.
> Though this helped mitigate issues around stale broker metadata, there were 
> some valid concerns around the negative effects for routine topic deletion:
> https://github.com/apache/kafka/pull/4948
> In particular, if a commit is issued for a deleted topic, retries can block 
> the consumer for up to max.poll.interval.ms. This is tunable of course, but 
> any amount of stalling in a consumer can lead to unnecessary lag.
> One of the assumptions while permitting the change was that in practice it 
> should be rare for commits to occur for deleted topics, since that would 
> imply messages were being read or published at the time of deletion. It's 
> fair to expect users to not delete topics that are actively published to. But 
> this assumption is false in cases where auto commit is enabled.
> With the current implementation of auto commit, the consumer will regularly 
> issue commits for all topics being fetched from, regardless of whether or not 
> messages were actually received. The fetch positions are simply flushed, even 
> when they are 0. This is simple and generally efficient, though it does mean 
> commits are often redundant. Besides the auto commit interval, commits are 
> also issued at the time of rebalance, which is often precisely at the time 
> topics are deleted.
> This means that in practice commits for deleted topics are not really rare. 
> This is particularly an issue when the consumer is subscribed to a multitude 
> of topics using a wildcard. For example, a consumer might subscribe to a 
> particular "flavor" of topic with the aim of auditing all such data, and 
> these topics might dynamically come and go. The consumer's metadata and 
> rebalance mechanisms are meant to handle this gracefully, but the end result 
> is that such groups are often blocked in a commit for several seconds or 
> minutes (the default is 5 minutes) whenever a delete occurs. This can 
> sometimes result in significant lag.
> Besides having users abandon auto commit in the face of topic deletes, there 
> are probably multiple ways to deal with this, including reconsidering if 
> commits still truly need to be retried here, or if this behavior should be 
> more configurable; e.g. having a separate commit timeout or policy. In some 
> cases the loss of a commit and subsequent message duplication is still 
> preferred to processing delays. And having an artificially low 
> max.poll.interval.ms or rebalance.timeout.ms comes with its own set of 
> concerns.
> In the very least the current behavior and pitfalls around delete with active 
> consumers should be documented.





[jira] [Commented] (KAFKA-13255) Mirrormaker config property config.properties.exclude is not working as expected

2021-12-15 Thread Luke Chen (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-13255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17460354#comment-17460354 ]

Luke Chen commented on KAFKA-13255:
---

Thanks for your response. In that case, I agree we don't need to include this 
fix in v3.1.0. Thank you.

> Mirrormaker config property config.properties.exclude is not working as 
> expected 
> -
>
> Key: KAFKA-13255
> URL: https://issues.apache.org/jira/browse/KAFKA-13255
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.8.0
>Reporter: Anamika Nadkarni
>Assignee: Ed Berezitsky
>Priority: Major
> Fix For: 3.2.0
>
>
> Objective - Use MM2 (kafka connect in distributed cluster) for data migration 
> between cluster hosted in private data center and aws msk cluster.
> Steps performed -
>  # Started kafka-connect service.
>  # Created 3 MM2 connectors (i.e. source connector, checkpoint connector and 
> heartbeat connector). Curl commands used to create connectors are in the 
> attached file.  To exclude certain config properties while topic replication, 
> we are using the 'config.properties.exclude' property in the MM2 source 
> connector.
> Expected -
> Source topic 'dev.portlandDc.anamika.helloMsk' should be successfully created 
> in destination cluster.
> Actual -
> Creation of the source topic 'dev.portlandDc.anamika.helloMsk' in destination 
> cluster fails with an error. Error is
> {code:java}
> [2021-08-06 06:13:40,944] WARN [mm2-msc|worker] Could not create topic 
> dev.portlandDc.anamika.helloMsk. 
> (org.apache.kafka.connect.mirror.MirrorSourceConnector:371)
> org.apache.kafka.common.errors.InvalidConfigurationException: Unknown topic 
> config name: confluent.value.schema.validation{code}





[GitHub] [kafka] showuon commented on pull request #11451: KAFKA-13419: Only reset generation ID when ILLEGAL_GENERATION error

2021-12-15 Thread GitBox


showuon commented on pull request #11451:
URL: https://github.com/apache/kafka/pull/11451#issuecomment-995362529


   > I will take another look tomorrow. Sorry for the delay.
   
   No problem, David! :)






[GitHub] [kafka] vvcephei commented on a change in pull request #10528: KAFKA-12497: Skip unnecessary source task offset commits

2021-12-15 Thread GitBox


vvcephei commented on a change in pull request #10528:
URL: https://github.com/apache/kafka/pull/10528#discussion_r770148349



##
File path: build.gradle
##
@@ -1205,6 +1205,7 @@ project(':clients') {
 
 testImplementation libs.bcpkix
 testImplementation libs.junitJupiter
+testImplementation libs.log4j

Review comment:
   Oh, right, I forgot about that. We did that on purpose after something 
bad got pulled in accidentally. I forget the exact details.
   
   Sorry for the wild goose chase.








[GitHub] [kafka] cmccabe commented on a change in pull request #11606: MINOR: Add shutdown tests for KRaft

2021-12-15 Thread GitBox


cmccabe commented on a change in pull request #11606:
URL: https://github.com/apache/kafka/pull/11606#discussion_r77013



##
File path: core/src/main/scala/kafka/server/BrokerServer.scala
##
@@ -434,7 +433,7 @@ class BrokerServer(
 maybeChangeStatus(STARTING, STARTED)
 fatal("Fatal error during broker startup. Prepare to shutdown", e)
 shutdown()
-throw e
+throw if (e.isInstanceOf[ExecutionException]) e.getCause else e

Review comment:
   yes, this is a good change :)








[GitHub] [kafka] cmccabe commented on a change in pull request #11606: MINOR: Add shutdown tests for KRaft

2021-12-15 Thread GitBox


cmccabe commented on a change in pull request #11606:
URL: https://github.com/apache/kafka/pull/11606#discussion_r770131044



##
File path: core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
##
@@ -54,10 +55,11 @@ class ZooKeeperQuorumImplementation(val zookeeper: EmbeddedZookeeper,
                                     val zkClient: KafkaZkClient,
                                     val adminZkClient: AdminZkClient,
                                     val log: Logging) extends QuorumImplementation {
-  override def createAndStartBroker(config: KafkaConfig,
-                                    time: Time): KafkaBroker = {
+  override def createAndMaybeStartBroker(config: KafkaConfig,

Review comment:
   Let's just call this `createBroker` and have the startup argument speak 
for itself...








[GitHub] [kafka] cmccabe commented on a change in pull request #11606: MINOR: Add shutdown tests for KRaft

2021-12-15 Thread GitBox


cmccabe commented on a change in pull request #11606:
URL: https://github.com/apache/kafka/pull/11606#discussion_r770130266



##
File path: core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
##
@@ -67,6 +68,14 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
    */
   def generateConfigs: Seq[KafkaConfig]
 
+  /**
+   * It is sometimes useful to keep the same log.dirs configuration value; override this method if that is desired
+   *
+   * @param priorConfigs the prior configs
+   * @return the new generated configs
+   */
+  def regenerateConfigs(priorConfigs: Seq[KafkaConfig]): Seq[KafkaConfig] = generateConfigs
+

Review comment:
   Isn't there a simpler way of doing this where subclasses who want the 
old configs just do `super.generateConfigs` and then modify what that function 
returns? I'm not convinced we need a new function or argument here.








[GitHub] [kafka] showuon commented on a change in pull request #11589: MINOR: update log and method name

2021-12-15 Thread GitBox


showuon commented on a change in pull request #11589:
URL: https://github.com/apache/kafka/pull/11589#discussion_r770117284



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##
@@ -153,7 +153,7 @@ public void maybeNotifyTopologyVersionWaiters(final String threadName) {
         if (threadVersions.values().stream().allMatch(t -> t >= topologyVersionWaitersVersion)) {
             topologyVersionWaiters.future.complete(null);
             iterator.remove();
-            log.info("Thread {} is now on topology version {}", threadName, topologyVersionWaiters.topologyVersion);
+            log.info("All threads are now on topology version {}", topologyVersionWaiters.topologyVersion);

Review comment:
   I see. Thank you.








[GitHub] [kafka] rondagostino commented on a change in pull request #11606: MINOR: Add shutdown tests for KRaft

2021-12-15 Thread GitBox


rondagostino commented on a change in pull request #11606:
URL: https://github.com/apache/kafka/pull/11606#discussion_r770109618



##
File path: core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
##
@@ -67,6 +68,14 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
    */
   def generateConfigs: Seq[KafkaConfig]
 
+  /**
+   * It is sometimes useful to keep the same log.dirs configuration value; override this method if that is desired
+   *
+   * @param priorConfigs the prior configs
+   * @return the new generated configs
+   */
+  def regenerateConfigs(priorConfigs: Seq[KafkaConfig]): Seq[KafkaConfig] = generateConfigs
+

Review comment:
   The other option is to change the existing `generateConfigs()` method to 
take a `priorConfigs: Option[Seq[KafkaConfig]]` argument.  This is a more 
intrusive change as it would require 30+ changes elsewhere, but I am not averse 
to doing it if that is preferred.








[GitHub] [kafka] rondagostino commented on a change in pull request #11606: MINOR: Add shutdown tests for KRaft

2021-12-15 Thread GitBox


rondagostino commented on a change in pull request #11606:
URL: https://github.com/apache/kafka/pull/11606#discussion_r770108761



##
File path: core/src/main/scala/kafka/server/BrokerServer.scala
##
@@ -434,7 +433,7 @@ class BrokerServer(
 maybeChangeStatus(STARTING, STARTED)
 fatal("Fatal error during broker startup. Prepare to shutdown", e)
 shutdown()
-throw e
+throw if (e.isInstanceOf[ExecutionException]) e.getCause else e

Review comment:
   This causes the KRaft broker to throw KafkaStorageException like the ZK 
broker rather than ExecutionException, which is what is thrown now.








[GitHub] [kafka] rondagostino opened a new pull request #11606: MINOR: Add shutdown tests for KRaft

2021-12-15 Thread GitBox


rondagostino opened a new pull request #11606:
URL: https://github.com/apache/kafka/pull/11606


   Augments existing shutdown tests for KRaft.  Adds the ability to update 
configs in KRaft tests, and in both the ZK and KRaft cases to be able to update 
configs without losing the server's log directory and data.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[GitHub] [kafka] mjsax commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-15 Thread GitBox


mjsax commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r770076908



##
File path: streams/src/main/java/org/apache/kafka/streams/query/QueryResult.java
##
@@ -197,6 +197,18 @@ public R getResult() {
         return result;
     }
 
+    @SuppressWarnings("unchecked")
+    public <V> QueryResult<V> swapResult(final V value) {
+        if (isFailure()) {
+            return (QueryResult<V>) this;

Review comment:
   Why do we not allow swapping the result if the current result is a failure? And if we don't want to allow swapping, why just return `this` rather than throw an exception?
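
   A sketch of the throwing alternative (the success path is elided; treat the exact body, including the `QueryResult.forResult` factory, as an assumption):

       public <V> QueryResult<V> swapResult(final V value) {
           if (isFailure()) {
               // Make misuse loud instead of silently propagating the failure.
               throw new IllegalStateException("Cannot swap the result of a failed QueryResult");
           }
           // Sketch: real code would also carry over execution info / position.
           return QueryResult.forResult(value);
       }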

##
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##
@@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
         return false;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final boolean collectExecutionInfo) {
+
+        final long start = System.nanoTime();
+        final QueryResult<R> result;
+
+        final QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, collectExecutionInfo);
+            if (collectExecutionInfo) {

Review comment:
   Does it make sense to add execution info for this case? We pushed the query down but did not handle it ourselves, so the inner store would already track it (and we should not track it again)?

##
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##
@@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
         return false;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final boolean collectExecutionInfo) {
+
+        final long start = System.nanoTime();
+        final QueryResult<R> result;
+
+        final QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, collectExecutionInfo);
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns");
+            }
+        } else {
+            result = (QueryResult<R>) handler.apply(
+                query,
+                positionBound,
+                collectExecutionInfo,
+                this
+            );
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " with serdes "
+                        + serdes + " in " + (System.nanoTime() - start) + "ns");

Review comment:
   nit: `in` -> `within`
   (also: maybe add `.` at the end?)

##
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##
@@ -186,6 +203,68 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
         return false;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final boolean collectExecutionInfo) {
+
+        final long start = System.nanoTime();
+        final QueryResult<R> result;
+
+        final QueryHandler handler = queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, collectExecutionInfo);
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns");
+            }
+        } else {
+            result = (QueryResult<R>) handler.apply(
+                query,
+                positionBound,
+                collectExecutionInfo,
+                this
+            );
+            if (collectExecutionInfo) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " with serdes "
+                        + serdes + " in " + (System.nanoTime() - start) + "ns");
+            }
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <R> QueryResult<R> runKeyQuery(final Query<R> query,
+        final PositionBound positionBound, final boolean collectExecutionInfo) {
+        final QueryResult<R> result;
+        final KeyQuery<K, V> typedQuery = (KeyQuery<K, V>) query;
+        final KeyQuery<Bytes, byte[]> rawKeyQuery = KeyQuery.withKey(keyBytes(typedQuery.getKey()));
+        final QueryResult<byte[]> rawResult =
+            wrapped().query(rawKeyQuery, positionBound, collectExecutionInfo);
+        if (rawResult.isSuccess()) {
+            final boolean timestamped =
[GitHub] [kafka] yyu1993 opened a new pull request #11605: MINOR: replace lastOption call in LocalLog#flush() to prevent NoSuchElementException

2021-12-15 Thread GitBox


yyu1993 opened a new pull request #11605:
URL: https://github.com/apache/kafka/pull/11605


   This PR fixes the iterator access issue during segment flush where a 
NoSuchElementException may be thrown. This can particularly happen if the 
Iterable[LogSegment] is accessed by two threads in parallel. A particular case is 
when the periodic retention work empties segments from the segments map while, 
concurrently, the async flush operation tries to access the very last segment 
from the same map. Since Iterable[T].lastOption is not thread-safe, the 
parallel access sometimes fails with a NoSuchElementException.
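
   For illustration, a minimal Java analogue of the race and a null-safe access pattern (a sketch with illustrative names, not the LocalLog code itself):

       import java.util.Map;
       import java.util.concurrent.ConcurrentSkipListMap;

       public final class SegmentFlushSketch {
           private final ConcurrentSkipListMap<Long, String> segments = new ConcurrentSkipListMap<>();

           // lastEntry() atomically snapshots the last mapping (or returns null),
           // so a retention thread emptying the map concurrently cannot cause a
           // NoSuchElementException the way an unguarded last()-style access can.
           public void flushLastSegment() {
               final Map.Entry<Long, String> last = segments.lastEntry();
               if (last != null) {
                   // flush(last.getValue()); // hypothetical flush of the newest segment
               }
           }
       }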
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[GitHub] [kafka] dhruvp-8 commented on pull request #7898: KAFKA-9366: Change log4j dependency into log4j2

2021-12-15 Thread GitBox


dhruvp-8 commented on pull request #7898:
URL: https://github.com/apache/kafka/pull/7898#issuecomment-995250657


   > Hi All,
   > 
   > Sorry for being late. Here is the update! This PR is rebased onto the 
latest trunk and now uses log4j2 2.16.0 to address 
[CVE-2021-44228](https://nvd.nist.gov/vuln/detail/CVE-2021-44228) and 
[CVE-2021-45046](https://nvd.nist.gov/vuln/detail/CVE-2021-45046).
   > 
   > According to [this 
analysis](https://github.com/apache/logging-log4j2/pull/608#issuecomment-990494126),
 log4j 1.x is [affected by this 
vulnerability](https://nvd.nist.gov/vuln/detail/CVE-2021-4104) but, only when 
it uses JMS appender. So, unless you are using the JMS appender, you are safe 
from this vulnerability.
   
   @dongjinleekr It would be great if you could provide this change as part of 
your preview releases for 2.7.1 and 2.8.0.






[GitHub] [kafka] wcarlson5 commented on pull request #11604: MINOR: retry when deleting offsets for named topologies

2021-12-15 Thread GitBox


wcarlson5 commented on pull request #11604:
URL: https://github.com/apache/kafka/pull/11604#issuecomment-995213697


   @ableegoldman 






[GitHub] [kafka] wcarlson5 opened a new pull request #11604: MINOR: retry when deleting offsets for named topologies

2021-12-15 Thread GitBox


wcarlson5 opened a new pull request #11604:
URL: https://github.com/apache/kafka/pull/11604


   When this was made, I didn't expect deleteOffsetsResult to be set if an 
exception was thrown. But it is, and to retry we need to reset it to null.
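
   Roughly the shape of the fix, as a hedged sketch (field and method names are hypothetical):

       // On a failed attempt, clear the cached result so the retry loop issues
       // a fresh deleteOffsets request instead of reusing the stale failed one.
       try {
           deleteOffsetsResult.all().get();
       } catch (final Exception e) {
           deleteOffsetsResult = null; // reset before retrying
       }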
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[GitHub] [kafka] C0urante commented on pull request #10907: KAFKA-10000: Exactly-once support for source connectors (KIP-618)

2021-12-15 Thread GitBox


C0urante commented on pull request #10907:
URL: https://github.com/apache/kafka/pull/10907#issuecomment-995213197


   @rhauch gentle reminder, it's been another month and we're still waiting on 
that next pass.






[GitHub] [kafka] C0urante commented on a change in pull request #10528: KAFKA-12497: Skip unnecessary source task offset commits

2021-12-15 Thread GitBox


C0urante commented on a change in pull request #10528:
URL: https://github.com/apache/kafka/pull/10528#discussion_r770031575



##
File path: build.gradle
##
@@ -1205,6 +1205,7 @@ project(':clients') {
 
 testImplementation libs.bcpkix
 testImplementation libs.junitJupiter
+testImplementation libs.log4j

Review comment:
   Jenkins has indeed tested for us, and the verdict appears to be that this dependency is still required. I think the way we're pulling in the clients test library prevents transitive dependencies from being included. We might consider changing `testImplementation project(':clients').sourceSets.test.output` to just `testImplementation project(':clients')`, but it looks like all non-core projects use that style; it's probably best to leave that as-is for now and add the log4j dependency back.
   
   








[GitHub] [kafka] prat0318 commented on a change in pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway

2021-12-15 Thread GitBox


prat0318 commented on a change in pull request #11552:
URL: https://github.com/apache/kafka/pull/11552#discussion_r769918971



##
File path: core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala
##
@@ -0,0 +1,86 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.api
+
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.producer.ProducerRecord
+
+import java.util.Properties
+import org.apache.kafka.common.TopicPartition
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+
+import java.nio.charset.StandardCharsets
+
+
+class ProducerSendWhileDeletionTest extends BaseProducerSendTest {

Review comment:
   Agreed that extending `BaseProducerSendTest` would run unnecessary 
tests. Changed now to extend from `IntegrationTestHarness` and verified that 
the test fails w/o the fix and passes after the fix.








[GitHub] [kafka] prat0318 commented on a change in pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway

2021-12-15 Thread GitBox


prat0318 commented on a change in pull request #11552:
URL: https://github.com/apache/kafka/pull/11552#discussion_r769919454



##
File path: core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala
##
@@ -0,0 +1,86 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.api
+
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.producer.ProducerRecord
+
+import java.util.Properties
+import org.apache.kafka.common.TopicPartition
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+
+import java.nio.charset.StandardCharsets
+
+
+class ProducerSendWhileDeletionTest extends BaseProducerSendTest {
+
+
+  override def generateConfigs = {
+    val overridingProps = new Properties()
+    val numServers = 2
+    overridingProps.put(KafkaConfig.NumPartitionsProp, 2.toString)
+    overridingProps.put(KafkaConfig.DefaultReplicationFactorProp, 2.toString)
+    overridingProps.put(KafkaConfig.AutoLeaderRebalanceEnableProp, false.toString)
+    TestUtils.createBrokerConfigs(numServers, zkConnect, false, interBrokerSecurityProtocol = Some(securityProtocol),
+      trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties).map(KafkaConfig.fromProps(_, overridingProps))
+  }
+
+  /**
+   * Tests that Producer gets self-recovered when a topic is deleted mid-way of produce.
+   *
+   * Producer will attempt to send messages to the partition specified in each record, and should
+   * succeed as long as the partition is included in the metadata.
+   */
+  @Test
+  def testSendWithTopicDeletionMidWay(): Unit = {
+    val numRecords = 10
+
+    // create topic with leader as 0 for the 2 partitions.
+    createTopic(topic, Map(0 -> Seq(0, 1), 1 -> Seq(0, 1)))
+
+    val reassignment = Map(
+      new TopicPartition(topic, 0) -> Seq(1, 0),
+      new TopicPartition(topic, 1) -> Seq(1, 0)
+    )
+
+    // Change leader to 1 for both the partitions to increase leader Epoch from 0 -> 1
+    zkClient.createPartitionReassignment(reassignment)
+    TestUtils.waitUntilTrue(() => !zkClient.reassignPartitionsInProgress,
+      "failed to remove reassign partitions path after completion")
+
+    val producer = createProducer(brokerList, maxBlockMs = 5 * 1000L, deliveryTimeoutMs = 20 * 1000)
+
+    (1 to numRecords).map { i =>

Review comment:
   done.








[GitHub] [kafka] prat0318 commented on a change in pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway

2021-12-15 Thread GitBox


prat0318 commented on a change in pull request #11552:
URL: https://github.com/apache/kafka/pull/11552#discussion_r769918057



##
File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
##
@@ -372,6 +372,31 @@ public void testUpdateLastEpoch() {
 assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 12));
 }
 
+    @Test
+    public void testEpochUpdateAfterTopicDeletion() {
+        TopicPartition tp = new TopicPartition("topic-1", 0);
+        Map<String, Uuid> topicIds = Collections.singletonMap("topic-1", Uuid.randomUuid());
+
+        MetadataResponse metadataResponse = emptyMetadataResponse();
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 0L);
+
+        // Start with a topic A with a topic ID foo

Review comment:
   agreed, modified accordingly.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] prat0318 commented on a change in pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway

2021-12-15 Thread GitBox


prat0318 commented on a change in pull request #11552:
URL: https://github.com/apache/kafka/pull/11552#discussion_r769917809



##
File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
##
@@ -372,6 +372,31 @@ public void testUpdateLastEpoch() {
 assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 12));
 }
 
+    @Test
+    public void testEpochUpdateAfterTopicDeletion() {
+        TopicPartition tp = new TopicPartition("topic-1", 0);
+        Map<String, Uuid> topicIds = Collections.singletonMap("topic-1", Uuid.randomUuid());

Review comment:
   ack.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] prat0318 commented on a change in pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway

2021-12-15 Thread GitBox


prat0318 commented on a change in pull request #11552:
URL: https://github.com/apache/kafka/pull/11552#discussion_r769917597



##
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##
@@ -394,8 +394,12 @@ else if (metadata.error() == Errors.TOPIC_AUTHORIZATION_FAILED)
         if (hasReliableLeaderEpoch && partitionMetadata.leaderEpoch.isPresent()) {
             int newEpoch = partitionMetadata.leaderEpoch.get();
             Integer currentEpoch = lastSeenLeaderEpochs.get(tp);
-            if (topicId != null && oldTopicId != null && !topicId.equals(oldTopicId)) {
-                // If both topic IDs were valid and the topic ID changed, update the metadata
+            // Between the time that a topic is deleted and re-created, the client may lose
+            // track of the corresponding topicId (i.e. `oldTopicId` will be null). In this case,
+            // when we discover the new topicId, we allow the corresponding leader epoch
+            // to override the last seen value.

Review comment:
   moved as per the suggestion.
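
For illustration, the epoch-override behaviour described in the new comment can be sketched as a small self-contained snippet (the names below are simplified stand-ins, not the actual `Metadata.java` internals):

{code:java}
import java.util.HashMap;
import java.util.Map;

public class EpochOverrideSketch {
    private final Map<String, Integer> lastSeenLeaderEpochs = new HashMap<>();

    // topicId / oldTopicId are nullable; oldTopicId is null when the client lost
    // track of the topic, e.g. because it was deleted and re-created in between.
    boolean maybeOverrideEpoch(String topic, String topicId, String oldTopicId, int newEpoch) {
        Integer currentEpoch = lastSeenLeaderEpochs.get(topic);
        // Also true when oldTopicId == null: discovering a new topicId lets the
        // incoming leader epoch override the last seen value.
        boolean topicIdChanged = topicId != null && !topicId.equals(oldTopicId);
        if (currentEpoch == null || topicIdChanged || newEpoch >= currentEpoch) {
            lastSeenLeaderEpochs.put(topic, newEpoch);
            return true;
        }
        return false;
    }
}
{code}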




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] prat0318 commented on a change in pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway

2021-12-15 Thread GitBox


prat0318 commented on a change in pull request #11552:
URL: https://github.com/apache/kafka/pull/11552#discussion_r769903291



##
File path: 
core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala
##
@@ -0,0 +1,86 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.api
+
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.producer.ProducerRecord
+
+import java.util.Properties
+import org.apache.kafka.common.TopicPartition
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+
+import java.nio.charset.StandardCharsets
+
+
+class ProducerSendWhileDeletionTest extends BaseProducerSendTest {
+
+

Review comment:
   removed




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] prat0318 commented on a change in pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway

2021-12-15 Thread GitBox


prat0318 commented on a change in pull request #11552:
URL: https://github.com/apache/kafka/pull/11552#discussion_r769902172



##
File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
##
@@ -372,6 +372,31 @@ public void testUpdateLastEpoch() {
 assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 12));
 }
 
+    @Test
+    public void testEpochUpdateAfterTopicDeletion() {
+        TopicPartition tp = new TopicPartition("topic-1", 0);
+        Map<String, Uuid> topicIds = Collections.singletonMap("topic-1", Uuid.randomUuid());
+
+        MetadataResponse metadataResponse = emptyMetadataResponse();
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 0L);
+
+        // Start with a topic A with a topic ID foo
+        metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 10, topicIds);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 1L);
+        assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp));
+
+        // Topic A is now deleted so Response contains an Error. LeaderEpoch should still return maintain Old value

Review comment:
   removed.

##
File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
##
@@ -372,6 +372,31 @@ public void testUpdateLastEpoch() {
 assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 12));
 }
 
+    @Test
+    public void testEpochUpdateAfterTopicDeletion() {
+        TopicPartition tp = new TopicPartition("topic-1", 0);
+        Map<String, Uuid> topicIds = Collections.singletonMap("topic-1", Uuid.randomUuid());
+
+        MetadataResponse metadataResponse = emptyMetadataResponse();
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 0L);
+
+        // Start with a topic A with a topic ID foo
+        metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 10, topicIds);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 1L);
+        assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp));
+
+        // Topic A is now deleted so Response contains an Error. LeaderEpoch should still return maintain Old value
+        metadataResponse = RequestTestUtils.metadataUpdateWith("dummy", 1, Collections.singletonMap("topic-1", Errors.UNKNOWN_TOPIC_OR_PARTITION), new HashMap<>());

Review comment:
   ack




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] prat0318 commented on a change in pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway

2021-12-15 Thread GitBox


prat0318 commented on a change in pull request #11552:
URL: https://github.com/apache/kafka/pull/11552#discussion_r769901576



##
File path: 
core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala
##
@@ -0,0 +1,86 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+

Review comment:
   removed.

##
File path: 
core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala
##
@@ -0,0 +1,86 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.api
+
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.producer.ProducerRecord
+
+import java.util.Properties
+import org.apache.kafka.common.TopicPartition
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+
+import java.nio.charset.StandardCharsets
+
+
+class ProducerSendWhileDeletionTest extends BaseProducerSendTest {
+
+
+  override def generateConfigs = {
+    val overridingProps = new Properties()
+    val numServers = 2
+    overridingProps.put(KafkaConfig.NumPartitionsProp, 2.toString)
+    overridingProps.put(KafkaConfig.DefaultReplicationFactorProp, 2.toString)
+    overridingProps.put(KafkaConfig.AutoLeaderRebalanceEnableProp, false.toString)
+    TestUtils.createBrokerConfigs(numServers, zkConnect, false, interBrokerSecurityProtocol = Some(securityProtocol),
+      trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties).map(KafkaConfig.fromProps(_, overridingProps))
+  }
+
+  /**
+   * Tests that Producer gets self-recovered when a topic is deleted mid-way of produce.
+   *
+   * Producer will attempt to send messages to the partition specified in each record, and should
+   * succeed as long as the partition is included in the metadata.
+   */
+  @Test
+  def testSendWithTopicDeletionMidWay(): Unit = {
+    val numRecords = 10
+
+    // create topic with leader as 0 for the 2 partitions.
+    createTopic(topic, Map(0 -> Seq(0, 1), 1 -> Seq(0, 1)))
+
+    val reassignment = Map(
+      new TopicPartition(topic, 0) -> Seq(1, 0),
+      new TopicPartition(topic, 1) -> Seq(1, 0)
+    )
+
+    // Change leader to 1 for both the partitions to increase leader Epoch from 0 -> 1
+    zkClient.createPartitionReassignment(reassignment)
+    TestUtils.waitUntilTrue(() => !zkClient.reassignPartitionsInProgress,
+      "failed to remove reassign partitions path after completion")
+
+    val producer = createProducer(brokerList, maxBlockMs = 5 * 1000L, deliveryTimeoutMs = 20 * 1000)
+
+    (1 to numRecords).map { i =>
+      val resp = producer.send(new ProducerRecord(topic, null, ("value" + i).getBytes(StandardCharsets.UTF_8))).get
+      assertEquals(topic, resp.topic())
+    }
+
+    // start topic deletion
+    adminZkClient.deleteTopic(topic)
+
+    // Verify that the topic is deleted when no metadata request comes in
+    TestUtils.verifyTopicDeletion(zkClient, topic, 2, servers)
+
+    // Producer should be able to send messages even after topic gets deleted and auto-created
+    assertEquals(topic, producer.send(new ProducerRecord(topic, null, ("value").getBytes(StandardCharsets.UTF_8))).get.topic())

Review comment:
   ack




-- 
This is an automated message from the 

[GitHub] [kafka] prat0318 commented on a change in pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway

2021-12-15 Thread GitBox


prat0318 commented on a change in pull request #11552:
URL: https://github.com/apache/kafka/pull/11552#discussion_r769901214



##
File path: 
core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala
##
@@ -0,0 +1,86 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.api
+
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.producer.ProducerRecord
+
+import java.util.Properties
+import org.apache.kafka.common.TopicPartition
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+
+import java.nio.charset.StandardCharsets
+
+
+class ProducerSendWhileDeletionTest extends BaseProducerSendTest {
+
+
+  override def generateConfigs = {
+    val overridingProps = new Properties()
+    val numServers = 2
+    overridingProps.put(KafkaConfig.NumPartitionsProp, 2.toString)
+    overridingProps.put(KafkaConfig.DefaultReplicationFactorProp, 2.toString)
+    overridingProps.put(KafkaConfig.AutoLeaderRebalanceEnableProp, false.toString)
+    TestUtils.createBrokerConfigs(numServers, zkConnect, false, interBrokerSecurityProtocol = Some(securityProtocol),
+      trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties).map(KafkaConfig.fromProps(_, overridingProps))
+  }
+
+  /**
+   * Tests that Producer gets self-recovered when a topic is deleted mid-way of produce.
+   *
+   * Producer will attempt to send messages to the partition specified in each record, and should
+   * succeed as long as the partition is included in the metadata.
+   */
+  @Test
+  def testSendWithTopicDeletionMidWay(): Unit = {
+    val numRecords = 10
+
+    // create topic with leader as 0 for the 2 partitions.
+    createTopic(topic, Map(0 -> Seq(0, 1), 1 -> Seq(0, 1)))
+
+    val reassignment = Map(
+      new TopicPartition(topic, 0) -> Seq(1, 0),
+      new TopicPartition(topic, 1) -> Seq(1, 0)
+    )
+
+    // Change leader to 1 for both the partitions to increase leader Epoch from 0 -> 1
+    zkClient.createPartitionReassignment(reassignment)
+    TestUtils.waitUntilTrue(() => !zkClient.reassignPartitionsInProgress,
+      "failed to remove reassign partitions path after completion")
+
+    val producer = createProducer(brokerList, maxBlockMs = 5 * 1000L, deliveryTimeoutMs = 20 * 1000)
+
+    (1 to numRecords).map { i =>
+      val resp = producer.send(new ProducerRecord(topic, null, ("value" + i).getBytes(StandardCharsets.UTF_8))).get
+      assertEquals(topic, resp.topic())
+    }
+
+    // start topic deletion
+    adminZkClient.deleteTopic(topic)
+
+    // Verify that the topic is deleted when no metadata request comes in
+    TestUtils.verifyTopicDeletion(zkClient, topic, 2, servers)
+
+    // Producer should be able to send messages even after topic gets deleted and auto-created
+    assertEquals(topic, producer.send(new ProducerRecord(topic, null, ("value").getBytes(StandardCharsets.UTF_8))).get.topic())
+  }
+
+

Review comment:
   removed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante commented on a change in pull request #10528: KAFKA-12497: Skip unnecessary source task offset commits

2021-12-15 Thread GitBox


C0urante commented on a change in pull request #10528:
URL: https://github.com/apache/kafka/pull/10528#discussion_r769900920



##
File path: build.gradle
##
@@ -1205,6 +1205,7 @@ project(':clients') {
 
 testImplementation libs.bcpkix
 testImplementation libs.junitJupiter
+testImplementation libs.log4j

Review comment:
   Ack, will give it a shot. Haven't tested locally, hoping Jenkins can do 
that for us.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-13549) Add "delete interval" config

2021-12-15 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-13549:
---

 Summary: Add "delete interval" config
 Key: KAFKA-13549
 URL: https://issues.apache.org/jira/browse/KAFKA-13549
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Matthias J. Sax


Kafka Streams uses "delete record" requests to aggressively purge data from 
repartition topics. Those requests are sent each time we commit.

For at-least-once with a default commit interval of 30 seconds, this works 
fine. However, for exactly-once with a default commit interval of 100ms, it's 
very aggressive. The main issue is broker side: the broker logs every 
"delete record" request, and thus broker logs are spammed if EOS is enabled.

We should consider adding a new config (eg `delete.record.interval.ms` or 
similar) to have a dedicated config for "delete record" requests. This would 
decouple it from the commit interval config and allow purging data less 
aggressively even when the commit interval is small, avoiding the broker-side 
log spamming.
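
For illustration, a minimal sketch of how such a dedicated interval could throttle the purge requests; the config name above is only a proposal, and `ThrottledPurger` with its fields is invented for this sketch:

{code:java}
import java.util.Collections;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.common.TopicPartition;

public class ThrottledPurger {
    private final Admin admin;
    private final long deleteIntervalMs; // would come from the proposed config
    private long lastPurgeMs = -1L;

    public ThrottledPurger(Admin admin, long deleteIntervalMs) {
        this.admin = admin;
        this.deleteIntervalMs = deleteIntervalMs;
    }

    // Called on every commit, but only issues a "delete record" request
    // once the configured interval has elapsed since the last purge.
    public void maybePurge(TopicPartition tp, long purgeUpToOffset, long nowMs) {
        if (lastPurgeMs < 0 || nowMs - lastPurgeMs >= deleteIntervalMs) {
            admin.deleteRecords(Collections.singletonMap(tp, RecordsToDelete.beforeOffset(purgeUpToOffset)));
            lastPurgeMs = nowMs;
        }
    }
}
{code}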



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13255) Mirrormaker config property config.properties.exclude is not working as expected

2021-12-15 Thread Ed Berezitsky (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17460163#comment-17460163
 ] 

Ed Berezitsky commented on KAFKA-13255:
---

The workaround would be to pre-create the topics on the target cluster. Also, this 
bug is critical (a show stopper) only if MM2 is configured to replicate topics 
across clusters of different flavors, as reported in the original case above 
(Confluent to MSK, or to vanilla Apache Kafka). Once the topic is created on the 
target, this bug won't affect the data mirroring process.
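
For illustration only, a sketch of how the exclusion property from this ticket could be set on the source connector; the connector settings and the excluded pattern are placeholders:

{code:java}
import java.util.Properties;

public class Mm2ExcludeSketch {
    public static void main(String[] args) {
        // Hypothetical MM2 source-connector settings; values are placeholders.
        Properties config = new Properties();
        config.put("connector.class", "org.apache.kafka.connect.mirror.MirrorSourceConnector");
        // Exclude vendor-specific topic configs that the target cluster rejects,
        // e.g. the confluent.* properties from the error in this ticket:
        config.put("config.properties.exclude", "confluent\\..*");
        System.out.println(config);
    }
}
{code}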

> Mirrormaker config property config.properties.exclude is not working as 
> expected 
> -
>
> Key: KAFKA-13255
> URL: https://issues.apache.org/jira/browse/KAFKA-13255
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.8.0
>Reporter: Anamika Nadkarni
>Assignee: Ed Berezitsky
>Priority: Major
> Fix For: 3.2.0
>
>
> Objective - Use MM2 (kafka connect in distributed cluster) for data migration 
> between cluster hosted in private data center and aws msk cluster.
> Steps performed -
>  # Started kafka-connect service.
>  # Created 3 MM2 connectors (i.e. source connector, checkpoint connector and 
> heartbeat connector). Curl commands used to create connectors are in the 
> attached file.  To exclude certain config properties while topic replication, 
> we are using the 'config.properties.exclude' property in the MM2 source 
> connector.
> Expected -
> Source topic 'dev.portlandDc.anamika.helloMsk' should be successfully created 
> in destination cluster.
> Actual -
> Creation of the source topic 'dev.portlandDc.anamika.helloMsk' in destination 
> cluster fails with an error. Error is
> {code:java}
> [2021-08-06 06:13:40,944] WARN [mm2-msc|worker] Could not create topic 
> dev.portlandDc.anamika.helloMsk. 
> (org.apache.kafka.connect.mirror.MirrorSourceConnector:371)
> org.apache.kafka.common.errors.InvalidConfigurationException: Unknown topic 
> config name: confluent.value.schema.validation{code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] C0urante commented on a change in pull request #10528: KAFKA-12497: Skip unnecessary source task offset commits

2021-12-15 Thread GitBox


C0urante commented on a change in pull request #10528:
URL: https://github.com/apache/kafka/pull/10528#discussion_r769859525



##
File path: checkstyle/import-control.xml
##
@@ -531,6 +529,7 @@
   
   
   
+  

Review comment:
   Yep, good call, thanks for catching this.
   
   I could be wrong but I think the import control logic requires everything to 
be explicitly allowed; don't think there's an explicit restriction on the 
Streams package name.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante commented on a change in pull request #10528: KAFKA-12497: Skip unnecessary source task offset commits

2021-12-15 Thread GitBox


C0urante commented on a change in pull request #10528:
URL: https://github.com/apache/kafka/pull/10528#discussion_r769858234



##
File path: build.gradle
##
@@ -2400,6 +2401,7 @@ project(':connect:runtime') {
 testImplementation project(':core')
 testImplementation project(':metadata')
 testImplementation project(':core').sourceSets.test.output
+testImplementation project(':streams').sourceSets.test.output

Review comment:
   Good catch, do not need this!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13255) Mirrormaker config property config.properties.exclude is not working as expected

2021-12-15 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17460107#comment-17460107
 ] 

Mickael Maison commented on KAFKA-13255:


It's not a regression from 3.0, so it's probably too late for 3.1.0. David Jacot 
is the release manager and has the final say; you can ask him if you think this 
should be included.
I'm not sure if there's a workaround, apart from not mirroring topics with 
custom configurations.


> Mirrormaker config property config.properties.exclude is not working as 
> expected 
> -
>
> Key: KAFKA-13255
> URL: https://issues.apache.org/jira/browse/KAFKA-13255
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.8.0
>Reporter: Anamika Nadkarni
>Assignee: Ed Berezitsky
>Priority: Major
> Fix For: 3.2.0
>
>
> Objective - Use MM2 (kafka connect in distributed cluster) for data migration 
> between cluster hosted in private data center and aws msk cluster.
> Steps performed -
>  # Started kafka-connect service.
>  # Created 3 MM2 connectors (i.e. source connector, checkpoint connector and 
> heartbeat connector). Curl commands used to create connectors are in the 
> attached file.  To exclude certain config properties while topic replication, 
> we are using the 'config.properties.exclude' property in the MM2 source 
> connector.
> Expected -
> Source topic 'dev.portlandDc.anamika.helloMsk' should be successfully created 
> in destination cluster.
> Actual -
> Creation of the source topic 'dev.portlandDc.anamika.helloMsk' in destination 
> cluster fails with an error. Error is
> {code:java}
> [2021-08-06 06:13:40,944] WARN [mm2-msc|worker] Could not create topic 
> dev.portlandDc.anamika.helloMsk. 
> (org.apache.kafka.connect.mirror.MirrorSourceConnector:371)
> org.apache.kafka.common.errors.InvalidConfigurationException: Unknown topic 
> config name: confluent.value.schema.validation{code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] tombentley commented on a change in pull request #11560: KAFKA-7589: Allow configuring network threads per listener

2021-12-15 Thread GitBox


tombentley commented on a change in pull request #11560:
URL: https://github.com/apache/kafka/pull/11560#discussion_r769814004



##
File path: core/src/main/scala/kafka/network/SocketServer.scala
##
@@ -203,23 +207,22 @@ class SocketServer(val config: KafkaConfig,
   * Before starting them, we ensure that authorizer has all the metadata to authorize
   * requests on that endpoint by waiting on the provided future.
   */
-  private def startAcceptorAndProcessors(threadPrefix: String,
-                                         endpoint: EndPoint,
+  private def startAcceptorAndProcessors(endpoint: EndPoint,

Review comment:
   If `acceptor` has `endpoint` as a member, why do we need to pass it 
explicitly here? 

##
File path: core/src/main/scala/kafka/network/SocketServer.scala
##
@@ -95,20 +95,22 @@ class SocketServer(val config: KafkaConfig,
   memoryPoolSensor.add(new Meter(TimeUnit.MILLISECONDS, memoryPoolDepletedPercentMetricName, memoryPoolDepletedTimeMetricName))
   private val memoryPool = if (config.queuedMaxBytes > 0) new SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, memoryPoolSensor) else MemoryPool.NONE
   // data-plane
-  private val dataPlaneProcessors = new ConcurrentHashMap[Int, Processor]()
-  private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, Acceptor]()
-  val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneMetricPrefix, time, apiVersionManager.newRequestMetrics)
+  private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, DataPlaneAcceptor]()
+  val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneAcceptor.MetricPrefix, time, apiVersionManager.newRequestMetrics)
   // control-plane
-  private var controlPlaneProcessorOpt : Option[Processor] = None
-  private[network] var controlPlaneAcceptorOpt : Option[Acceptor] = None
+  private[network] var controlPlaneAcceptorOpt: Option[ControlPlaneAcceptor] = None
   val controlPlaneRequestChannelOpt: Option[RequestChannel] = config.controlPlaneListenerName.map(_ =>
-    new RequestChannel(20, ControlPlaneMetricPrefix, time, apiVersionManager.newRequestMetrics))
+    new RequestChannel(20, ControlPlaneAcceptor.MetricPrefix, time, apiVersionManager.newRequestMetrics))
 
-  private var nextProcessorId = 0
+  private val nextPId: AtomicInteger = new AtomicInteger(0)
   val connectionQuotas = new ConnectionQuotas(config, time, metrics)
   private var startedProcessingRequests = false
   private var stoppedProcessingRequests = false
 
+  def nextProcessorId(): Int = {
+    nextPId.getAndIncrement()

Review comment:
   `nextPId` is a bit hard on the eyes. `_nextProcessorId` perhaps? Also 
perhaps a comment that it is only needed for the legacy metric names. 

##
File path: core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
##
@@ -920,7 +919,7 @@ class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable wi
   def immutableListenerConfigs(kafkaConfig: KafkaConfig, prefix: String): Map[String, AnyRef] = {
     kafkaConfig.originalsWithPrefix(prefix, true).asScala.filter { case (key, _) =>
       // skip the reconfigurable configs
-      !DynamicSecurityConfigs.contains(key) && !SocketServer.ListenerReconfigurableConfigs.contains(key)
+      !DynamicSecurityConfigs.contains(key) && !SocketServer.ListenerReconfigurableConfigs.contains(key) && !DataPlaneAcceptor.ListenerReconfigurableConfigs.contains(key)

Review comment:
   IIUC the reconfigurability of thread numbers is not specific to the data 
plane, so why is the constant declared on `DataPlaneAcceptor` rather than 
`Acceptor`?

##
File path: core/src/test/scala/unit/kafka/network/SocketServerTest.scala
##
@@ -82,6 +82,11 @@ class SocketServerTest {
 
   private val kafkaLogger = org.apache.log4j.LogManager.getLogger("kafka")
   private var logLevelToRestore: Level = _
+  def endpoint: EndPoint = {
+    KafkaConfig.fromProps(props, doLog = false).dataPlaneListeners.head
+  }
+  def listener: String = endpoint.listenerName.value
+  @volatile var uncaughtExceptions = 0

Review comment:
   `uncaughtExceptions` gets incremented with += 1, which would require 
atomic CAS rather than `volatile`. 
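
To illustrate the point about `+= 1` on a `volatile` field, a small self-contained Java sketch (the Scala field in the PR is analogous):

{code:java}
import java.util.concurrent.atomic.AtomicInteger;

public class CounterSketch {
    // Not safe: volatile only guarantees visibility. "unsafeCount += 1" is a
    // read-modify-write of three steps, so two threads can lose increments.
    private volatile int unsafeCount = 0;

    // Safe: AtomicInteger performs the increment as a single atomic CAS loop.
    private final AtomicInteger safeCount = new AtomicInteger(0);

    void onUncaughtException() {
        unsafeCount += 1;            // can lose increments under contention
        safeCount.incrementAndGet(); // never loses increments
    }
}
{code}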

##
File path: 
core/src/test/scala/integration/kafka/network/DynamicNumNetworkThreadsTest.scala
##
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable 

[jira] [Commented] (KAFKA-8495) Make Round-robin / RangeAssignor to be "sticky" (part 5)

2021-12-15 Thread Pierre-Yves Bigourdan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17460099#comment-17460099
 ] 

Pierre-Yves Bigourdan commented on KAFKA-8495:
--

[~ableegoldman] could you please clarify "round-robin/range assignors will 
deliberately not support cooperative rebalancing", or point to some relevant 
discussions providing more context?

I'm currently working on a project that would benefit from moving to a 
cooperative protocol to reduce the performance hit incurred from rebalances, 
but unfortunately the way the sticky algorithm assigns partitions to consumers 
isn't suitable for my use case.

> Make Round-robin / RangeAssignor to be "sticky" (part 5)
> 
>
> Key: KAFKA-8495
> URL: https://issues.apache.org/jira/browse/KAFKA-8495
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Guozhang Wang
>Priority: Major
>
> For this new algorithm to be effective in reducing rebalance costs, it really 
> expects the plug-in assignor to be "sticky" in some way, such that the diff 
> between the newly-assigned partitions and the existing-assigned partitions can 
> be small, and hence only a small subset of the total number of partitions needs 
> to be revoked / migrated at each rebalance in practice; otherwise, we are just 
> paying more rebalance cost for little benefit.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Comment Edited] (KAFKA-8495) Make Round-robin / RangeAssignor to be "sticky" (part 5)

2021-12-15 Thread Pierre-Yves Bigourdan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17460099#comment-17460099
 ] 

Pierre-Yves Bigourdan edited comment on KAFKA-8495 at 12/15/21, 4:58 PM:
-

[~ableegoldman] could you please clarify "round-robin/range assignors will 
deliberately not support cooperative rebalancing", or point to some relevant 
discussions providing more context?

I'm currently working on a project that would benefit from moving to a 
cooperative protocol to reduce the performance hit incurred by rebalances, but 
unfortunately the way the sticky algorithm assigns partitions to consumers 
isn't suitable for my use case.


was (Author: pyves):
[~ableegoldman] could you please clarify "round-robin/range assignors will 
deliberately not support cooperative rebalancing", or point to some relevant 
discussions providing more context?

I'm currently working on a project that would benefit from moving to a 
cooperative protocol to reduce the performance hit incurred from rebalances, 
but unfortunately the way the sticky algorithm assigns partitions to consumers 
isn't suitable for my use case.

> Make Round-robin / RangeAssignor to be "sticky" (part 5)
> 
>
> Key: KAFKA-8495
> URL: https://issues.apache.org/jira/browse/KAFKA-8495
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Guozhang Wang
>Priority: Major
>
> For this new algorithm to be effective in reducing rebalance costs, it really 
> expects the plug-in assignor to be "sticky" in some way, such that the diff 
> between the newly-assigned partitions and the existing-assigned partitions can 
> be small, and hence only a small subset of the total number of partitions needs 
> to be revoked / migrated at each rebalance in practice; otherwise, we are just 
> paying more rebalance cost for little benefit.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13548) IQv2: revisit WindowKeyQuery and WindowRangeQuery

2021-12-15 Thread John Roesler (Jira)
John Roesler created KAFKA-13548:


 Summary: IQv2: revisit WindowKeyQuery and WindowRangeQuery
 Key: KAFKA-13548
 URL: https://issues.apache.org/jira/browse/KAFKA-13548
 Project: Kafka
  Issue Type: Sub-task
Reporter: John Roesler


During discussion of KIP-806, there was a suggestion to refactor the queries 
following a builder pattern so that we can compactly and flexibly specify lower 
and upper bounds on the keys, window start times, and window end times.

We should circle back and try to generalize the queries' interfaces before the 
first release of IQv2.
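
Purely as a sketch of the builder idea under discussion, with invented names that are not part of the actual IQv2 API:

{code:java}
// Illustrative only: compactly specify optional lower/upper bounds on keys
// and window start times via a builder.
final class WindowQuerySketch {
    final String keyFrom;
    final String keyTo;
    final long windowStartFrom;
    final long windowStartTo;

    private WindowQuerySketch(Builder b) {
        this.keyFrom = b.keyFrom;
        this.keyTo = b.keyTo;
        this.windowStartFrom = b.windowStartFrom;
        this.windowStartTo = b.windowStartTo;
    }

    static Builder builder() { return new Builder(); }

    static final class Builder {
        private String keyFrom;                        // null = unbounded below
        private String keyTo;                          // null = unbounded above
        private long windowStartFrom = Long.MIN_VALUE; // default: unbounded
        private long windowStartTo = Long.MAX_VALUE;   // default: unbounded

        Builder keyFrom(String k) { this.keyFrom = k; return this; }
        Builder keyTo(String k) { this.keyTo = k; return this; }
        Builder windowStartFrom(long t) { this.windowStartFrom = t; return this; }
        Builder windowStartTo(long t) { this.windowStartTo = t; return this; }
        WindowQuerySketch build() { return new WindowQuerySketch(this); }
    }

    public static void main(String[] args) {
        // Bounds compose compactly, and any of them may be omitted.
        WindowQuerySketch q = WindowQuerySketch.builder()
            .keyFrom("a").keyTo("z")
            .windowStartFrom(0L).windowStartTo(100L)
            .build();
        System.out.println(q.keyFrom + ".." + q.keyTo
            + " @ [" + q.windowStartFrom + ", " + q.windowStartTo + "]");
    }
}
{code}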



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] dajac commented on pull request #11451: KAFKA-13419: Only reset generation ID when ILLEGAL_GENERATION error

2021-12-15 Thread GitBox


dajac commented on pull request #11451:
URL: https://github.com/apache/kafka/pull/11451#issuecomment-994956979


   I will take another look tomorrow. Sorry for the delay.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway

2021-12-15 Thread GitBox


dajac commented on a change in pull request #11552:
URL: https://github.com/apache/kafka/pull/11552#discussion_r769761183



##
File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
##
@@ -372,6 +372,31 @@ public void testUpdateLastEpoch() {
 assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 12));
 }
 
+    @Test
+    public void testEpochUpdateAfterTopicDeletion() {
+        TopicPartition tp = new TopicPartition("topic-1", 0);
+        Map<String, Uuid> topicIds = Collections.singletonMap("topic-1", Uuid.randomUuid());
+
+        MetadataResponse metadataResponse = emptyMetadataResponse();
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 0L);
+
+        // Start with a topic A with a topic ID foo
+        metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 10, topicIds);
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 1L);
+        assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp));
+
+        // Topic A is now deleted so Response contains an Error. LeaderEpoch should still return maintain Old value
+        metadataResponse = RequestTestUtils.metadataUpdateWith("dummy", 1, Collections.singletonMap("topic-1", Errors.UNKNOWN_TOPIC_OR_PARTITION), new HashMap<>());

Review comment:
   nit: Could we use `Collections.emptyMap()`?

##
File path: 
core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala
##
@@ -0,0 +1,86 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.api
+
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.producer.ProducerRecord
+
+import java.util.Properties
+import org.apache.kafka.common.TopicPartition
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+
+import java.nio.charset.StandardCharsets
+
+
+class ProducerSendWhileDeletionTest extends BaseProducerSendTest {
+
+
+  override def generateConfigs = {
+    val overridingProps = new Properties()
+    val numServers = 2
+    overridingProps.put(KafkaConfig.NumPartitionsProp, 2.toString)
+    overridingProps.put(KafkaConfig.DefaultReplicationFactorProp, 2.toString)
+    overridingProps.put(KafkaConfig.AutoLeaderRebalanceEnableProp, false.toString)
+    TestUtils.createBrokerConfigs(numServers, zkConnect, false, interBrokerSecurityProtocol = Some(securityProtocol),
+      trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties).map(KafkaConfig.fromProps(_, overridingProps))
+  }
+
+  /**
+   * Tests that Producer gets self-recovered when a topic is deleted mid-way of produce.
+   *
+   * Producer will attempt to send messages to the partition specified in each record, and should
+   * succeed as long as the partition is included in the metadata.
+   */
+  @Test
+  def testSendWithTopicDeletionMidWay(): Unit = {
+    val numRecords = 10
+
+    // create topic with leader as 0 for the 2 partitions.

Review comment:
   nit: Could we start all comments with a capital letter to be consistent?
##
File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
##
@@ -372,6 +372,31 @@ public void testUpdateLastEpoch() {
 assertOptional(metadata.lastSeenLeaderEpoch(tp), leaderAndEpoch -> assertEquals(leaderAndEpoch.intValue(), 12));
 }
 
+    @Test
+    public void testEpochUpdateAfterTopicDeletion() {
+        TopicPartition tp = new TopicPartition("topic-1", 0);
+        Map<String, Uuid> topicIds = Collections.singletonMap("topic-1", Uuid.randomUuid());
+
+        MetadataResponse metadataResponse = emptyMetadataResponse();
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 0L);
+
+        // Start with a topic A with a topic ID foo
+        metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 10, topicIds);
+

[GitHub] [kafka] mumrah commented on pull request #11522: KAFKA-13242: Ensure UpdateFeatures is properly handled in KRaft

2021-12-15 Thread GitBox


mumrah commented on pull request #11522:
URL: https://github.com/apache/kafka/pull/11522#issuecomment-994914027


   @dengziming, we have a few changes to the feature flags coming in KIP-778. 
Let's hold off on merging this until the KIP is adopted. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11571: KAFKA-13496: add reason to LeaveGroupRequest

2021-12-15 Thread GitBox


dajac commented on a change in pull request #11571:
URL: https://github.com/apache/kafka/pull/11571#discussion_r769741518



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -3731,6 +3731,11 @@ public RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroup(Strin
         } else {
             members = options.members().stream().map(MemberToRemove::toMemberIdentity).collect(Collectors.toList());
         }
+
+        if (options.reason() != null) {
+            members.forEach(member -> member.setReason(options.reason()));

Review comment:
   I think the KIP also specifies a default reason in this case, doesn't it?
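
A minimal sketch of that fallback, assuming a hypothetical default string (the actual KIP-800 wording may differ):

{code:java}
public class LeaveReasonSketch {
    // Hypothetical default; the actual KIP-800 wording may differ.
    static final String DEFAULT_LEAVE_REASON = "member was removed by an admin";

    static String effectiveReason(String requestedReason) {
        return (requestedReason == null || requestedReason.isEmpty())
            ? DEFAULT_LEAVE_REASON
            : requestedReason;
    }

    public static void main(String[] args) {
        System.out.println(effectiveReason(null));              // falls back to the default
        System.out.println(effectiveReason("rolling restart")); // kept as-is
    }
}
{code}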

##
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##
@@ -647,7 +647,7 @@ class GroupCoordinator(val brokerId: Int,
           if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
             groupInstanceId.flatMap(group.currentStaticMemberId) match {
               case Some(currentMemberId) =>
-                removeCurrentMemberFromGroup(group, currentMemberId)
+                removeCurrentMemberFromGroup(group, currentMemberId, leavingMember.reason())

Review comment:
   nit: Parentheses could be omitted.

##
File path: clients/src/main/resources/common/message/LeaveGroupRequest.json
##
@@ -36,7 +38,10 @@
 "about": "The member ID to remove from the group." },
   { "name": "GroupInstanceId", "type": "string",
 "versions": "3+", "nullableVersions": "3+", "default": "null",
-"about": "The group instance ID to remove from the group." }
+"about": "The group instance ID to remove from the group." },

Review comment:
   Could we extend tests in `RequestResponseTest` to cover this new field?

##
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##
@@ -617,9 +617,9 @@ class GroupCoordinator(val brokerId: Int,
                        leavingMembers: List[MemberIdentity],
                        responseCallback: LeaveGroupResult => Unit): Unit = {
 
-    def removeCurrentMemberFromGroup(group: GroupMetadata, memberId: String): Unit = {
+    def removeCurrentMemberFromGroup(group: GroupMetadata, memberId: String, leaveReason: String): Unit = {

Review comment:
   Should we use `Option[String]` for the `leaveReason` and handle the case 
where it is not provided?

##
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##
@@ -665,7 +665,7 @@ class GroupCoordinator(val brokerId: Int,
       groupInstanceId,
       operation = "leave-group"
     ).getOrElse {
-      removeCurrentMemberFromGroup(group, memberId)
+      removeCurrentMemberFromGroup(group, memberId, leavingMember.reason())

Review comment:
   ditto.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #10528: KAFKA-12497: Skip unnecessary source task offset commits

2021-12-15 Thread GitBox


vvcephei commented on a change in pull request #10528:
URL: https://github.com/apache/kafka/pull/10528#discussion_r751376161



##
File path: build.gradle
##
@@ -2400,6 +2401,7 @@ project(':connect:runtime') {
 testImplementation project(':core')
 testImplementation project(':metadata')
 testImplementation project(':core').sourceSets.test.output
+testImplementation project(':streams').sourceSets.test.output

Review comment:
   Since you've moved the util, do you still need this dependency?

##
File path: build.gradle
##
@@ -1205,6 +1205,7 @@ project(':clients') {
 
 testImplementation libs.bcpkix
 testImplementation libs.junitJupiter
+testImplementation libs.log4j

Review comment:
   I wonder if we can drop this dependency from Streams now. Do you mind 
checking?

##
File path: checkstyle/import-control.xml
##
@@ -531,6 +529,7 @@
   
   
   
+  

Review comment:
   Interesting. We had an import restriction on pulling in a Streams 
dependency?
   
   Actually, it looks like the util isn't in this package anymore at all, so 
this is probably not needed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #11589: MINOR: update log and method name

2021-12-15 Thread GitBox


wcarlson5 commented on a change in pull request #11589:
URL: https://github.com/apache/kafka/pull/11589#discussion_r769705381



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##
@@ -153,7 +153,7 @@ public void maybeNotifyTopologyVersionWaiters(final String threadName) {
             if (threadVersions.values().stream().allMatch(t -> t >= topologyVersionWaitersVersion)) {
                 topologyVersionWaiters.future.complete(null);
                 iterator.remove();
-                log.info("Thread {} is now on topology version {}", threadName, topologyVersionWaiters.topologyVersion);
+                log.info("All threads are now on topology version {}", topologyVersionWaiters.topologyVersion);

Review comment:
   This log line only prints once, not once per thread. Each waiter also has 
a different version.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11566: KAFKA-13495: add reason to JoinGroupRequest

2021-12-15 Thread GitBox


dajac commented on a change in pull request #11566:
URL: https://github.com/apache/kafka/pull/11566#discussion_r769662220



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
##
@@ -2300,21 +2300,32 @@ public ConsumerGroupMetadata groupMetadata() {
      * {@link org.apache.kafka.clients.consumer.ConsumerPartitionAssignor ConsumerPartitionAssignor}, you should not
      * use this API.
      *
+     * @param reason The reason why the new rebalance is needed.
+     *
      * @throws java.lang.IllegalStateException if the consumer does not use group subscription
      */
     @Override
-    public void enforceRebalance() {
+    public void enforceRebalance(final String reason) {
         acquireAndEnsureOpen();
         try {
             if (coordinator == null) {
                 throw new IllegalStateException("Tried to force a rebalance but consumer does not have a group.");
             }
-            coordinator.requestRejoin("rebalance enforced by user");
+            String defaultReason = "rebalance enforced by user";

Review comment:
   nit: Should we define this one as a constant?

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -467,6 +468,7 @@ boolean joinGroupIfNeeded(final Timer timer) {
                 final RuntimeException exception = future.exception();
 
                 resetJoinGroupFuture();
+                rejoinReason = exception.getMessage();

Review comment:
   I wonder if we should put a bit more information here. For instance, we 
could say `rebalance failed due to %s error: %s` where the first `%s` would be 
the exception's class and the second the message. What do you think?
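
A small sketch of the suggested message format (class name invented for the sketch):

{code:java}
public class RejoinReasonSketch {
    static String rejoinReason(RuntimeException exception) {
        return String.format("rebalance failed due to %s error: %s",
            exception.getClass().getSimpleName(),
            exception.getMessage());
    }

    public static void main(String[] args) {
        System.out.println(rejoinReason(new IllegalStateException("generation was reset")));
        // -> rebalance failed due to IllegalStateException error: generation was reset
    }
}
{code}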

##
File path: clients/src/main/resources/common/message/JoinGroupRequest.json
##
@@ -54,6 +56,9 @@
 "about": "The protocol name." },
   { "name": "Metadata", "type": "bytes", "versions": "0+",
 "about": "The protocol metadata." }
-]}
+]},
+{ "name": "Reason", "type": "string",
+  "versions": "8+", "nullableVersions": "8+","default": "null",

Review comment:
   Could we also update `RequestResponseTest` to cover this change?

##
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
##
@@ -541,8 +579,17 @@ private void expectDisconnectInJoinGroup(
 }, null, true);
 }
 
+private void expectJoinGroup(
+String expectedMemberId,
+int responseGeneration,
+String responseMemberId

Review comment:
   nit: Indentation is off.

##
File path: clients/src/main/resources/common/message/JoinGroupRequest.json
##
@@ -30,7 +30,9 @@
   // Version 6 is the first flexible version.
   //
   // Version 7 is the same as version 6.
-  "validVersions": "0-7",
+  //
+  // Version 8 adds the Reason field (KIP-800).
+  "validVersions": "0-8",

Review comment:
   We need to bump the request version as well.

##
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##
@@ -163,6 +163,7 @@ class GroupCoordinator(val brokerId: Int,
                protocolType: String,
                protocols: List[(String, Array[Byte])],
                responseCallback: JoinCallback,
+               reason: String = null,

Review comment:
   I would rather use `Option[String]` here.

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -133,6 +133,7 @@ public boolean hasNotJoinedGroup() {
     protected final ConsumerNetworkClient client;
 
     private Node coordinator = null;
+    private String rejoinReason = "initialized abstract coordinator";

Review comment:
   The reason is a bit weird. I wonder if we could just leave it empty in 
the beginning.

##
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##
@@ -181,6 +182,8 @@ class GroupCoordinator(val brokerId: Int,
           responseCallback(JoinGroupResult(memberId, Errors.UNKNOWN_MEMBER_ID))
         case Some(group) =>
           group.inLock {
+            if (reason != null)
+              info(s"memberId=$memberId with groupInstanceId=$groupInstanceId is attempting to join groupId=$groupId due to: $reason")

Review comment:
   We already log a few messages when a member (re-)joins. I wonder if we 
should pass the reason down and use it in all the existing messages that we 
already have. What do you think? I need to look at this a bit more myself.

##
File path: clients/src/main/resources/common/message/JoinGroupRequest.json
##
@@ -54,6 +56,9 @@
 "about": "The protocol name." },
   { "name": "Metadata", "type": "bytes", "versions": "0+",
 "about": "The protocol metadata." }
-]}
+]},
+{ "name": "Reason", 

[GitHub] [kafka] rajinisivaram merged pull request #11602: MINOR: Reset java.security.auth.login.config in ZK-tests to avoid config reload affecting subsequent tests

2021-12-15 Thread GitBox


rajinisivaram merged pull request #11602:
URL: https://github.com/apache/kafka/pull/11602


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram commented on pull request #11602: MINOR: Reset java.security.auth.login.config in ZK-tests to avoid config reload affecting subsequent tests

2021-12-15 Thread GitBox


rajinisivaram commented on pull request #11602:
URL: https://github.com/apache/kafka/pull/11602#issuecomment-994844394


   @omkreddy Thanks for the review, test failures not related, merging to trunk.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison merged pull request #11595: MINOR: timeout waitForBlock in connect BlockingConnectorTest

2021-12-15 Thread GitBox


mimaison merged pull request #11595:
URL: https://github.com/apache/kafka/pull/11595


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on pull request #11595: MINOR: timeout waitForBlock in connect BlockingConnectorTest

2021-12-15 Thread GitBox


mimaison commented on pull request #11595:
URL: https://github.com/apache/kafka/pull/11595#issuecomment-994839317


   Tests passed locally, merging


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13538) Unexpected TopicExistsException related to Admin#createTopics after broker crash

2021-12-15 Thread joecqupt (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17459974#comment-17459974
 ] 

joecqupt commented on KAFKA-13538:
--

It seems that it is because of the KafkaAdminClient retry mechanism: you can 
unset the "retries" param to avoid this problem, and you will get a 
TimeoutException instead.

> Unexpected TopicExistsException related to Admin#createTopics after broker 
> crash
> 
>
> Key: KAFKA-13538
> URL: https://issues.apache.org/jira/browse/KAFKA-13538
> Project: Kafka
>  Issue Type: Bug
>  Components: build
>Affects Versions: 2.8.0
>Reporter: Haoze Wu
>Priority: Major
> Attachments: Screenshot from 2021-12-12 21-16-56.png, Screenshot from 
> 2021-12-12 21-17-04.png
>
>
> We were using the official Kafka Java API to create a topic in a Kafka broker 
> cluster (3 brokers):
> {code:java}
> CreateTopicsResult result = admin.createTopics(...);
> ... = result.all().get(); {code}
> The topic we create always has replication factor = 2, and partition = 2. If 
> one of the brokers crashes for some reason and the client tries to create a 
> topic exactly in this crashed broker, we usually observe that the client may 
> suffer from a delay of a few seconds due to the disconnection issue, and then 
> the client automatically connects to another broker and creates the topic in 
> this broker. Everything is done automatically in the client, under the code 
> of `admin.createTopics(...)` and `result.all().get()`.
> However, we found that sometimes we got `TopicExistsException` from 
> `result.all().get()`, but we had never created this topic beforehand.
> After some investigation on the source code of client, we found that this 
> issue happens in this way:
>  # The client connects to a broker (say, broker X) and then sends the topic 
> creation request.
>  # This topic has replication factor = 2 and partition = 2, so broker X may 
> inform another broker of this information.
>  # Broker X suddenly crashes for some reason, and the response for the topic 
> creation request has not been sent back to the client.
>  # The client eventually learns that broker X crashed, but it never gets the 
> response to the topic creation request. The client therefore assumes the 
> request failed, connects to another broker (say, broker Y), and sends the 
> topic creation request again.
>  # The original request (with replication factor = 2 and partition = 2) had 
> been partially executed before broker X crashed, so broker Y may already have 
> done some of the work on broker X's behalf. For example, broker Y may hold 
> some metadata about this topic. Therefore, when broker Y runs its sanity 
> check against the metadata, it finds that the topic exists and directly 
> returns `TopicExistsException` as the response.
>  # The client receives `TopicExistsException` and takes it at face value, 
> believing the topic already exists, so the exception is thrown back to the 
> user from the API `result.all().get()`.
> There are 2 diagrams illustrating these 6 steps:
> !Screenshot from 2021-12-12 21-16-56.png!
> !Screenshot from 2021-12-12 21-17-04.png!
> Now the core question is whether this workflow violates the semantics & 
> design of the Kafka Client API. We read the “Create Topics Response” section 
> in KIP-4 
> ([https://cwiki.apache.org/confluence/display/kafka/kip-4+-+command+line+and+centralized+administrative+operations]).
>  We found that the description in KIP-4 focuses on batch requests of topic 
> creations and how the entries in a batch work independently. It does not 
> discuss how the client should handle the buggy scenario described above.
> According to “common sense”, we think the client should be able to tell that 
> the metadata existing on broker Y was actually created by the client itself 
> via the crashed broker X. Accordingly, the client should not throw 
> `TopicExistsException` to the user.
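
Pending a fix, one possible user-side mitigation (a sketch only, and it assumes 
no other client races to create the same topic) is to verify the topic after a 
`TopicExistsException` instead of failing outright:

{code:java}
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.TopicExistsException;

public final class SafeTopicCreation {
    public static void createTopicIdempotently(Admin admin, NewTopic topic) throws Exception {
        try {
            admin.createTopics(Collections.singleton(topic)).all().get();
        } catch (ExecutionException e) {
            if (!(e.getCause() instanceof TopicExistsException)) {
                throw e;
            }
            // The "existing" topic may be the remnant of our own partially
            // completed request on the crashed broker. Confirm it is visible
            // before treating the creation as successful.
            Set<String> names = admin.listTopics().names().get();
            if (!names.contains(topic.name())) {
                throw e;
            }
        }
    }
}
{code}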



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13255) Mirrormaker config property config.properties.exclude is not working as expected

2021-12-15 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17459930#comment-17459930
 ] 

Luke Chen commented on KAFKA-13255:
---

[~mimaison] [~bdesert], do you know if there's any workaround for this issue? 
If not, do you think we should get this fix cherry-picked back to v3.1.0, since 
it blocks users of MM2?

Thank you.
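
For context, a sketch of how this property is typically set on the source 
connector (standard Connect REST JSON; the connector name matches the error 
log quoted below, while cluster aliases, addresses, and the exclusion regex 
are placeholder assumptions):

{code}
{
  "name": "mm2-msc",
  "config": {
    "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "source.cluster.alias": "source",
    "target.cluster.alias": "target",
    "source.cluster.bootstrap.servers": "source-broker:9092",
    "target.cluster.bootstrap.servers": "target-broker:9092",
    "topics": "dev.portlandDc.anamika.helloMsk",
    "config.properties.exclude": "confluent\\..*"
  }
}
{code}

Per this issue, the exclusion is not honored during topic creation in the 
affected versions, which is why replication fails on the Confluent-specific 
topic config.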

> Mirrormaker config property config.properties.exclude is not working as 
> expected 
> -
>
> Key: KAFKA-13255
> URL: https://issues.apache.org/jira/browse/KAFKA-13255
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.8.0
>Reporter: Anamika Nadkarni
>Assignee: Ed Berezitsky
>Priority: Major
> Fix For: 3.2.0
>
>
> Objective - Use MM2 (Kafka Connect in a distributed cluster) for data 
> migration between a cluster hosted in a private data center and an AWS MSK 
> cluster.
> Steps performed -
>  # Started the kafka-connect service.
>  # Created 3 MM2 connectors (i.e. source connector, checkpoint connector and 
> heartbeat connector). The curl commands used to create the connectors are in 
> the attached file. To exclude certain config properties during topic 
> replication, we are using the 'config.properties.exclude' property in the MM2 
> source connector.
> Expected -
> The source topic 'dev.portlandDc.anamika.helloMsk' should be successfully 
> created in the destination cluster.
> Actual -
> Creation of the source topic 'dev.portlandDc.anamika.helloMsk' in the 
> destination cluster fails with the following error:
> {code:java}
> [2021-08-06 06:13:40,944] WARN [mm2-msc|worker] Could not create topic 
> dev.portlandDc.anamika.helloMsk. 
> (org.apache.kafka.connect.mirror.MirrorSourceConnector:371)
> org.apache.kafka.common.errors.InvalidConfigurationException: Unknown topic 
> config name: confluent.value.schema.validation{code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13545) Workaround for mitigating CVE-2021-4104 Kafka

2021-12-15 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17459904#comment-17459904
 ] 

Luke Chen commented on KAFKA-13545:
---

Closing as "Not A Bug". Thanks.

> Workaround for mitigating CVE-2021-4104 Kafka 
> --
>
> Key: KAFKA-13545
> URL: https://issues.apache.org/jira/browse/KAFKA-13545
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.8.1
>Reporter: Akansh Shandilya
>Priority: Major
>
> A new vulnerability was published today:
> https://nvd.nist.gov/vuln/detail/CVE-2021-4104
>  
> Kafka v2.8.1 uses log4j v1.x. Please review the following information:
> Is Kafka v2.8.1 impacted by CVE-2021-4104?
> If yes, is there any workaround/recommendation available for Kafka v2.8.1 to 
> mitigate CVE-2021-4104?



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Resolved] (KAFKA-13545) Workaround for mitigating CVE-2021-4104 Kafka

2021-12-15 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-13545.
---
Resolution: Not A Bug

> Workaround for mitigating CVE-2021-4104 Kafka 
> --
>
> Key: KAFKA-13545
> URL: https://issues.apache.org/jira/browse/KAFKA-13545
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.8.1
>Reporter: Akansh Shandilya
>Priority: Major
>
> A new vulnerability was published today:
> https://nvd.nist.gov/vuln/detail/CVE-2021-4104
>  
> Kafka v2.8.1 uses log4j v1.x. Please review the following information:
> Is Kafka v2.8.1 impacted by CVE-2021-4104?
> If yes, is there any workaround/recommendation available for Kafka v2.8.1 to 
> mitigate CVE-2021-4104?



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Reopened] (KAFKA-13545) Workaround for mitigating CVE-2021-4104 Kafka

2021-12-15 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen reopened KAFKA-13545:
---

> Workaround for mitigating CVE-2021-4104 Kafka 
> --
>
> Key: KAFKA-13545
> URL: https://issues.apache.org/jira/browse/KAFKA-13545
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.8.1
>Reporter: Akansh Shandilya
>Priority: Major
>
> A new vulnerability was published today:
> https://nvd.nist.gov/vuln/detail/CVE-2021-4104
>  
> Kafka v2.8.1 uses log4j v1.x. Please review the following information:
> Is Kafka v2.8.1 impacted by CVE-2021-4104?
> If yes, is there any workaround/recommendation available for Kafka v2.8.1 to 
> mitigate CVE-2021-4104?



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] dongjinleekr commented on pull request #7898: KAFKA-9366: Change log4j dependency into log4j2

2021-12-15 Thread GitBox


dongjinleekr commented on pull request #7898:
URL: https://github.com/apache/kafka/pull/7898#issuecomment-994740102


   Hi All,
   
   Sorry for being late. Here is the update! This PR is rebased onto the latest 
trunk and now uses log4j2 2.16.0 to address 
[CVE-2021-44228](https://nvd.nist.gov/vuln/detail/CVE-2021-44228) and 
[CVE-2021-45046](https://nvd.nist.gov/vuln/detail/CVE-2021-45046).
   
   According to [this 
analysis](https://github.com/apache/logging-log4j2/pull/608#issuecomment-990494126),
 log4j 1.x is [affected by this 
vulnerability](https://nvd.nist.gov/vuln/detail/CVE-2021-4104), but only when 
it uses the JMS appender. So, unless you are using the JMS appender, you are 
safe from this vulnerability.
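   
   For illustration, the kind of log4j 1.x configuration that would be exposed 
looks like the following (a hypothetical example with placeholder JNDI 
settings; Kafka's stock log4j.properties does not configure a JMS appender):
   
   ```
   # Hypothetical log4j 1.x setup affected by CVE-2021-4104: JMSAppender
   # performs JNDI lookups on the binding names configured here.
   log4j.rootLogger=INFO, jms
   log4j.appender.jms=org.apache.log4j.net.JMSAppender
   log4j.appender.jms.InitialContextFactoryName=org.apache.activemq.jndi.ActiveMQInitialContextFactory
   log4j.appender.jms.ProviderURL=tcp://mq-broker:61616
   log4j.appender.jms.TopicConnectionFactoryBindingName=ConnectionFactory
   log4j.appender.jms.TopicBindingName=logTopic
   ```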


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org