[GitHub] [kafka] satishd commented on a diff in pull request #13304: KAFKA-14726 Move/rewrite of LogReadInfo, LogOffsetSnapshot, LogStartOffsetIncrementReason to storage module

2023-02-27 Thread via GitHub


satishd commented on code in PR #13304:
URL: https://github.com/apache/kafka/pull/13304#discussion_r1118411232


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1112,7 +1079,8 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
 new LogAppendInfo(firstOffset, lastOffset, lastLeaderEpochOpt, maxTimestamp, offsetOfMaxTimestamp,
   RecordBatch.NO_TIMESTAMP, logStartOffset, RecordConversionStats.EMPTY, sourceCompression, targetCompression,
-  shallowMessageCount, validBytesCount, monotonic, lastOffsetOfFirstBatch)
+  shallowMessageCount, validBytesCount, monotonic, lastOffsetOfFirstBatch, Collections.emptyList[RecordError], null,
+  LeaderHwChange.NONE)

Review Comment:
   Earlier, we had default values in Scala. These changes do not introduce constructor overloads for the different combinations because the usages are few; it is simpler to have a single constructor and set this value from the couple of places that need it.
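   For readers following the Scala-to-Java move, a minimal sketch of the trade-off being described (a hypothetical class, not the real `LogAppendInfo` signature):

```java
import java.util.Collections;
import java.util.List;

// Hypothetical illustration: Scala default parameters have no direct Java equivalent,
// so the port keeps a single full constructor and the few call sites pass the
// "default" values (empty list, null, NONE) explicitly.
public class ExampleAppendInfo {
    private final long lastOffset;
    private final List<String> recordErrors;

    public ExampleAppendInfo(long lastOffset, List<String> recordErrors) {
        this.lastOffset = lastOffset;
        this.recordErrors = recordErrors;
    }

    // The alternative would be overloads that bake the defaults in:
    // public ExampleAppendInfo(long lastOffset) { this(lastOffset, Collections.emptyList()); }
}
```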






[GitHub] [kafka] ijuma merged pull request #13311: MINOR: Enable spotless for streams-scala when Java 11+ is used

2023-02-27 Thread via GitHub


ijuma merged PR #13311:
URL: https://github.com/apache/kafka/pull/13311





[GitHub] [kafka] ijuma commented on pull request #13205: KAFKA-14680: Upgrade gradle version from 7.6 to 8.0.1

2023-02-27 Thread via GitHub


ijuma commented on PR #13205:
URL: https://github.com/apache/kafka/pull/13205#issuecomment-1445905740

   The `spotlessScala` issue was fixed via #13311.





[GitHub] [kafka] ijuma commented on pull request #13263: KAFKA-14728: Don't run 'spotlessScalaCheck' in CI

2023-02-27 Thread via GitHub


ijuma commented on PR #13263:
URL: https://github.com/apache/kafka/pull/13263#issuecomment-1445906790

   @guozhangwang Note that we found a way to re-enable this when running with 
Java 11 or newer, see #13311.





[jira] [Resolved] (KAFKA-14749) Re-enable 'spotlessScalaCheck' task (in Jenkinsfile)

2023-02-27 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14749?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma resolved KAFKA-14749.
-
  Assignee: Ismael Juma
Resolution: Fixed

We found a way to re-enable this when running with Java 11 or newer, see 
https://github.com/apache/kafka/pull/13311.

> Re-enable 'spotlessScalaCheck' task (in Jenkinsfile)
> 
>
> Key: KAFKA-14749
> URL: https://issues.apache.org/jira/browse/KAFKA-14749
> Project: Kafka
>  Issue Type: Task
>  Components: build
>Reporter: Dejan Stojadinović
>Assignee: Ismael Juma
>Priority: Blocker
> Fix For: 3.5.0
>
>
> {*}Description{*}:
> We were forced to remove 'spotlessScalaCheck' (see KAFKA-14728) but we should 
> bring it back when circumstances change (i.e. when Apache Kafka 4.0 drops 
> support for Java 8).
> Related github issue comment: 
> [https://github.com/apache/kafka/pull/13263#issuecomment-1441825913]
>  
>  
>  
>  





[jira] [Updated] (KAFKA-14749) Re-enable 'spotlessScalaCheck' task (in Jenkinsfile)

2023-02-27 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14749?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-14749:

Fix Version/s: 3.5.0
   (was: 4.0.0)

> Re-enable 'spotlessScalaCheck' task (in Jenkinsfile)
> 
>
> Key: KAFKA-14749
> URL: https://issues.apache.org/jira/browse/KAFKA-14749
> Project: Kafka
>  Issue Type: Task
>  Components: build
>Reporter: Dejan Stojadinović
>Priority: Blocker
> Fix For: 3.5.0
>
>
> {*}Description{*}:
> We were forced to remove 'spotlessScalaCheck' (see KAFKA-14728) but we should 
> bring it back when circumstances change (i.e. when Apache Kafka 4.0 drops 
> support for Java 8).
> Related github issue comment: 
> [https://github.com/apache/kafka/pull/13263#issuecomment-1441825913]
>  
>  
>  
>  





[jira] [Updated] (KAFKA-14749) Re-enable 'spotlessScalaCheck' task (in Jenkinsfile)

2023-02-27 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14749?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-14749:

Priority: Major  (was: Blocker)

> Re-enable 'spotlessScalaCheck' task (in Jenkinsfile)
> 
>
> Key: KAFKA-14749
> URL: https://issues.apache.org/jira/browse/KAFKA-14749
> Project: Kafka
>  Issue Type: Task
>  Components: build
>Reporter: Dejan Stojadinović
>Assignee: Ismael Juma
>Priority: Major
> Fix For: 3.5.0
>
>
> {*}Description{*}:
> We were forced to remove 'spotlessScalaCheck' (see KAFKA-14728) but we should 
> bring it back when circumstances change (i.e. when Apache Kafka 4.0 drops 
> support for Java 8).
> Related github issue comment: 
> [https://github.com/apache/kafka/pull/13263#issuecomment-1441825913]
>  
>  
>  
>  





[jira] [Resolved] (KAFKA-14680) Gradle version upgrade 7 -->> 8

2023-02-27 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14680?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma resolved KAFKA-14680.
-
Fix Version/s: 3.5.0
   Resolution: Fixed

> Gradle version upgrade 7 -->> 8
> ---
>
> Key: KAFKA-14680
> URL: https://issues.apache.org/jira/browse/KAFKA-14680
> Project: Kafka
>  Issue Type: Improvement
>  Components: build
>Reporter: Dejan Stojadinović
>Assignee: Dejan Stojadinović
>Priority: Major
> Fix For: 3.5.0
>
>
> +*Gradle 8 release notes:*+
>  * *8.0*
>  ** [https://github.com/gradle/gradle/releases/tag/v8.0.0]
>  ** 
> [https://docs.gradle.org/8.0/release-notes.html|https://docs.gradle.org/8.0-rc-3/release-notes.html]
>  *  *8.0.1:*
>  ** [https://github.com/gradle/gradle/releases/tag/v8.0.1]
>  ** [https://docs.gradle.org/8.0.1/release-notes.html]
>  ** [https://github.com/gradle/gradle/milestone/229?closed=1]
> *Upgrade notes:* 
> [https://docs.gradle.org/8.0/userguide/upgrading_version_7.html#changes_8.0|https://docs.gradle.org/8.0/userguide/upgrading_version_7.html#changes_8.0]





[GitHub] [kafka] satishd commented on pull request #13304: KAFKA-14726 Move/rewrite of LogReadInfo, LogOffsetSnapshot, LogStartOffsetIncrementReason to storage module

2023-02-27 Thread via GitHub


satishd commented on PR #13304:
URL: https://github.com/apache/kafka/pull/13304#issuecomment-1445917205

   Thanks @showuon for your review. I addressed your comments with inline replies or in the latest commits.





[GitHub] [kafka] dajac commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-02-27 Thread via GitHub


dajac commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1118380508


##
clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java:
##
@@ -34,22 +43,40 @@ public class AddPartitionsToTxnRequest extends AbstractRequest {
 
 private final AddPartitionsToTxnRequestData data;
 
-private List cachedPartitions = null;
+private final short version;

Review Comment:
   The version is already in the base class. Do we really need it here?
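   For illustration, a hypothetical sketch of the point (not the actual Kafka classes): the version handed to the base-class constructor is already readable through an inherited accessor, so a duplicate field in the subclass may be redundant.

```java
// Hypothetical base/sub classes for illustration only.
abstract class BaseRequest {
    private final short version;

    protected BaseRequest(short version) {
        this.version = version;
    }

    public short version() {          // already available to every subclass
        return version;
    }
}

class AddPartitionsRequestSketch extends BaseRequest {
    AddPartitionsRequestSketch(short version) {
        super(version);
    }

    boolean usesBatchedSchema() {
        return version() >= 4;        // no duplicate 'version' field needed
    }
}
```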



##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -1328,7 +1328,7 @@ Priority priority() {
 @Override
 public void handleResponse(AbstractResponse response) {
 AddPartitionsToTxnResponse addPartitionsToTxnResponse = (AddPartitionsToTxnResponse) response;
-Map errors = addPartitionsToTxnResponse.errors();
+Map errors = addPartitionsToTxnResponse.errors().get(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID);

Review Comment:
   nit: I suppose that `errors` should never be `null` here. I wonder if we 
should still check it. What do you think?
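   If a defensive check were added, it might look like the sketch below (based on the accessor shape in this diff; the generic types and the exact error handling are assumptions for illustration):

```java
// Sketch: guard against a missing per-transactional-id entry in the response map.
Map<TopicPartition, Errors> errors =
        addPartitionsToTxnResponse.errors().get(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID);
if (errors == null) {
    // illustrative handling only; the real handler would route this through the
    // transaction manager's existing error path
    throw new IllegalStateException("AddPartitionsToTxnResponse contained no result for this transaction");
}
```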



##
clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java:
##
@@ -34,22 +43,40 @@ public class AddPartitionsToTxnRequest extends AbstractRequest {
 
 private final AddPartitionsToTxnRequestData data;
 
-private List cachedPartitions = null;
+private final short version;
 
 public static class Builder extends AbstractRequest.Builder {
 public final AddPartitionsToTxnRequestData data;
+
+public static Builder forClient(String transactionalId,
+long producerId,
+short producerEpoch,
+List partitions) {
+
+AddPartitionsToTxnTopicCollection topics = buildTxnTopicCollection(partitions);
+
+return new Builder(ApiKeys.ADD_PARTITIONS_TO_TXN.oldestVersion(),
+(short) 3, 
+new AddPartitionsToTxnRequestData()
+.setV3AndBelowTransactionalId(transactionalId)
+.setV3AndBelowProducerId(producerId)
+.setV3AndBelowProducerEpoch(producerEpoch)
+.setV3AndBelowTopics(topics));

Review Comment:
   nit: The indentation of the arguments looks inconsistent.



##
clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json:
##
@@ -23,17 +23,39 @@
   // Version 2 adds the support for new error code PRODUCER_FENCED.
   //
   // Version 3 enables flexible versions.
-  "validVersions": "0-3",
+  //
+  // Version 4 adds VerifyOnly field to check if partitions are already in transaction and adds support to batch multiple transactions.

Review Comment:
   I think that we should explain that v4 is only for broker-to-broker requests and that clients are supposed to use version <= v3.



##
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##
@@ -1303,11 +1305,13 @@ public void testCommitWithTopicAuthorizationFailureInAddPartitionsInFlight() thr
 Map errors = new HashMap<>();
 errors.put(tp0, Errors.TOPIC_AUTHORIZATION_FAILED);
 errors.put(tp1, Errors.OPERATION_NOT_ATTEMPTED);
+AddPartitionsToTxnResult result = AddPartitionsToTxnResponse.resultForTransaction(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID, errors);
+AddPartitionsToTxnResponseData data = new AddPartitionsToTxnResponseData().setResultsByTopicV3AndBelow(result.topicResults()).setThrottleTimeMs(0);
 client.respond(body -> {
 AddPartitionsToTxnRequest request = (AddPartitionsToTxnRequest) body;
-assertEquals(new HashSet<>(request.partitions()), new HashSet<>(errors.keySet()));
+assertEquals(new HashSet<>(AddPartitionsToTxnRequest.getPartitions(request.data().v3AndBelowTopics())), new HashSet<>(errors.keySet()));

Review Comment:
   
`AddPartitionsToTxnRequest.getPartitions(request.data().v3AndBelowTopics()))` looks a bit weird from an encapsulation perspective. Why not just keep `partitions` as before if you need it?
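   For illustration, the kind of accessor being suggested might look like this (a hypothetical helper on the request class, assuming the generated `AddPartitionsToTxnTopic` fields):

```java
// Hypothetical sketch: keeps tests from reaching through data().v3AndBelowTopics() directly.
public List<TopicPartition> partitions() {
    List<TopicPartition> partitions = new ArrayList<>();
    for (AddPartitionsToTxnTopic topic : data.v3AndBelowTopics()) {
        for (Integer partitionIndex : topic.partitions()) {
            partitions.add(new TopicPartition(topic.name(), partitionIndex));
        }
    }
    return partitions;
}
```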



##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -2384,68 +2385,116 @@ class KafkaApis(val requestChannel: RequestChannel,
 if (config.interBrokerProtocolVersion.isLessThan(version))
   throw new UnsupportedVersionException(s"inter.broker.protocol.version: 
${config.interBrokerProtocolVersion.version} is less than the required version: 
${version.version}")
   }
-
-  def handleAddPartitionToTxnRequest(request: RequestChannel.Request, 
requestLocal: RequestLocal): Unit = {
+  def handleAddPartitionsToTxnRequest(request: RequestChannel.Request, 
requestLocal: RequestLocal): Unit = {
 ensureInterBrokerVersion(IBP_0_11_0_IV0)
-

[GitHub] [kafka] mimaison merged pull request #13174: MINOR: Various cleanups in common utils

2023-02-27 Thread via GitHub


mimaison merged PR #13174:
URL: https://github.com/apache/kafka/pull/13174





[jira] [Updated] (KAFKA-14206) Upgrade zookeeper to 3.7.1 to address security vulnerabilities

2023-02-27 Thread Valeriy Kassenbayev (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Valeriy Kassenbayev updated KAFKA-14206:

Fix Version/s: 3.4.1
   (was: 3.4.0)

> Upgrade zookeeper to 3.7.1 to address security vulnerabilities
> --
>
> Key: KAFKA-14206
> URL: https://issues.apache.org/jira/browse/KAFKA-14206
> Project: Kafka
>  Issue Type: Improvement
>  Components: packaging
>Affects Versions: 3.2.1
>Reporter: Valeriy Kassenbayev
>Assignee: Luke Chen
>Priority: Blocker
> Fix For: 3.4.1
>
>
> Kafka 3.2.1 is using ZooKeeper, which is affected by 
> [CVE-2021-37136|https://security.snyk.io/vuln/SNYK-JAVA-IONETTY-1584064] and 
> [CVE-2021-37137:|https://www.cve.org/CVERecord?id=CVE-2021-37137]
> {code:java}
>   ✗ Denial of Service (DoS) [High 
> Severity][https://security.snyk.io/vuln/SNYK-JAVA-IONETTY-1584063] in 
> io.netty:netty-codec@4.1.63.Final
>     introduced by org.apache.kafka:kafka_2.13@3.2.1 > 
> org.apache.zookeeper:zookeeper@3.6.3 > io.netty:netty-handler@4.1.63.Final > 
> io.netty:netty-codec@4.1.63.Final
>   This issue was fixed in versions: 4.1.68.Final
>   ✗ Denial of Service (DoS) [High 
> Severity][https://security.snyk.io/vuln/SNYK-JAVA-IONETTY-1584064] in 
> io.netty:netty-codec@4.1.63.Final
>     introduced by org.apache.kafka:kafka_2.13@3.2.1 > 
> org.apache.zookeeper:zookeeper@3.6.3 > io.netty:netty-handler@4.1.63.Final > 
> io.netty:netty-codec@4.1.63.Final
>   This issue was fixed in versions: 4.1.68.Final {code}
> The issues were fixed in the next versions of ZooKeeper (starting from 
> 3.6.4). ZooKeeper 3.7.1 is the next stable 
> [release|https://zookeeper.apache.org/releases.html] at the moment.





[jira] [Reopened] (KAFKA-14206) Upgrade zookeeper to 3.7.1 to address security vulnerabilities

2023-02-27 Thread Valeriy Kassenbayev (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Valeriy Kassenbayev reopened KAFKA-14206:
-

Still have the same CVEs reported:
{code:java}
  ✗ Denial of Service (DoS) [High 
Severity][https://security.snyk.io/vuln/SNYK-JAVA-IONETTY-1584063] in 
io.netty:netty-codec@4.1.63.Final
    introduced by org.apache.kafka:kafka_2.13@3.4.0 > 
org.apache.zookeeper:zookeeper@3.6.3 > io.netty:netty-handler@4.1.63.Final > 
io.netty:netty-codec@4.1.63.Final
  This issue was fixed in versions: 4.1.68.Final
  ✗ Denial of Service (DoS) [High 
Severity][https://security.snyk.io/vuln/SNYK-JAVA-IONETTY-1584064] in 
io.netty:netty-codec@4.1.63.Final
    introduced by org.apache.kafka:kafka_2.13@3.4.0 > 
org.apache.zookeeper:zookeeper@3.6.3 > io.netty:netty-handler@4.1.63.Final > 
io.netty:netty-codec@4.1.63.Final
  This issue was fixed in versions: 4.1.68.Final {code}
ZooKeeper does not seem to have been upgraded:
{code:java}
[mac /tmp]# tar tzf kafka_2.13-3.4.0.tgz | grep -i libs/zookeeper
kafka_2.13-3.4.0/libs/zookeeper-3.6.3.jar
kafka_2.13-3.4.0/libs/zookeeper-jute-3.6.3.jar
[mac /tmp]# {code}

> Upgrade zookeeper to 3.7.1 to address security vulnerabilities
> --
>
> Key: KAFKA-14206
> URL: https://issues.apache.org/jira/browse/KAFKA-14206
> Project: Kafka
>  Issue Type: Improvement
>  Components: packaging
>Affects Versions: 3.2.1
>Reporter: Valeriy Kassenbayev
>Assignee: Luke Chen
>Priority: Blocker
> Fix For: 3.4.0
>
>
> Kafka 3.2.1 is using ZooKeeper, which is affected by 
> [CVE-2021-37136|https://security.snyk.io/vuln/SNYK-JAVA-IONETTY-1584064] and 
> [CVE-2021-37137:|https://www.cve.org/CVERecord?id=CVE-2021-37137]
> {code:java}
>   ✗ Denial of Service (DoS) [High 
> Severity][https://security.snyk.io/vuln/SNYK-JAVA-IONETTY-1584063] in 
> io.netty:netty-codec@4.1.63.Final
>     introduced by org.apache.kafka:kafka_2.13@3.2.1 > 
> org.apache.zookeeper:zookeeper@3.6.3 > io.netty:netty-handler@4.1.63.Final > 
> io.netty:netty-codec@4.1.63.Final
>   This issue was fixed in versions: 4.1.68.Final
>   ✗ Denial of Service (DoS) [High 
> Severity][https://security.snyk.io/vuln/SNYK-JAVA-IONETTY-1584064] in 
> io.netty:netty-codec@4.1.63.Final
>     introduced by org.apache.kafka:kafka_2.13@3.2.1 > 
> org.apache.zookeeper:zookeeper@3.6.3 > io.netty:netty-handler@4.1.63.Final > 
> io.netty:netty-codec@4.1.63.Final
>   This issue was fixed in versions: 4.1.68.Final {code}
> The issues were fixed in the next versions of ZooKeeper (starting from 
> 3.6.4). ZooKeeper 3.7.1 is the next stable 
> [release|https://zookeeper.apache.org/releases.html] at the moment.





[GitHub] [kafka] mimaison commented on pull request #13145: MINOR: Remove duplicate empty string check

2023-02-27 Thread via GitHub


mimaison commented on PR #13145:
URL: https://github.com/apache/kafka/pull/13145#issuecomment-1446008315

   @C0urante can you take a look? Thanks





[GitHub] [kafka] mimaison commented on pull request #13170: MINOR: Remove unused methods in CoreUtils

2023-02-27 Thread via GitHub


mimaison commented on PR #13170:
URL: https://github.com/apache/kafka/pull/13170#issuecomment-1446015320

   @tombentley can you take a look? Thanks





[jira] [Created] (KAFKA-14763) Add integration test for DelegationTokenCommand tool

2023-02-27 Thread Gantigmaa Selenge (Jira)
Gantigmaa Selenge created KAFKA-14763:
-

 Summary: Add integration test for DelegationTokenCommand tool
 Key: KAFKA-14763
 URL: https://issues.apache.org/jira/browse/KAFKA-14763
 Project: Kafka
  Issue Type: Task
Reporter: Gantigmaa Selenge


When moving DelegationTokenCommand from core to the tools module in 
[https://github.com/apache/kafka/pull/13172], the existing integration test 
could not be migrated because there is no {{BaseRequestTest}} or {{SaslSetup}} 
to help set up integration tests in the tools module. We will need to create a 
similar setup in the tools module and add an integration test for the command tool.

 





[GitHub] [kafka] tinaselenge commented on pull request #13172: KAFKA-14590: Move DelegationTokenCommand to tools

2023-02-27 Thread via GitHub


tinaselenge commented on PR #13172:
URL: https://github.com/apache/kafka/pull/13172#issuecomment-1446056568

   Created a Jira issue for adding integration test for this tool later on: 
https://issues.apache.org/jira/browse/KAFKA-14763





[GitHub] [kafka] edoardocomar commented on a diff in pull request #13291: KAFKA-14742: Throttle connectors in ExactlyOnceSourceIntegrationTest to fix flakey OOMEs

2023-02-27 Thread via GitHub


edoardocomar commented on code in PR #13291:
URL: https://github.com/apache/kafka/pull/13291#discussion_r1118563985


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java:
##
@@ -81,6 +81,7 @@
 import static org.apache.kafka.connect.integration.MonitorableSourceConnector.CUSTOM_EXACTLY_ONCE_SUPPORT_CONFIG;
 import static org.apache.kafka.connect.integration.MonitorableSourceConnector.CUSTOM_TRANSACTION_BOUNDARIES_CONFIG;
 import static org.apache.kafka.connect.integration.MonitorableSourceConnector.MESSAGES_PER_POLL_CONFIG;
+import static org.apache.kafka.connect.integration.MonitorableSourceConnector.THROUGHPUT_CONFIG;

Review Comment:
   I had to go and look up what unit `throughput` was in, and `ThroughputThrottler` says `Can be messages/sec or bytes/sec`. In the case of this test it is messages, so for me the longer name I suggested helps readability.






[GitHub] [kafka] edoardocomar commented on a diff in pull request #13291: KAFKA-14742: Throttle connectors in ExactlyOnceSourceIntegrationTest to fix flakey OOMEs

2023-02-27 Thread via GitHub


edoardocomar commented on code in PR #13291:
URL: https://github.com/apache/kafka/pull/13291#discussion_r1118569732


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java:
##
@@ -266,6 +267,7 @@ public void testPollBoundary() throws Exception {
 props.put(NAME_CONFIG, CONNECTOR_NAME);
 props.put(TRANSACTION_BOUNDARY_CONFIG, POLL.toString());
 props.put(MESSAGES_PER_POLL_CONFIG, Integer.toString(recordsProduced));
+props.put(THROUGHPUT_CONFIG, Integer.toString(recordsProduced));

Review Comment:
   Again, it may not be *necessary* to use Long instead of Integer, but it helps.
   And using two variables instead of one, even though their values are related, again helps readability.
   Reusing the same one is something that makes me stop and think "why...?"
   So having a 2nd variable, e.g.
   `long throughput_msgs_sec = recordsProduced / 2L;`
   would be my preference (plus a short line comment for it), e.g.
   // need to limit actual records.count() to avoid OOM
   






[GitHub] [kafka] edoardocomar commented on a diff in pull request #13291: KAFKA-14742: Throttle connectors in ExactlyOnceSourceIntegrationTest to fix flakey OOMEs

2023-02-27 Thread via GitHub


edoardocomar commented on code in PR #13291:
URL: https://github.com/apache/kafka/pull/13291#discussion_r1118563985


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java:
##
@@ -81,6 +81,7 @@
 import static org.apache.kafka.connect.integration.MonitorableSourceConnector.CUSTOM_EXACTLY_ONCE_SUPPORT_CONFIG;
 import static org.apache.kafka.connect.integration.MonitorableSourceConnector.CUSTOM_TRANSACTION_BOUNDARIES_CONFIG;
 import static org.apache.kafka.connect.integration.MonitorableSourceConnector.MESSAGES_PER_POLL_CONFIG;
+import static org.apache.kafka.connect.integration.MonitorableSourceConnector.THROUGHPUT_CONFIG;

Review Comment:
   I had to go and look up what unit `throughput` was in, and `ThroughputThrottler` says `Can be messages/sec or bytes/sec`; that's why its name is generic.
   In the case of this test, it is messages/sec, so for me the longer name I suggested helps readability.






[jira] [Commented] (KAFKA-2967) Move Kafka documentation to ReStructuredText

2023-02-27 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-2967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17693955#comment-17693955
 ] 

Mickael Maison commented on KAFKA-2967:
---

Thanks [~jeqo] for picking this up, your PoC looks really good!

From the discussion above, it seems there was consensus on using asciidoc if possible. It looks like this may be possible with Hugo: https://gohugo.io/content-management/formats/#list-of-content-formats Have you tried it?



> Move Kafka documentation to ReStructuredText
> 
>
> Key: KAFKA-2967
> URL: https://issues.apache.org/jira/browse/KAFKA-2967
> Project: Kafka
>  Issue Type: Bug
>Reporter: Gwen Shapira
>Assignee: Gwen Shapira
>Priority: Major
>  Labels: documentation
>
> Storing documentation as HTML is kind of BS :)
> * Formatting is a pain, and making it look good is even worse
> * It's just HTML, can't generate PDFs
> * Reading and editing is painful
> * Validating changes is hard because our formatting relies on all kinds of 
> Apache Server features.
> I suggest:
> * Move to RST
> * Generate HTML and PDF during build using Sphinx plugin for Gradle.
> Lots of Apache projects are doing this.





[GitHub] [kafka] mimaison merged pull request #13145: MINOR: Remove duplicate empty string check

2023-02-27 Thread via GitHub


mimaison merged PR #13145:
URL: https://github.com/apache/kafka/pull/13145





[GitHub] [kafka] edoardocomar commented on a diff in pull request #13291: KAFKA-14742: Throttle connectors in ExactlyOnceSourceIntegrationTest to fix flakey OOMEs

2023-02-27 Thread via GitHub


edoardocomar commented on code in PR #13291:
URL: https://github.com/apache/kafka/pull/13291#discussion_r1118569732


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java:
##
@@ -266,6 +267,7 @@ public void testPollBoundary() throws Exception {
 props.put(NAME_CONFIG, CONNECTOR_NAME);
 props.put(TRANSACTION_BOUNDARY_CONFIG, POLL.toString());
 props.put(MESSAGES_PER_POLL_CONFIG, Integer.toString(recordsProduced));
+props.put(THROUGHPUT_CONFIG, Integer.toString(recordsProduced));

Review Comment:
   Again, it may not be *necessary* to use Long instead of Integer, but it helps. The property is a long, and I'd prefer to set a long rather than rely on conversion.
   And using two variables instead of one, even though their values are related, again helps readability.
   Reusing the same one is something that makes me stop and think "why...?"
   So having a 2nd variable, e.g.
   `long throughput_msgs_sec = recordsProduced / 2L;`
   would be my preference (plus a short line comment for it), e.g.
   `// need to limit actual records.count() to avoid OOM`
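   Concretely, the suggested change would look something like this (a sketch against the test lines shown above, using the variable name from the suggestion):

```java
// need to limit actual records.count() to avoid OOM
long throughput_msgs_sec = recordsProduced / 2L;
props.put(MESSAGES_PER_POLL_CONFIG, Integer.toString(recordsProduced));
props.put(THROUGHPUT_CONFIG, Long.toString(throughput_msgs_sec));
```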
   






[GitHub] [kafka] C0urante commented on pull request #13302: KAFKA-14759: Move Mock, Schema, and Verifiable connectors to new test-plugins module

2023-02-27 Thread via GitHub


C0urante commented on PR #13302:
URL: https://github.com/apache/kafka/pull/13302#issuecomment-1446393595

   Hey @gharris1727, thanks for the PR. While I personally agree with the 
rationale here and would be happy to merge the changes, I think this may still 
require a KIP as it removes connectors from OOTB Connect.
   
   Some users may be relying on the `SchemaSourceConnector` as a rudimentary 
data generation tool in their own testing or development environments, and 
might be annoyed if we make it more difficult to use that connector.
   
   The description for KAFKA-14759 does mention a desire to "reduce the attack 
surface area of a default Connect installation", which may be the only reason 
we would want to merge this change without a KIP; if any of these connectors 
presents an exploit in Connect, then we'd be justified in removing them just 
like we did with the `FileStream` source and sink connectors. If you've 
identified anything like that, please let us know by emailing 
secur...@kafka.apache.org.





[GitHub] [kafka] C0urante merged pull request #13191: KAFKA-14060: Replace EasyMock and PowerMock with Mockito in AbstractWorkerSourceTaskTest

2023-02-27 Thread via GitHub


C0urante merged PR #13191:
URL: https://github.com/apache/kafka/pull/13191





[jira] [Commented] (KAFKA-14041) Avoid the keyword var for a variable declaration in ConfigTransformer

2023-02-27 Thread Andras Katona (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17694048#comment-17694048
 ] 

Andras Katona commented on KAFKA-14041:
---

The Java compiler doesn't mind; {{var}} can still be used as a variable name (it is a reserved type name since Java 10, not a keyword).
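For example, this compiles on current JDKs (a standalone illustration, not the Kafka code):

{code:java}
import java.util.List;

public class VarAsIdentifier {
    public static void main(String[] args) {
        List<String> vars = List.of("a", "b");
        // 'var' is still a valid variable name; it only cannot be used as a class or interface name
        for (String var : vars) {
            System.out.println(var);
        }
    }
}
{code}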

> Avoid the keyword var for a variable declaration in ConfigTransformer
> -
>
> Key: KAFKA-14041
> URL: https://issues.apache.org/jira/browse/KAFKA-14041
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: QualiteSys QualiteSys
>Priority: Major
>
> In the file 
> clients\src\main\java\org\apache\kafka\common\config\ConfigTransformer.java a 
> variable named var is declared :
> line 84 : for (ConfigVariable var : vars) {
> Since it is a java keyword, could the variable name be changed ?
> Thanks





[jira] [Resolved] (KAFKA-14060) Replace EasyMock and PowerMock with Mockito in AbstractWorkerSourceTaskTest

2023-02-27 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14060?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-14060.
---
Resolution: Done

> Replace EasyMock and PowerMock with Mockito in AbstractWorkerSourceTaskTest
> ---
>
> Key: KAFKA-14060
> URL: https://issues.apache.org/jira/browse/KAFKA-14060
> Project: Kafka
>  Issue Type: Sub-task
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Hector Geraldino
>Priority: Minor
>






[jira] [Resolved] (KAFKA-14738) Topic disappears from kafka_topic.sh --list after modifying it with kafka_acl.sh

2023-02-27 Thread Andras Katona (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14738?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andras Katona resolved KAFKA-14738.
---
Resolution: Not A Bug

> Topic disappears from kafka_topic.sh --list after modifying it with 
> kafka_acl.sh
> 
>
> Key: KAFKA-14738
> URL: https://issues.apache.org/jira/browse/KAFKA-14738
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.2.3
>Reporter: Gabriel Lukacs
>Priority: Major
>
> Topic is not listed via kafka-topics.sh --list after modifying it with 
> kafka-acls.sh (-add --allow-principal User:CN=test --operation Read):
> $ /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server kafka:9092 
> --topic test2 --replication-factor 1 --partitions 50
> Created topic test2.
> $ /opt/kafka/bin/kafka-topics.sh --list --bootstrap-server kafka:9092 --topic 
> test2
> test2
> $ /opt/kafka/bin/kafka-acls.sh --bootstrap-server kafka:9092 --topic test2 
> --add --allow-principal User:CN=test --operation Read
> Adding ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test2, 
> patternType=LITERAL)`:
>         (principal=User:CN=test, host=*, operation=READ, permissionType=ALLOW)
> Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test2, 
> patternType=LITERAL)`:
>         (principal=User:CN=test, host=*, operation=READ, permissionType=ALLOW)
> $ /opt/kafka/bin/kafka-topics.sh --list --bootstrap-server kafka:9092 --topic 
> test2                                   
> $ /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server kafka:9092 
> --topic test2
> Error while executing topic command : Topic 'test2' already exists.
> [2023-02-21 16:37:39,185] ERROR 
> org.apache.kafka.common.errors.TopicExistsException: Topic 'test2' already 
> exists.
>  (kafka.admin.TopicCommand$)
> $ /opt/kafka/bin/kafka-topics.sh --delete --bootstrap-server kafka:9092 
> --topic test2
> Error while executing topic command : Topic 'test2' does not exist as expected
> [2023-02-21 16:37:49,485] ERROR java.lang.IllegalArgumentException: Topic 
> 'test2' does not exist as expected
>         at 
> kafka.admin.TopicCommand$.kafka$admin$TopicCommand$$ensureTopicExists(TopicCommand.scala:401)
>         at 
> kafka.admin.TopicCommand$TopicService.deleteTopic(TopicCommand.scala:361)
>         at kafka.admin.TopicCommand$.main(TopicCommand.scala:63)
>         at kafka.admin.TopicCommand.main(TopicCommand.scala)
>  (kafka.admin.TopicCommand$)
> $ /opt/kafka/bin/kafka-topics.sh --version
> 3.2.3 (Commit:50029d3ed8ba576f)





[GitHub] [kafka] rondagostino commented on pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

2023-02-27 Thread via GitHub


rondagostino commented on PR #13280:
URL: https://github.com/apache/kafka/pull/13280#issuecomment-1446491240

   The `topicIdInfo` benchmark now performs equivalently.  I eliminated the 
`TranslatedValueMapView` class in favor of adding a value mapper to the 
`VavrMapAsJava` class.





[jira] [Resolved] (KAFKA-14387) kafka.common.KafkaException | kafka_2.12-3.3.1.jar

2023-02-27 Thread Andras Katona (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14387?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andras Katona resolved KAFKA-14387.
---
Resolution: Information Provided

> kafka.common.KafkaException  | kafka_2.12-3.3.1.jar
> ---
>
> Key: KAFKA-14387
> URL: https://issues.apache.org/jira/browse/KAFKA-14387
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.3.1
>Reporter: masood
>Priority: Major
>
> It appears, Kafka.common.KafkaException is deprecated in 
> kafka_2.12-3.3.1.jar. 
> Please let me know which exception should be used.





[jira] [Commented] (KAFKA-14387) kafka.common.KafkaException | kafka_2.12-3.3.1.jar

2023-02-27 Thread Andras Katona (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17694068#comment-17694068
 ] 

Andras Katona commented on KAFKA-14387:
---

It was removed here:
https://github.com/apache/kafka/commit/c9c03dd7ef9ff4edf2596e905cabececc72a9e9d

Its commit message:
{quote}
Use the standard org.apache.kafka.common.KafkaException instead of 
kafka.common.KafkaException.
{quote}
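So for user code the migration is essentially an import change to the public exception class (a minimal sketch, not taken from the Kafka codebase):

{code:java}
// before (removed class):
// import kafka.common.KafkaException;

// after:
import org.apache.kafka.common.KafkaException;

public class ThrowsKafkaException {
    public static void main(String[] args) {
        throw new KafkaException("example failure");
    }
}
{code}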

> kafka.common.KafkaException  | kafka_2.12-3.3.1.jar
> ---
>
> Key: KAFKA-14387
> URL: https://issues.apache.org/jira/browse/KAFKA-14387
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.3.1
>Reporter: masood
>Priority: Major
>
> It appears, Kafka.common.KafkaException is deprecated in 
> kafka_2.12-3.3.1.jar. 
> Please let me know which exception should be used.





[jira] [Created] (KAFKA-14764) Metadata API ignores topic names if at least one topic is provided with an id

2023-02-27 Thread David Jacot (Jira)
David Jacot created KAFKA-14764:
---

 Summary: Metadata API ignores topic names if at least one topic is 
provided with an id
 Key: KAFKA-14764
 URL: https://issues.apache.org/jira/browse/KAFKA-14764
 Project: Kafka
  Issue Type: Bug
Reporter: David Jacot


The Metadata API accepts both topic names and topic ids in the request. This 
suggests that a single request could mix them in. At least, we have no logic on 
the server side to prevent this. The issue is that the server just ignores any 
topic specified with a name if there is at least one topic specified with an id 
in the request. In other words, if a request contains topic-id-1, topic-id-2, 
topic-name-1 and topic-name-2, the response will only have metadata for 
topic-id-1 and topic-id-2.

This does not hurt us today because the clients do not use topic ids in the request at all.





[jira] [Assigned] (KAFKA-14765) Support SCRAM for brokers at bootstrap

2023-02-27 Thread Proven Provenzano (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14765?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Proven Provenzano reassigned KAFKA-14765:
-

Assignee: Proven Provenzano

> Support SCRAM for brokers at bootstrap
> --
>
> Key: KAFKA-14765
> URL: https://issues.apache.org/jira/browse/KAFKA-14765
> Project: Kafka
>  Issue Type: Improvement
>  Components: kraft
>Reporter: Proven Provenzano
>Assignee: Proven Provenzano
>Priority: Major
>  Labels: KIP-900
>
> We want to add SCRAM support for brokers at bootstrap.
> We will support bootstrap as described in 
> [KIP-900|https://cwiki.apache.org/confluence/display/KAFKA/KIP-900%3A+KRaft+kafka-storage.sh+API+additions+to+support+SCRAM+for+Kafka+Brokers]
>  





[jira] [Created] (KAFKA-14765) Support SCRAM for brokers at bootstrap

2023-02-27 Thread Proven Provenzano (Jira)
Proven Provenzano created KAFKA-14765:
-

 Summary: Support SCRAM for brokers at bootstrap
 Key: KAFKA-14765
 URL: https://issues.apache.org/jira/browse/KAFKA-14765
 Project: Kafka
  Issue Type: Improvement
  Components: kraft
Reporter: Proven Provenzano


We want to add SCRAM support for brokers at bootstrap.

We will support bootstrap as described in 
[KIP-900|https://cwiki.apache.org/confluence/display/KAFKA/KIP-900%3A+KRaft+kafka-storage.sh+API+additions+to+support+SCRAM+for+Kafka+Brokers]

 





[GitHub] [kafka] dajac commented on pull request #13240: KAFKA-14690: Add topic IDs to OffsetCommit API and propagate for request version >= 9

2023-02-27 Thread via GitHub


dajac commented on PR #13240:
URL: https://github.com/apache/kafka/pull/13240#issuecomment-1446539881

   @Hangleton Thanks for the update. I will take a look shortly. In the meantime, could you already add a few integration tests for the OffsetCommitRequest like we have for the OffsetFetchRequest in kafka.server.OffsetFetchRequestTest? I believe that topic ids are actually lost when they are passed to the group coordinator. Hence, a request with topic ids will very likely return a response with topic names instead of topic ids. The issue is that we don't catch those kinds of issues with the existing tests because the logic in the consumer supports both ways.





[jira] [Comment Edited] (KAFKA-14750) Sink connector fails if a topic matching its topics.regex gets deleted

2023-02-27 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17693369#comment-17693369
 ] 

Chris Egerton edited comment on KAFKA-14750 at 2/27/23 3:28 PM:


Thanks for filing this, [~morozov]!

I've done some local testing and confirmed that the issue affects the current 
trunk, and after doing some digging, I suspect it goes back pretty far.

Initially I believed that this was simply a matter of adjusting the task's 
consumer's [metadata refresh 
interval|https://kafka.apache.org/documentation/#consumerconfigs_metadata.max.age.ms]
 to be lower, which would cause it to detect changes in its topic regex 
subscription sooner.

However, even after making that tweak, issues still surfaced. This is due to 
the fact that, after a topic is deleted, the task's [consumer rebalance 
listener|https://github.com/apache/kafka/blob/400ba0aeaeb6c460069d5ad12b1b3976ab447332/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L699-L784]
 is notified of the revocation of the partitions for that topic, which triggers 
an attempt to commit offsets–including offsets for the revoked topic partitions.

There are a couple of approaches I can think of for this:
 # Adjust the Connect runtime's behavior to somehow discern the set of 
still-existing topic partitions before committing offsets, and skip committing 
offsets for recently-deleted topic partitions
 # Tweak the consumer logic to invoke {{ConsumerRebalanceListener::onPartitionsLost}} instead of {{ConsumerRebalanceListener::onPartitionsRevoked}} for deleted topic partitions.

Given that option 1 is inherently subject to race conditions, I'd prefer to 
pursue option 2 initially. However, I'm not too familiar with the clients side 
of things, so it'd be nice to get a second opinion.

 

[~jasong35] [~pnee] if either of you get a chance, would you mind weighing in 
here? TL; DR: Should we be treating deleted topic partitions as "lost" instead 
of "revoked" with consumer rebalance listeners?



> Sink connector fails if a topic matching its topics.regex gets deleted
> --
>
> Key: KAFKA-14750
> URL: https://issues.apache.org/jira/browse/KAFKA-14750
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.3.1
>Reporter: Sergei Morozov
>Priority: Major
>
> Steps to reproduce:
> # In {{{}config/connect-standalone.properties{}}}, set:
> {code:bash}
> plugin.path=libs/connect-file-3.3.1.jar
> {code}
> # In {{{}config/connect-file-sink.properties{}}}, remove the {{topics=}} line 
> and add this one:
> {code:bash}
> topics.regex=connect-test-.*
> {code}
> # Start zookeeper:
> {code:bash}
> bin/zookeeper-server-start.sh config/zookeeper.properties
> {code}
> # Start the brokers:
> {code:bash}
> bin/kafka-server-start.sh config/server.properties
> {code}
> # Start the file sink connector:
> {code:bash}
> bi

[jira] [Commented] (KAFKA-14765) Support SCRAM for brokers at bootstrap

2023-02-27 Thread Proven Provenzano (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17694074#comment-17694074
 ] 

Proven Provenzano commented on KAFKA-14765:
---

[KIP-900|https://cwiki.apache.org/confluence/display/KAFKA/KIP-900%3A+KRaft+kafka-storage.sh+API+additions+to+support+SCRAM+for+Kafka+Brokers]
 is approved, see 
[Vote|https://lists.apache.org/thread/gfqxhrkft1fmzvzb283z6tv12y703b8j] 

Waiting on commit of KAFKA-14084 PR 
[13114|https://github.com/apache/kafka/pull/13114]

> Support SCRAM for brokers at bootstrap
> --
>
> Key: KAFKA-14765
> URL: https://issues.apache.org/jira/browse/KAFKA-14765
> Project: Kafka
>  Issue Type: Improvement
>  Components: kraft
>Reporter: Proven Provenzano
>Assignee: Proven Provenzano
>Priority: Major
>  Labels: KIP-900
>
> We want to add SCRAM support for brokers at bootstrap.
> We will support bootstrap as described in 
> [KIP-900|https://cwiki.apache.org/confluence/display/KAFKA/KIP-900%3A+KRaft+kafka-storage.sh+API+additions+to+support+SCRAM+for+Kafka+Brokers]
>  





[jira] [Commented] (KAFKA-14764) Metadata API ignores topic names if at least one topic is provided with an id

2023-02-27 Thread David Jacot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14764?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17694076#comment-17694076
 ] 

David Jacot commented on KAFKA-14764:
-

[~dengziming] If I remember correctly, you did the implementation. Could you 
double check this?

> Metadata API ignores topic names if at least one topic is provided with an id
> -
>
> Key: KAFKA-14764
> URL: https://issues.apache.org/jira/browse/KAFKA-14764
> Project: Kafka
>  Issue Type: Bug
>Reporter: David Jacot
>Priority: Major
>
> The Metadata API accepts both topic names and topic ids in the request. This 
> suggests that a single request could mix them in. At least, we have no logic 
> on the server side to prevent this. The issue is that the server just ignores 
> any topic specified with a name if there is at least one topic specified with 
> an id in the request. In other words, if a request contains topic-id-1, 
> topic-id-2, topic-name-1 and topic-name-2, the response will only have 
> metadata for topic-id-1 and topic-id-2.
> This does not hurt us today because the clients does not use topic ids in the 
> request at all.





[GitHub] [kafka] hgeraldino commented on pull request #13193: KAFKA-14659 source-record-write-[rate|total] metrics include filtered records

2023-02-27 Thread via GitHub


hgeraldino commented on PR #13193:
URL: https://github.com/apache/kafka/pull/13193#issuecomment-1446570858

   Thanks @clolov, 
   
   Now that https://github.com/apache/kafka/pull/13191 has been merged, this PR 
has been rebased and is ready for review. 
   
   CC @C0urante 





[jira] [Updated] (KAFKA-14764) Metadata API ignores topic names if at least one topic ID is provided

2023-02-27 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14764?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot updated KAFKA-14764:

Summary: Metadata API ignores topic names if at least one topic ID is 
provided  (was: Metadata API ignores topic names if at least one topic is 
provided with an id)

> Metadata API ignores topic names if at least one topic ID is provided
> -
>
> Key: KAFKA-14764
> URL: https://issues.apache.org/jira/browse/KAFKA-14764
> Project: Kafka
>  Issue Type: Bug
>Reporter: David Jacot
>Priority: Major
>
> The Metadata API accepts both topic names and topic ids in the request. This 
> suggests that a single request could mix them in. At least, we have no logic 
> on the server side to prevent this. The issue is that the server just ignores 
> any topic specified with a name if there is at least one topic specified with 
> an id in the request. In other words, if a request contains topic-id-1, 
> topic-id-2, topic-name-1 and topic-name-2, the response will only have 
> metadata for topic-id-1 and topic-id-2.
> This does not hurt us today because the clients does not use topic ids in the 
> request at all.





[jira] [Updated] (KAFKA-14764) Metadata API ignores topic names if at least one topic id is provided with an id

2023-02-27 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14764?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot updated KAFKA-14764:

Summary: Metadata API ignores topic names if at least one topic id is 
provided with an id  (was: Metadata API ignores topic names if at least one 
topic is provided with an id)

> Metadata API ignores topic names if at least one topic id is provided with an 
> id
> 
>
> Key: KAFKA-14764
> URL: https://issues.apache.org/jira/browse/KAFKA-14764
> Project: Kafka
>  Issue Type: Bug
>Reporter: David Jacot
>Priority: Major
>
> The Metadata API accepts both topic names and topic ids in the request. This 
> suggests that a single request could mix them in. At least, we have no logic 
> on the server side to prevent this. The issue is that the server just ignores 
> any topic specified with a name if there is at least one topic specified with 
> an id in the request. In other words, if a request contains topic-id-1, 
> topic-id-2, topic-name-1 and topic-name-2, the response will only have 
> metadata for topic-id-1 and topic-id-2.
> This does not hurt us today because the clients does not use topic ids in the 
> request at all.





[jira] [Updated] (KAFKA-14764) Metadata API ignores topic names if at least one topic is provided with an id

2023-02-27 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14764?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot updated KAFKA-14764:

Summary: Metadata API ignores topic names if at least one topic is provided 
with an id  (was: Metadata API ignores topic names if at least one topic id is 
provided with an id)

> Metadata API ignores topic names if at least one topic is provided with an id
> -
>
> Key: KAFKA-14764
> URL: https://issues.apache.org/jira/browse/KAFKA-14764
> Project: Kafka
>  Issue Type: Bug
>Reporter: David Jacot
>Priority: Major
>
> The Metadata API accepts both topic names and topic ids in the request. This 
> suggests that a single request could mix them in. At least, we have no logic 
> on the server side to prevent this. The issue is that the server just ignores 
> any topic specified with a name if there is at least one topic specified with 
> an id in the request. In other words, if a request contains topic-id-1, 
> topic-id-2, topic-name-1 and topic-name-2, the response will only have 
> metadata for topic-id-1 and topic-id-2.
> This does not hurt us today because the clients does not use topic ids in the 
> request at all.





[GitHub] [kafka] jolshan commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-02-27 Thread via GitHub


jolshan commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1118946985


##
clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json:
##
@@ -23,17 +23,39 @@
   // Version 2 adds the support for new error code PRODUCER_FENCED.
   //
   // Version 3 enables flexible versions.
-  "validVersions": "0-3",
+  //
+  // Version 4 adds VerifyOnly field to check if partitions are already in transaction and adds support to batch multiple transactions.
+  // The AddPartitionsToTxnRequest version 4 API is added as part of KIP-890 and is still
+  // under developement. Hence, the API is not exposed by default by brokers
+  // unless explicitely enabled.
+  "latestVersionUnstable": true,
+  "validVersions": "0-4",
   "flexibleVersions": "3+",
   "fields": [
-{ "name": "TransactionalId", "type": "string", "versions": "0+", "entityType": "transactionalId",
-  "about": "The transactional id corresponding to the transaction."},
-{ "name": "ProducerId", "type": "int64", "versions": "0+", "entityType": "producerId",
+{ "name": "Transactions", "type": "[]AddPartitionsToTxnTransaction", "versions":  "4+",
+  "about": "List of transactions to add partitions to.", "fields": [
+  { "name": "TransactionalId", "type": "string", "versions": "4+", "mapKey": true, "entityType": "transactionalId",
+    "about": "The transactional id corresponding to the transaction." },
+  { "name": "ProducerId", "type": "int64", "versions": "4+", "entityType": "producerId",
+    "about": "Current producer id in use by the transactional id." },
+  { "name": "ProducerEpoch", "type": "int16", "versions": "4+",
+    "about": "Current epoch associated with the producer id." },
+  { "name": "VerifyOnly", "type": "bool", "versions": "4+", "default": false,
+    "about": "Boolean to signify if we want to check if the partition is in the transaction rather than add it." },
+  { "name": "Topics", "type": "[]AddPartitionsToTxnTopic", "versions": "4+",
+    "about": "The partitions to add to the transaction." }
+]},
+{ "name": "V3AndBelowTransactionalId", "type": "string", "versions": "0-3", "entityType": "transactionalId",

Review Comment:
   I had previous comments from @hachikuji and @guozhangwang that I should do
something like this to avoid confusion when setting the fields. I have seen
something similar in other places, but we didn't explicitly say the version.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-02-27 Thread via GitHub


jolshan commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1118948279


##
clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java:
##
@@ -118,11 +123,78 @@ public AddPartitionsToTxnRequestData data() {
 
 @Override
 public AddPartitionsToTxnResponse getErrorResponse(int throttleTimeMs, 
Throwable e) {
-final HashMap errors = new HashMap<>();
-for (TopicPartition partition : partitions()) {
-errors.put(partition, Errors.forException(e));
+Errors error = Errors.forException(e);
+AddPartitionsToTxnResponseData response = new 
AddPartitionsToTxnResponseData();
+if (version < 4) {
+
response.setResultsByTopicV3AndBelow(errorResponseForTopics(data.v3AndBelowTopics(),
 error));
+} else {
+AddPartitionsToTxnResultCollection results = new 
AddPartitionsToTxnResultCollection();
+for (AddPartitionsToTxnTransaction transaction : 
data().transactions()) {
+
results.add(errorResponseForTransaction(transaction.transactionalId(), error));
+}
+response.setResultsByTransaction(results);
+response.setErrorCode(error.code());

Review Comment:
   As discussed by @hachikuji and @guozhangwang, the idea was that if a top-level
error was set we could skip the rest of the handling. I just set all the fields
to the same error for consistency. What is your suggestion? To remove these?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-02-27 Thread via GitHub


jolshan commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1118949475


##
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##
@@ -1303,11 +1305,13 @@ public void 
testCommitWithTopicAuthorizationFailureInAddPartitionsInFlight() thr
 Map errors = new HashMap<>();
 errors.put(tp0, Errors.TOPIC_AUTHORIZATION_FAILED);
 errors.put(tp1, Errors.OPERATION_NOT_ATTEMPTED);
+AddPartitionsToTxnResult result = 
AddPartitionsToTxnResponse.resultForTransaction(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID,
 errors);
+AddPartitionsToTxnResponseData data = new 
AddPartitionsToTxnResponseData().setResultsByTopicV3AndBelow(result.topicResults()).setThrottleTimeMs(0);
 client.respond(body -> {
 AddPartitionsToTxnRequest request = (AddPartitionsToTxnRequest) 
body;
-assertEquals(new HashSet<>(request.partitions()), new 
HashSet<>(errors.keySet()));
+assertEquals(new 
HashSet<>(AddPartitionsToTxnRequest.getPartitions(request.data().v3AndBelowTopics())),
 new HashSet<>(errors.keySet()));

Review Comment:
   I was told to avoid having version-specific methods. In order for this to be
usable by both the v3-and-below and the v4 objects, this was the best way to do
it. It really only looks bad in the tests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-02-27 Thread via GitHub


jolshan commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1118951636


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -2384,68 +2385,116 @@ class KafkaApis(val requestChannel: RequestChannel,
 if (config.interBrokerProtocolVersion.isLessThan(version))
   throw new UnsupportedVersionException(s"inter.broker.protocol.version: 
${config.interBrokerProtocolVersion.version} is less than the required version: 
${version.version}")
   }
-
-  def handleAddPartitionToTxnRequest(request: RequestChannel.Request, 
requestLocal: RequestLocal): Unit = {
+  def handleAddPartitionsToTxnRequest(request: RequestChannel.Request, 
requestLocal: RequestLocal): Unit = {
 ensureInterBrokerVersion(IBP_0_11_0_IV0)
-val addPartitionsToTxnRequest = request.body[AddPartitionsToTxnRequest]
-val transactionalId = addPartitionsToTxnRequest.data.transactionalId
-val partitionsToAdd = addPartitionsToTxnRequest.partitions.asScala
-if (!authHelper.authorize(request.context, WRITE, TRANSACTIONAL_ID, 
transactionalId))
-  requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-addPartitionsToTxnRequest.getErrorResponse(requestThrottleMs, 
Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception))
-else {
-  val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]()
-  val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]()
-  val authorizedPartitions = mutable.Set[TopicPartition]()
-
-  val authorizedTopics = authHelper.filterByAuthorized(request.context, 
WRITE, TOPIC,
-partitionsToAdd.filterNot(tp => Topic.isInternal(tp.topic)))(_.topic)
-  for (topicPartition <- partitionsToAdd) {
-if (!authorizedTopics.contains(topicPartition.topic))
-  unauthorizedTopicErrors += topicPartition -> 
Errors.TOPIC_AUTHORIZATION_FAILED
-else if (!metadataCache.contains(topicPartition))
-  nonExistingTopicErrors += topicPartition -> 
Errors.UNKNOWN_TOPIC_OR_PARTITION
-else
-  authorizedPartitions.add(topicPartition)
+val addPartitionsToTxnRequest =
+  if (request.context.apiVersion() < 4) 
+request.body[AddPartitionsToTxnRequest].normalizeRequest() 
+  else 
+request.body[AddPartitionsToTxnRequest]
+val version = addPartitionsToTxnRequest.version
+val responses = new AddPartitionsToTxnResultCollection()
+val partitionsByTransaction = 
addPartitionsToTxnRequest.partitionsByTransaction()
+
+// Newer versions of the request should only come from other brokers.
+if (version >= 4) authHelper.authorizeClusterOperation(request, 
CLUSTER_ACTION)
+
+// V4 requests introduced batches of transactions. We need all 
transactions to be handled before sending the 
+// response so there are a few differences in handling errors and sending 
responses.
+def createResponse(requestThrottleMs: Int): AbstractResponse = {
+  if (version < 4) {
+// There will only be one response in data. Add it to the response 
data object.
+val data = new AddPartitionsToTxnResponseData()
+responses.forEach(result => {
+  data.setResultsByTopicV3AndBelow(result.topicResults())
+  data.setThrottleTimeMs(requestThrottleMs)
+})
+new AddPartitionsToTxnResponse(data)
+  } else {
+new AddPartitionsToTxnResponse(new 
AddPartitionsToTxnResponseData().setThrottleTimeMs(requestThrottleMs).setResultsByTransaction(responses))
   }
+}
 
-  if (unauthorizedTopicErrors.nonEmpty || nonExistingTopicErrors.nonEmpty) 
{
-// Any failed partition check causes the entire request to fail. We 
send the appropriate error codes for the
-// partitions which failed, and an 'OPERATION_NOT_ATTEMPTED' error 
code for the partitions which succeeded
-// the authorization check to indicate that they were not added to the 
transaction.
-val partitionErrors = unauthorizedTopicErrors ++ 
nonExistingTopicErrors ++
-  authorizedPartitions.map(_ -> Errors.OPERATION_NOT_ATTEMPTED)
-requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-  new AddPartitionsToTxnResponse(requestThrottleMs, 
partitionErrors.asJava))
+val txns = addPartitionsToTxnRequest.data.transactions
+def maybeSendResponse(): Unit = {
+  var canSend = false
+  responses.synchronized {
+if (responses.size() == txns.size()) {
+  canSend = true
+}
+  }
+  if (canSend) {
+requestHelper.sendResponseMaybeThrottle(request, createResponse)
+  }
+}
+
+txns.forEach( transaction => {
+  val transactionalId = transaction.transactionalId
+  val partitionsToAdd = 
partitionsByTransaction.get(transactionalId).asScala
+  
+  // Versions < 4 come from clients and must be authorized to write for 
the given transaction and for the given topics.
+  if (version < 4 && !a

[GitHub] [kafka] jolshan commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-02-27 Thread via GitHub


jolshan commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1118952253


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -2384,68 +2385,116 @@ class KafkaApis(val requestChannel: RequestChannel,
 if (config.interBrokerProtocolVersion.isLessThan(version))
   throw new UnsupportedVersionException(s"inter.broker.protocol.version: 
${config.interBrokerProtocolVersion.version} is less than the required version: 
${version.version}")
   }
-
-  def handleAddPartitionToTxnRequest(request: RequestChannel.Request, 
requestLocal: RequestLocal): Unit = {
+  def handleAddPartitionsToTxnRequest(request: RequestChannel.Request, 
requestLocal: RequestLocal): Unit = {
 ensureInterBrokerVersion(IBP_0_11_0_IV0)
-val addPartitionsToTxnRequest = request.body[AddPartitionsToTxnRequest]
-val transactionalId = addPartitionsToTxnRequest.data.transactionalId
-val partitionsToAdd = addPartitionsToTxnRequest.partitions.asScala
-if (!authHelper.authorize(request.context, WRITE, TRANSACTIONAL_ID, 
transactionalId))
-  requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-addPartitionsToTxnRequest.getErrorResponse(requestThrottleMs, 
Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception))
-else {
-  val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]()
-  val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]()
-  val authorizedPartitions = mutable.Set[TopicPartition]()
-
-  val authorizedTopics = authHelper.filterByAuthorized(request.context, 
WRITE, TOPIC,
-partitionsToAdd.filterNot(tp => Topic.isInternal(tp.topic)))(_.topic)
-  for (topicPartition <- partitionsToAdd) {
-if (!authorizedTopics.contains(topicPartition.topic))
-  unauthorizedTopicErrors += topicPartition -> 
Errors.TOPIC_AUTHORIZATION_FAILED
-else if (!metadataCache.contains(topicPartition))
-  nonExistingTopicErrors += topicPartition -> 
Errors.UNKNOWN_TOPIC_OR_PARTITION
-else
-  authorizedPartitions.add(topicPartition)
+val addPartitionsToTxnRequest =
+  if (request.context.apiVersion() < 4) 
+request.body[AddPartitionsToTxnRequest].normalizeRequest() 
+  else 
+request.body[AddPartitionsToTxnRequest]
+val version = addPartitionsToTxnRequest.version
+val responses = new AddPartitionsToTxnResultCollection()
+val partitionsByTransaction = 
addPartitionsToTxnRequest.partitionsByTransaction()
+
+// Newer versions of the request should only come from other brokers.
+if (version >= 4) authHelper.authorizeClusterOperation(request, 
CLUSTER_ACTION)
+
+// V4 requests introduced batches of transactions. We need all 
transactions to be handled before sending the 
+// response so there are a few differences in handling errors and sending 
responses.
+def createResponse(requestThrottleMs: Int): AbstractResponse = {
+  if (version < 4) {
+// There will only be one response in data. Add it to the response 
data object.
+val data = new AddPartitionsToTxnResponseData()
+responses.forEach(result => {
+  data.setResultsByTopicV3AndBelow(result.topicResults())
+  data.setThrottleTimeMs(requestThrottleMs)
+})
+new AddPartitionsToTxnResponse(data)
+  } else {
+new AddPartitionsToTxnResponse(new 
AddPartitionsToTxnResponseData().setThrottleTimeMs(requestThrottleMs).setResultsByTransaction(responses))
   }
+}
 
-  if (unauthorizedTopicErrors.nonEmpty || nonExistingTopicErrors.nonEmpty) 
{
-// Any failed partition check causes the entire request to fail. We 
send the appropriate error codes for the
-// partitions which failed, and an 'OPERATION_NOT_ATTEMPTED' error 
code for the partitions which succeeded
-// the authorization check to indicate that they were not added to the 
transaction.
-val partitionErrors = unauthorizedTopicErrors ++ 
nonExistingTopicErrors ++
-  authorizedPartitions.map(_ -> Errors.OPERATION_NOT_ATTEMPTED)
-requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-  new AddPartitionsToTxnResponse(requestThrottleMs, 
partitionErrors.asJava))
+val txns = addPartitionsToTxnRequest.data.transactions
+def maybeSendResponse(): Unit = {
+  var canSend = false
+  responses.synchronized {
+if (responses.size() == txns.size()) {
+  canSend = true
+}
+  }
+  if (canSend) {
+requestHelper.sendResponseMaybeThrottle(request, createResponse)
+  }
+}
+
+txns.forEach( transaction => {
+  val transactionalId = transaction.transactionalId
+  val partitionsToAdd = 
partitionsByTransaction.get(transactionalId).asScala
+  
+  // Versions < 4 come from clients and must be authorized to write for 
the given transaction and for the given topics.
+  if (version < 4 && !a

[GitHub] [kafka] dajac commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-02-27 Thread via GitHub


dajac commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1118980264


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -2384,68 +2385,116 @@ class KafkaApis(val requestChannel: RequestChannel,
 if (config.interBrokerProtocolVersion.isLessThan(version))
   throw new UnsupportedVersionException(s"inter.broker.protocol.version: 
${config.interBrokerProtocolVersion.version} is less than the required version: 
${version.version}")
   }
-
-  def handleAddPartitionToTxnRequest(request: RequestChannel.Request, 
requestLocal: RequestLocal): Unit = {
+  def handleAddPartitionsToTxnRequest(request: RequestChannel.Request, 
requestLocal: RequestLocal): Unit = {
 ensureInterBrokerVersion(IBP_0_11_0_IV0)
-val addPartitionsToTxnRequest = request.body[AddPartitionsToTxnRequest]
-val transactionalId = addPartitionsToTxnRequest.data.transactionalId
-val partitionsToAdd = addPartitionsToTxnRequest.partitions.asScala
-if (!authHelper.authorize(request.context, WRITE, TRANSACTIONAL_ID, 
transactionalId))
-  requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-addPartitionsToTxnRequest.getErrorResponse(requestThrottleMs, 
Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception))
-else {
-  val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]()
-  val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]()
-  val authorizedPartitions = mutable.Set[TopicPartition]()
-
-  val authorizedTopics = authHelper.filterByAuthorized(request.context, 
WRITE, TOPIC,
-partitionsToAdd.filterNot(tp => Topic.isInternal(tp.topic)))(_.topic)
-  for (topicPartition <- partitionsToAdd) {
-if (!authorizedTopics.contains(topicPartition.topic))
-  unauthorizedTopicErrors += topicPartition -> 
Errors.TOPIC_AUTHORIZATION_FAILED
-else if (!metadataCache.contains(topicPartition))
-  nonExistingTopicErrors += topicPartition -> 
Errors.UNKNOWN_TOPIC_OR_PARTITION
-else
-  authorizedPartitions.add(topicPartition)
+val addPartitionsToTxnRequest =
+  if (request.context.apiVersion() < 4) 
+request.body[AddPartitionsToTxnRequest].normalizeRequest() 
+  else 
+request.body[AddPartitionsToTxnRequest]
+val version = addPartitionsToTxnRequest.version
+val responses = new AddPartitionsToTxnResultCollection()
+val partitionsByTransaction = 
addPartitionsToTxnRequest.partitionsByTransaction()
+
+// Newer versions of the request should only come from other brokers.
+if (version >= 4) authHelper.authorizeClusterOperation(request, 
CLUSTER_ACTION)
+
+// V4 requests introduced batches of transactions. We need all 
transactions to be handled before sending the 
+// response so there are a few differences in handling errors and sending 
responses.
+def createResponse(requestThrottleMs: Int): AbstractResponse = {
+  if (version < 4) {
+// There will only be one response in data. Add it to the response 
data object.
+val data = new AddPartitionsToTxnResponseData()
+responses.forEach(result => {
+  data.setResultsByTopicV3AndBelow(result.topicResults())
+  data.setThrottleTimeMs(requestThrottleMs)
+})
+new AddPartitionsToTxnResponse(data)
+  } else {
+new AddPartitionsToTxnResponse(new 
AddPartitionsToTxnResponseData().setThrottleTimeMs(requestThrottleMs).setResultsByTransaction(responses))
   }
+}
 
-  if (unauthorizedTopicErrors.nonEmpty || nonExistingTopicErrors.nonEmpty) 
{
-// Any failed partition check causes the entire request to fail. We 
send the appropriate error codes for the
-// partitions which failed, and an 'OPERATION_NOT_ATTEMPTED' error 
code for the partitions which succeeded
-// the authorization check to indicate that they were not added to the 
transaction.
-val partitionErrors = unauthorizedTopicErrors ++ 
nonExistingTopicErrors ++
-  authorizedPartitions.map(_ -> Errors.OPERATION_NOT_ATTEMPTED)
-requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-  new AddPartitionsToTxnResponse(requestThrottleMs, 
partitionErrors.asJava))
+val txns = addPartitionsToTxnRequest.data.transactions
+def maybeSendResponse(): Unit = {
+  var canSend = false
+  responses.synchronized {
+if (responses.size() == txns.size()) {
+  canSend = true
+}
+  }
+  if (canSend) {
+requestHelper.sendResponseMaybeThrottle(request, createResponse)
+  }
+}
+
+txns.forEach( transaction => {
+  val transactionalId = transaction.transactionalId
+  val partitionsToAdd = 
partitionsByTransaction.get(transactionalId).asScala
+  
+  // Versions < 4 come from clients and must be authorized to write for 
the given transaction and for the given topics.
+  if (version < 4 && !aut

[GitHub] [kafka] dajac commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-02-27 Thread via GitHub


dajac commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1118981498


##
clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json:
##
@@ -23,17 +23,39 @@
   // Version 2 adds the support for new error code PRODUCER_FENCED.
   //
   // Version 3 enables flexible versions.
-  "validVersions": "0-3",
+  //
+  // Version 4 adds VerifyOnly field to check if partitions are already in 
transaction and adds support to batch multiple transactions.
+  // The AddPartitionsToTxnRequest version 4 API is added as part of KIP-890 
and is still
+  // under development. Hence, the API is not exposed by default by brokers
+  // unless explicitly enabled.
+  "latestVersionUnstable": true,
+  "validVersions": "0-4",
   "flexibleVersions": "3+",
   "fields": [
-{ "name": "TransactionalId", "type": "string", "versions": "0+", 
"entityType": "transactionalId",
-  "about": "The transactional id corresponding to the transaction."},
-{ "name": "ProducerId", "type": "int64", "versions": "0+", "entityType": 
"producerId",
+{ "name": "Transactions", "type": "[]AddPartitionsToTxnTransaction", 
"versions":  "4+",
+  "about": "List of transactions to add partitions to.", "fields": [
+  { "name": "TransactionalId", "type": "string", "versions": "4+", 
"mapKey": true, "entityType": "transactionalId",
+"about": "The transactional id corresponding to the transaction." },
+  { "name": "ProducerId", "type": "int64", "versions": "4+", "entityType": 
"producerId",
+"about": "Current producer id in use by the transactional id." },
+  { "name": "ProducerEpoch", "type": "int16", "versions": "4+",
+"about": "Current epoch associated with the producer id." },
+  { "name": "VerifyOnly", "type": "bool", "versions": "4+", "default": 
false,
+"about": "Boolean to signify if we want to check if the partition is 
in the transaction rather than add it." },
+  { "name": "Topics", "type": "[]AddPartitionsToTxnTopic", "versions": 
"4+",
+"about": "The partitions to add to the transaction." }
+]},
+{ "name": "V3AndBelowTransactionalId", "type": "string", "versions": 
"0-3", "entityType": "transactionalId",

Review Comment:
   Ack. That seems fine.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13276: KAFKA-14732: Use an exponential backoff retry mechanism while reconfiguring connector tasks

2023-02-27 Thread via GitHub


C0urante commented on code in PR #13276:
URL: https://github.com/apache/kafka/pull/13276#discussion_r1119036590


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##
@@ -3694,6 +3694,54 @@ public void 
shouldThrowWhenStartAndStopExecutorThrowsRejectedExecutionExceptionA
 PowerMock.verifyAll();
 }
 
+@Test
+public void testTaskReconfigurationRetries() {
+EasyMock.expect(member.memberId()).andStubReturn("leader");
+
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+expectRebalance(1, Collections.emptyList(), Collections.emptyList(), 
true);
+expectConfigRefreshAndSnapshot(SNAPSHOT);
+
+// end of initial tick
+member.poll(EasyMock.anyInt());
+PowerMock.expectLastCall();
+
+member.wakeup();
+PowerMock.expectLastCall();

Review Comment:
   Wakeups basically don't matter in these tests; if it's easier, feel free to 
append `.anyTimes()` here.



##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##
@@ -3694,6 +3694,54 @@ public void 
shouldThrowWhenStartAndStopExecutorThrowsRejectedExecutionExceptionA
 PowerMock.verifyAll();
 }
 
+@Test
+public void testTaskReconfigurationRetries() {
+EasyMock.expect(member.memberId()).andStubReturn("leader");
+
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+expectRebalance(1, Collections.emptyList(), Collections.emptyList(), 
true);
+expectConfigRefreshAndSnapshot(SNAPSHOT);
+
+// end of initial tick
+member.poll(EasyMock.anyInt());
+PowerMock.expectLastCall();
+
+member.wakeup();
+PowerMock.expectLastCall();
+
+// second tick
+member.ensureActive();
+PowerMock.expectLastCall();
+
+EasyMock.expect(worker.isRunning(CONN1)).andReturn(true).anyTimes();
+EasyMock.expect(worker.getPlugins()).andReturn(plugins).anyTimes();
+
+SinkConnectorConfig sinkConnectorConfig = new 
SinkConnectorConfig(plugins, CONN1_CONFIG);
+EasyMock.expect(worker.connectorTaskConfigs(CONN1, 
sinkConnectorConfig))
+.andThrow(new ConnectException("Failed to generate task 
configs")).anyTimes();

Review Comment:
   This test is great. I think it'd be worth it to perform a third and fourth 
tick. The third can be used to simulate successfully generating task configs 
after the two failed attempts, and the fourth can be used to ensure that we 
don't retry any further.
   
   It's also worth noting that we're only testing the case where 
`Connector::taskConfigs` (or really, `Worker::connectorTaskConfigs`) fails, but 
the logic that's being added here applies if intra-cluster communication fails 
as well (which may happen if the leader of the cluster is temporarily 
unavailable, for example). It'd be nice if we could have test coverage for that 
too, but I won't block this PR on that.
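
   For context, the doubling the test asserts boils down to something like the sketch
below. Only the 250 ms seed and the doubling come from the test itself; the cap and
the class/field names here are assumptions for illustration:

   ```java
   public class ReconfigBackoffSketch {
       // Assumed values for illustration; only the 250 ms seed and the doubling
       // are taken from the test under review.
       static final long INITIAL_BACKOFF_MS = 250L;
       static final long MAX_BACKOFF_MS = 5_000L;

       public static void main(String[] args) {
           long backoffMs = INITIAL_BACKOFF_MS;
           for (int attempt = 1; attempt <= 6; attempt++) {
               System.out.printf("attempt %d: wait %d ms before retrying task reconfiguration%n",
                       attempt, backoffMs);
               backoffMs = Math.min(backoffMs * 2, MAX_BACKOFF_MS); // double, capped
           }
       }
   }
   ```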



##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##
@@ -3694,6 +3694,54 @@ public void 
shouldThrowWhenStartAndStopExecutorThrowsRejectedExecutionExceptionA
 PowerMock.verifyAll();
 }
 
+@Test
+public void testTaskReconfigurationRetries() {
+EasyMock.expect(member.memberId()).andStubReturn("leader");
+
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+expectRebalance(1, Collections.emptyList(), Collections.emptyList(), 
true);
+expectConfigRefreshAndSnapshot(SNAPSHOT);
+
+// end of initial tick
+member.poll(EasyMock.anyInt());
+PowerMock.expectLastCall();
+
+member.wakeup();
+PowerMock.expectLastCall();
+
+// second tick
+member.ensureActive();
+PowerMock.expectLastCall();
+
+EasyMock.expect(worker.isRunning(CONN1)).andReturn(true).anyTimes();
+EasyMock.expect(worker.getPlugins()).andReturn(plugins).anyTimes();
+
+SinkConnectorConfig sinkConnectorConfig = new 
SinkConnectorConfig(plugins, CONN1_CONFIG);
+EasyMock.expect(worker.connectorTaskConfigs(CONN1, 
sinkConnectorConfig))
+.andThrow(new ConnectException("Failed to generate task 
configs")).anyTimes();
+
+// task reconfiguration request with initial retry backoff
+member.poll(EasyMock.eq(250L));
+PowerMock.expectLastCall();
+
+member.ensureActive();
+PowerMock.expectLastCall();
+
+// task reconfiguration request with double the initial retry backoff
+member.poll(EasyMock.eq(500L));
+PowerMock.expectLastCall();
+
+PowerMock.replayAll();
+
+// initial tick
+herder.tick();
+herder.requestTaskReconfiguration(CONN1);
+// process the task reconfi

[jira] [Created] (KAFKA-14766) Improve performance of VarInt encoding/decoding

2023-02-27 Thread Divij Vaidya (Jira)
Divij Vaidya created KAFKA-14766:


 Summary: Improve performance of VarInt encoding/decoding
 Key: KAFKA-14766
 URL: https://issues.apache.org/jira/browse/KAFKA-14766
 Project: Kafka
  Issue Type: Improvement
  Components: core
Reporter: Divij Vaidya
Assignee: Divij Vaidya
 Fix For: 3.5.0


Our current implementation in ByteUtils could be improved via loop unrolling and
by short-circuiting common scenarios such as a value of 0.

With the changes in the attached PR, we see up to 83% improvement in throughput
for writing a varInt.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] divijvaidya opened a new pull request, #13312: KAFKA-14766: Improve performance of VarInt encoding and decoding

2023-02-27 Thread via GitHub


divijvaidya opened a new pull request, #13312:
URL: https://github.com/apache/kafka/pull/13312

   # Motivation
   Reading/writing the protocol buffer varInt32 and varInt64 (also called
varLong in our code base) is in the hot path of data plane code in Apache
Kafka. We read multiple varInts per record. Hence, even a minor change in
performance could extrapolate to a larger overall benefit.
   
   # Changes
   This change uses loop unrolling and reduces the amount of repetition of 
calculations.
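
   For illustration, an unrolled/short-circuited writer for an unsigned varint32 looks
roughly like this; this is a sketch of the idea only, not necessarily the exact code in this PR:

   ```java
   import java.nio.ByteBuffer;

   public class VarintWriteSketch {
       // Each branch handles one possible encoded length, so small values (the
       // common case) short-circuit after a single mask-and-compare instead of looping.
       public static void writeUnsignedVarint(int value, ByteBuffer buffer) {
           if ((value & (0xFFFFFFFF << 7)) == 0) {
               buffer.put((byte) value);
           } else if ((value & (0xFFFFFFFF << 14)) == 0) {
               buffer.put((byte) ((value & 0x7F) | 0x80));
               buffer.put((byte) (value >>> 7));
           } else if ((value & (0xFFFFFFFF << 21)) == 0) {
               buffer.put((byte) ((value & 0x7F) | 0x80));
               buffer.put((byte) (((value >>> 7) & 0x7F) | 0x80));
               buffer.put((byte) (value >>> 14));
           } else if ((value & (0xFFFFFFFF << 28)) == 0) {
               buffer.put((byte) ((value & 0x7F) | 0x80));
               buffer.put((byte) (((value >>> 7) & 0x7F) | 0x80));
               buffer.put((byte) (((value >>> 14) & 0x7F) | 0x80));
               buffer.put((byte) (value >>> 21));
           } else {
               buffer.put((byte) ((value & 0x7F) | 0x80));
               buffer.put((byte) (((value >>> 7) & 0x7F) | 0x80));
               buffer.put((byte) (((value >>> 14) & 0x7F) | 0x80));
               buffer.put((byte) (((value >>> 21) & 0x7F) | 0x80));
               buffer.put((byte) (value >>> 28));
           }
       }
   }
   ```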
   
   # Results
   Performance has been evaluated using JMH benchmarks. The benefits of loop
unrolling diminish as the size of the loop increases (from Int32 to Int64).
   
   ```
   ByteUtilsBenchmark.testUnsignedReadVarintNew   thrpt    5   51187.160 ±  778.033  ops/s
   ByteUtilsBenchmark.testUnsignedReadVarintOld   thrpt    5   43441.115 ± 1281.592  ops/s

   ByteUtilsBenchmark.testUnsignedReadVarlongNew  thrpt    5   28293.582 ±  952.091  ops/s
   ByteUtilsBenchmark.testUnsignedReadVarlongOld  thrpt    5   22734.384 ± 1132.111  ops/s

   ByteUtilsBenchmark.testUnsignedWriteVarintNew  thrpt    5  134848.804 ± 2464.908  ops/s
   ByteUtilsBenchmark.testUnsignedWriteVarintOld  thrpt    5       7.919 ± 3306.315  ops/s

   ByteUtilsBenchmark.testUnsignedWriteVarlongNew thrpt    5   40252.868 ± 2551.431  ops/s
   ByteUtilsBenchmark.testUnsignedWriteVarlongOld thrpt    5   34948.229 ± 1877.987  ops/s
   ```
   
   # Testing
   New tests have been added which validate that the value produced by the prior
algorithm is the same as that produced by the new algorithm. This validates
correctness.
   
   UnitTest and IntegrationTest are successful locally.
   
   # References
   - https://steinborn.me/posts/performance/how-fast-can-you-write-a-varint/ is 
a nice blog which explains the optimizations for writing varInt32.
   - 
https://github.com/netty/netty/blob/59aa6e635b9996cf21cd946e64353270679adc73/codec/src/main/java/io/netty/handler/codec/protobuf/ProtobufVarint32FrameDecoder.java#L73
 is Netty's unrolled implementation of readVarInt32
   - 
https://github.com/protocolbuffers/protobuf/blob/22.x/java/core/src/main/java/com/google/protobuf/CodedOutputStream.java#L1345
 is ProtoBuf's loop based implementation


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kirktrue commented on pull request #12813: KAFKA-14317: ProduceRequest timeouts are logged as network exceptions

2023-02-27 Thread via GitHub


kirktrue commented on PR #12813:
URL: https://github.com/apache/kafka/pull/12813#issuecomment-1446745731

   @dajac I'm having a bit of a hard time figuring out how to test this.
   
   Per your previous comments, the `NetworkClient` already issues a request for 
metadata update when the connection disconnects. However, because the 
`SenderTest` doesn't leverage `NetworkClient`, and instead uses `MockClient`, 
it's not possible to rely on that behavior.
   
   To fix this, I tried to update `MockClient` to have similar semantics to the 
`NetworkClient` with respect to when it handles disconnects and metadata 
updates, but that doesn't really "feel" right because it doesn't prove 
anything. I'm now thinking that I need to maybe handle this test in an 
integration test. I'll look into that now.
   
   Thanks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14757) Kafka Cooperative Sticky Assignor results in significant duplicate consumption

2023-02-27 Thread Siddharth Anand (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17694115#comment-17694115
 ] 

Siddharth Anand commented on KAFKA-14757:
-

Is there documentation that details how onPartitionsRevoked works with the 
cooperative sticky assignor? How does it keep duplicates from being created 
when manually committing checkpoints?

 

 

> Kafka Cooperative Sticky Assignor results in significant duplicate consumption
> --
>
> Key: KAFKA-14757
> URL: https://issues.apache.org/jira/browse/KAFKA-14757
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 3.1.1
> Environment: AWS MSK (broker) and Spring Kafka (2.8.7) for use in 
> Spring Boot consumers.
>Reporter: Siddharth Anand
>Priority: Critical
>
> Details may be found within the linked document:
> [Kafka Cooperative Sticky Assignor Issue : Duplicate Consumption | 
> [https://docs.google.com/document/d/1E7qAwGOpF8jo_YhF4NwUx9CXxUGJmT8OhHEqIg7-GfI/edit?usp=sharing]]
> In a nutshell, we noticed that the Cooperative Sticky Assignor resulted in 
> significant duplicate message consumption. During last year's F1 Grand Prix 
> events and World Cup soccer events, our company's Kafka-based platform 
> received live-traffic. This live traffic, coupled with autoscaled consumers 
> resulted in as much as 70% duplicate message consumption at the Kafka 
> consumers. 
> In December 2022, we ran a synthetic load test to confirm that duplicate 
> message consumption occurs during consumer scale out/in and Kafka partition 
> rebalancing when using the Cooperative Sticky Assignor. This issue does not 
> occur when using the Range Assignor.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14757) Kafka Cooperative Sticky Assignor results in significant duplicate consumption

2023-02-27 Thread Philip Nee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17694119#comment-17694119
 ] 

Philip Nee commented on KAFKA-14757:


Hey Sidd, here you go: 
[https://kafka.apache.org/33/javadoc/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html]

Per javaDoc: "It is recommended that offsets should be committed in this 
callback to either Kafka or a custom offset store to prevent duplicate data."

 

I think if you don't use auto-commit and you don't commit in
onPartitionsRevoked(), then you lose any progress made since the last commit.
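
For illustration, the usual pattern looks roughly like the sketch below; how the
application tracks its processed offsets is up to you, and the class/method names
here are made up:

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Sketch: commit what has been processed before ownership of the revoked
// partitions moves, so the next owner does not re-consume those records.
public class CommitOnRevokeListener implements ConsumerRebalanceListener {
    private final Consumer<?, ?> consumer;
    private final Map<TopicPartition, OffsetAndMetadata> processed = new ConcurrentHashMap<>();

    public CommitOnRevokeListener(Consumer<?, ?> consumer) {
        this.consumer = consumer;
    }

    // Call from the poll loop after handling a record; pass the next offset to read.
    public void markProcessed(TopicPartition tp, long nextOffset) {
        processed.put(tp, new OffsetAndMetadata(nextOffset));
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
        for (TopicPartition tp : partitions) {
            OffsetAndMetadata offset = processed.get(tp);
            if (offset != null) {
                toCommit.put(tp, offset);
            }
        }
        consumer.commitSync(toCommit); // commit before the rebalance completes
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // nothing needed for this sketch
    }
}
```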

> Kafka Cooperative Sticky Assignor results in significant duplicate consumption
> --
>
> Key: KAFKA-14757
> URL: https://issues.apache.org/jira/browse/KAFKA-14757
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 3.1.1
> Environment: AWS MSK (broker) and Spring Kafka (2.8.7) for use in 
> Spring Boot consumers.
>Reporter: Siddharth Anand
>Priority: Critical
>
> Details may be found within the linked document:
> [Kafka Cooperative Sticky Assignor Issue : Duplicate Consumption | 
> [https://docs.google.com/document/d/1E7qAwGOpF8jo_YhF4NwUx9CXxUGJmT8OhHEqIg7-GfI/edit?usp=sharing]]
> In a nutshell, we noticed that the Cooperative Sticky Assignor resulted in 
> significant duplicate message consumption. During last year's F1 Grand Prix 
> events and World Cup soccer events, our company's Kafka-based platform 
> received live-traffic. This live traffic, coupled with autoscaled consumers 
> resulted in as much as 70% duplicate message consumption at the Kafka 
> consumers. 
> In December 2022, we ran a synthetic load test to confirm that duplicate 
> message consumption occurs during consumer scale out/in and Kafka partition 
> rebalancing when using the Cooperative Sticky Assignor. This issue does not 
> occur when using the Range Assignor.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] junrao commented on a diff in pull request #13304: KAFKA-14726 Move/rewrite of LogReadInfo, LogOffsetSnapshot, LogStartOffsetIncrementReason to storage module

2023-02-27 Thread via GitHub


junrao commented on code in PR #13304:
URL: https://github.com/apache/kafka/pull/13304#discussion_r1119112476


##
core/src/main/scala/kafka/cluster/Partition.scala:
##
@@ -1434,13 +1434,13 @@ class Partition(val topicPartition: TopicPartition,
   minOneMessage
 )
 
-LogReadInfo(
-  fetchedData = fetchedData,
-  divergingEpoch = None,
-  highWatermark = initialHighWatermark,
-  logStartOffset = initialLogStartOffset,
-  logEndOffset = initialLogEndOffset,
-  lastStableOffset = initialLastStableOffset
+new LogReadInfo(
+  fetchedData,
+  Optional.empty[FetchResponseData.EpochEndOffset](),

Review Comment:
   Could we just do `Optional.empty()`?



##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogReadInfo.java:
##
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.message.FetchResponseData;
+
+import java.util.Optional;
+
+/**
+ * Structure used for lower level reads using {@link 
kafka.cluster.Partition#fetchRecords()}.
+ */
+public class LogReadInfo {

Review Comment:
   All Scala case classes define `equals()` and `hashCode()`. Do we need to
define those explicitly in Java?
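
   For reference, if we do decide they are needed, a sketch could look roughly like
this (field names taken from the snippet above, assuming the offsets are primitive
longs and `java.util.Objects` is imported):

   ```java
   @Override
   public boolean equals(Object o) {
       if (this == o) return true;
       if (o == null || getClass() != o.getClass()) return false;
       LogReadInfo that = (LogReadInfo) o;
       return highWatermark == that.highWatermark
           && logStartOffset == that.logStartOffset
           && logEndOffset == that.logEndOffset
           && lastStableOffset == that.lastStableOffset
           && Objects.equals(fetchedData, that.fetchedData)
           && Objects.equals(divergingEpoch, that.divergingEpoch);
   }

   @Override
   public int hashCode() {
       return Objects.hash(fetchedData, divergingEpoch, highWatermark,
           logStartOffset, logEndOffset, lastStableOffset);
   }
   ```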



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] junrao commented on a diff in pull request #13309: MINOR Moved a few log segment util methods from LocalLog to LogFileUtils

2023-02-27 Thread via GitHub


junrao commented on code in PR #13309:
URL: https://github.com/apache/kafka/pull/13309#discussion_r1119124556


##
core/src/main/scala/kafka/log/LocalLog.scala:
##
@@ -686,40 +685,6 @@ object LocalLog extends Logging {
 s"${topicPartition.topic}-${topicPartition.partition}"
   }
 
-  /**
-   * Construct an index file name in the given dir using the given base offset 
and the given suffix
-   *
-   * @param dir The directory in which the log will reside
-   * @param offset The base offset of the log file
-   * @param suffix The suffix to be appended to the file name ("", ".deleted", 
".cleaned", ".swap", etc.)
-   */
-  private[log] def offsetIndexFile(dir: File, offset: Long, suffix: String = 
""): File =
-new File(dir, filenamePrefixFromOffset(offset) + IndexFileSuffix + suffix)

Review Comment:
   Should we remove IndexFileSuffix in LocalLog now that it's moved to 
LogFileUtils?



##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogFileUtils.java:
##
@@ -72,4 +92,99 @@ private static String filenamePrefixFromOffset(long offset) {
 return nf.format(offset);
 }
 
+/**
+ * Construct a log file name in the given dir with the given base offset.
+ *
+ * @param dirThe directory in which the log will reside
+ * @param offset The base offset of the log file
+ */
+public static File logFile(File dir, long offset) {
+return logFile(dir, offset, "");
+}
+
+/**
+ * Construct a log file name in the given dir with the given base offset 
and the given suffix.
+ *
+ * @param dirThe directory in which the log will reside
+ * @param offset The base offset of the log file
+ * @param suffix The suffix to be appended to the file name (e.g. "", 
".deleted", ".cleaned", ".swap", etc.)
+ */
+public static File logFile(File dir, long offset, String suffix) {
+return new File(dir, filenamePrefixFromOffset(offset) + 
LOG_FILE_SUFFIX + suffix);
+}
+
+/**
+ * Construct an index file name in the given dir using the given base 
offset.
+ *
+ * @param dirThe directory in which the log will reside
+ * @param offset The base offset of the log file
+ */
+public static File offsetIndexFile(File dir, long offset) {
+return offsetIndexFile(dir, offset, "");
+}
+
+/**
+ * Construct an index file name in the given dir using the given base 
offset and the given suffix.
+ *
+ * @param dirThe directory in which the log will reside
+ * @param offset The base offset of the log file
+ * @param suffix The suffix to be appended to the file name ("", 
".deleted", ".cleaned", ".swap", etc.)
+ */
+public static File offsetIndexFile(File dir, long offset, String suffix) {
+return new File(dir, filenamePrefixFromOffset(offset) + 
INDEX_FILE_SUFFIX + suffix);
+}
+
+/**
+ * Construct a time index file name in the given dir using the given base 
offset.
+ *
+ * @param dirThe directory in which the log will reside
+ * @param offset The base offset of the log file
+ */
+public static File timeIndexFile(File dir, long offset) {
+return timeIndexFile(dir, offset, "");
+}
+
+/**
+ * Construct a time index file name in the given dir using the given base 
offset and the given suffix.
+ *
+ * @param dirThe directory in which the log will reside
+ * @param offset The base offset of the log file
+ * @param suffix The suffix to be appended to the file name ("", 
".deleted", ".cleaned", ".swap", etc.)
+ */
+public static File timeIndexFile(File dir, long offset, String suffix) {
+return new File(dir, filenamePrefixFromOffset(offset) + 
TIME_INDEX_FILE_SUFFIX + suffix);
+}
+
+/**
+ * Construct a transaction index file name in the given dir using the 
given base offset.
+ *
+ * @param dirThe directory in which the log will reside
+ * @param offset The base offset of the log file
+ */
+public static File transactionIndexFile(File dir, long offset) {
+return transactionIndexFile(dir, offset, "");
+}
+
+/**
+ * Construct a transaction index file name in the given dir using the 
given base offset and the given suffix.
+ *
+ * @param dirThe directory in which the log will reside
+ * @param offset The base offset of the log file
+ * @param suffix The suffix to be appended to the file name ("", 
".deleted", ".cleaned", ".swap", etc.)
+ */
+public static File transactionIndexFile(File dir, long offset, String 
suffix) {
+return new File(dir, filenamePrefixFromOffset(offset) + 
TXN_INDEX_FILE_SUFFIX + suffix);
+}
+
+/**
+ * Returns the offset from the given file. The file name is of the form: 
{number}.{suffix}. This method extracts
+ * the number from the given file's name.
+ *
+ * @param file file with the offset information as par

[GitHub] [kafka] gharris1727 commented on pull request #13302: KAFKA-14759: Move Mock, Schema, and Verifiable connectors to new test-plugins module

2023-02-27 Thread via GitHub


gharris1727 commented on PR #13302:
URL: https://github.com/apache/kafka/pull/13302#issuecomment-1446847150

   > I think this may still require a KIP as it removes connectors from OOTB 
Connect.
   
   This is included as part of KIP-898: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-898%3A+Modernize+Connect+plugin+discovery
 and I think we can block this PR from landing until the KIP-898 feature is 
approved.
   
   > If you've identified anything like that, please let us know by emailing 
[secur...@kafka.apache.org](mailto:secur...@kafka.apache.org).
   
   I am not currently aware of any exploits that leverage these connectors and
would justify their removal, as was the case with the FileStream connectors.
I'll make sure to properly report any exploitable flaws that I notice via the
mailing list; thanks for pointing me in that direction!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14762) Remove 0_8_2_LATEST from rolling upgrade system test

2023-02-27 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14762?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17694124#comment-17694124
 ] 

Greg Harris commented on KAFKA-14762:
-

Thanks [~ijuma] for clarifying where the actual policy diverges from the 
Release Plan policy.

I think this change can be made after KIP-896 is approved.

> Remove 0_8_2_LATEST from rolling upgrade system test
> 
>
> Key: KAFKA-14762
> URL: https://issues.apache.org/jira/browse/KAFKA-14762
> Project: Kafka
>  Issue Type: Task
>  Components: system tests
>Reporter: Greg Harris
>Priority: Minor
>
> Currently, the core/upgrade_test.py exercises rolling from various past Kafka 
> versions to the development version. The earliest version that it currently 
> tests is 0_8_2_LATEST, 0.8.2.2, released October 2, 2015.
> The test has a special case for that version, which does not contain a
> copy of the `tools` jar. The test is written to make use of the tools jar for 
> assertions, but because the jar does not exist in 0.8.2.2, a jar from the 
> latest development version is used instead. For example, this has the effect 
> that when executing the 0.8.2.2 upgrade case, the 3.5.0-SNAPSHOT tools jar is 
> on the classpath with the 0.8.2.2 clients jar.
> Because of this, development on the VerifiableProducer has needed to be 
> backwards compatible with the 0.8.2.2 clients jar, and this has led to code
> duplication and other limitations on the maintenance that can be done to the 
> class. This appears to be mostly an artifact of how the testing is carried 
> out, as upgrades are typically performed without mixing jars from different 
> versions of Kafka.
> In order to lift those limitations, we should eliminate this one version from 
> compatibility testing. Accompanying this change, we should update the latest 
> documentation to say "Upgrading to  from any version 0.9.x through 
> " instead of 0.8.x, since that version will no longer be regularly 
> tested.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] C0urante commented on pull request #13302: KAFKA-14759: Move Mock, Schema, and Verifiable connectors to new test-plugins module

2023-02-27 Thread via GitHub


C0urante commented on PR #13302:
URL: https://github.com/apache/kafka/pull/13302#issuecomment-1446859760

   Ah yes, good point about KIP-898! Nice that we don't have to file a separate 
KIP for this change. Will revisit if/when KIP-898 passes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #10566: KAFKA-12694 Avoid schema mismatch DataException when validating default values

2023-02-27 Thread via GitHub


C0urante commented on PR #10566:
URL: https://github.com/apache/kafka/pull/10566#issuecomment-1446878483

   Hi @urbandan, thanks for clarifying the point about using `Schema` methods. 
Agreed that this would be helpful.
   
   I think we should try to keep validation of optionality if possible, and 
right now I don't see a case where this really breaks things. It seems like if 
we tweak `ConnectSchema::equals` to use the `Schema` interface methods, that 
should be sufficient to retain this validation without too much work?
   
   I'd rather not change the API surface here since that would require a KIP 
and manual action on the part of developers in order to address what is 
ultimately a bug in the current API.
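
   To illustrate the idea (just a sketch, not the actual patch; struct fields and
map/array element schemas are omitted for brevity), the tweak could look something
like:

   ```java
   @Override
   public boolean equals(Object o) {
       if (this == o) return true;
       if (!(o instanceof Schema)) return false;   // accept any Schema implementation
       Schema that = (Schema) o;
       return Objects.equals(type(), that.type())
           && isOptional() == that.isOptional()
           && Objects.equals(name(), that.name())
           && Objects.equals(version(), that.version())
           && Objects.equals(doc(), that.doc())
           && Objects.equals(parameters(), that.parameters())
           && Objects.equals(defaultValue(), that.defaultValue());
   }
   ```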


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-14264) Refactor coordinator code

2023-02-27 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14264?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True resolved KAFKA-14264.
---
Resolution: Fixed

[~pnee] can you fill in the details on the fixed versions? Thanks!

> Refactor coordinator code
> -
>
> Key: KAFKA-14264
> URL: https://issues.apache.org/jira/browse/KAFKA-14264
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>
> To refactor the consumer, we changed how the coordinator is called.  However, 
> there will be a time period where the old and new implementation need to 
> coexist, so we will need to override some of the methods and create a new 
> implementation of the coordinator.  In particular:
>  # ensureCoordinatorReady needs to be non-blocking or we could just use the 
> sendFindCoordinatorRequest.
>  # joinGroupIfNeeded needs to be broken up into more fine-grained stages for 
> the new implementation to work.
> We also need to create the coordinator state machine.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14468) Refactor Commit Logic

2023-02-27 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14468?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True resolved KAFKA-14468.
---
Resolution: Fixed

[~pnee] can you fill in the details on the fixed versions? Thanks!

> Refactor Commit Logic
> -
>
> Key: KAFKA-14468
> URL: https://issues.apache.org/jira/browse/KAFKA-14468
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>
> Refactor commit logic using the new multi-threaded coordinator construct.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14468) Refactor Commit Logic

2023-02-27 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14468?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee updated KAFKA-14468:
---
Fix Version/s: 3.5.0

> Refactor Commit Logic
> -
>
> Key: KAFKA-14468
> URL: https://issues.apache.org/jira/browse/KAFKA-14468
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
> Fix For: 3.5.0
>
>
> Refactor commit logic using the new multi-threaded coordinator construct.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] C0urante commented on a diff in pull request #13193: KAFKA-14659 source-record-write-[rate|total] metrics include filtered records

2023-02-27 Thread via GitHub


C0urante commented on code in PR #13193:
URL: https://github.com/apache/kafka/pull/13193#discussion_r1097934299


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java:
##
@@ -187,7 +187,8 @@ public ConnectMetricsRegistry(Set tags) {
 sourceRecordWriteTotal = createTemplate("source-record-write-total", 
SOURCE_TASK_GROUP_NAME,
 "The number of records output 
from the transformations and written to Kafka for this" +
 " task belonging to the named 
source connector in this worker, since the task was " +
-"last restarted.",
+"last restarted. This is after 
transformations are applied and excludes any records " +
+"filtered out by the 
transformations.",

Review Comment:
   Is this clarification necessary? The description already includes the phrase 
"records output from the transformations and written to Kafka".



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14468) Refactor Commit Logic

2023-02-27 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14468?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee updated KAFKA-14468:
---
Fix Version/s: (was: 3.5.0)

> Refactor Commit Logic
> -
>
> Key: KAFKA-14468
> URL: https://issues.apache.org/jira/browse/KAFKA-14468
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>
> Refactor commit logic using the new multi-threaded coordinator construct.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14264) Refactor coordinator code

2023-02-27 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14264?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee updated KAFKA-14264:
---
Fix Version/s: 3.4.0

> Refactor coordinator code
> -
>
> Key: KAFKA-14264
> URL: https://issues.apache.org/jira/browse/KAFKA-14264
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
> Fix For: 3.4.0
>
>
> To refactor the consumer, we changed how the coordinator is called.  However, 
> there will be a time period where the old and new implementation need to 
> coexist, so we will need to override some of the methods and create a new 
> implementation of the coordinator.  In particular:
>  # ensureCoordinatorReady needs to be non-blocking or we could just use the 
> sendFindCoordinatorRequest.
>  # joinGroupIfNeeded needs to be broken up into more fine-grained stages for 
> the new implementation to work.
> We also need to create the coordinator state machine.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14468) Refactor Commit Logic

2023-02-27 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14468?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee updated KAFKA-14468:
---
Fix Version/s: 3.5.0

> Refactor Commit Logic
> -
>
> Key: KAFKA-14468
> URL: https://issues.apache.org/jira/browse/KAFKA-14468
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
> Fix For: 3.5.0
>
>
> Refactor commit logic using the new multi-threaded coordinator construct.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14748) Relax non-null FK left-join requirement

2023-02-27 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17694147#comment-17694147
 ] 

Guozhang Wang commented on KAFKA-14748:
---

Originally I thought this could be treated without a KIP, just as a fix in join
semantics. But thinking about it again, I realized it may not be the case,
primarily because we then could not distinguish the following two cases:

1) The key extractor returns a non-null `K0`, and we then find a matching record
for `K0` with a null `V0`, resulting in ``.

2) The key extractor returns a null `K0`, and hence we directly produce ``.

Hence, adding a `filter` operator after the `join` operator alone for `` cannot preserve the old behavior if a developer really wants
that.

In fact, the same question applies to the general issue of
https://issues.apache.org/jira/browse/KAFKA-12317 as well: should we try to
distinguish between the case of extracting a null key for the join, versus the
case where a non-null extracted key did not find a matching record in the other
relation (or more specifically, the other relation returns a null value for the
extracted key)?

My thoughts about the above question are as follows: putting performance benefits
aside, for app semantics where the developer knows there are certain keys in the
other relation which would never exist (i.e. would always return a null value),
the developer could let the key extractor return those keys whenever they want to
produce non-matching join results; that means the value of
KAFKA-12317/KAFKA-14748 lies in the case where the developer does not know of any
keys in the other relation that would never exist.

If we want to change to that behavior, which would not distinguish these two
cases, I'd suggest we add a flag config to enable it across fk/outer/left joins,
and then remove the flag (i.e. always enable the behavior) once we have not heard
people complain about the behavior change for a while. But this would require a
KIP.
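
For illustration, the kind of DSL workaround being discussed would look roughly
like the sketch below (value types and topic names are made up, and serde
configuration is omitted):

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;

public class FkLeftJoinFilterSketch {
    // Hypothetical value types, for illustration only.
    static class Order { String customerId; }
    static class Customer { String name; }
    static class Enriched {
        final Order order;
        final Customer customer;
        Enriched(Order order, Customer customer) { this.order = order; this.customer = customer; }
    }

    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KTable<String, Order> orders = builder.table("orders");
        KTable<String, Customer> customers = builder.table("customers");

        // FK left join; the extractor may return null, which Streams currently treats as invalid.
        KTable<String, Enriched> joined = orders.leftJoin(
                customers,
                order -> order.customerId,
                Enriched::new);

        // The filter discussed above: drop rows with no right-hand match. It cannot
        // tell case (1) (matching K0 with a null V0) from case (2) (null extracted K0),
        // since both surface as a null right-hand value in the join result.
        KTable<String, Enriched> matchedOnly =
                joined.filter((orderId, enriched) -> enriched.customer != null);
    }
}
```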

> Relax non-null FK left-join requirement
> ---
>
> Key: KAFKA-14748
> URL: https://issues.apache.org/jira/browse/KAFKA-14748
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>
> Kafka Streams enforces a strict non-null-key policy in the DSL across all 
> key-dependent operations (like aggregations and joins).
> This also applies to FK-joins, in particular to the ForeignKeyExtractor. If 
> it returns `null`, it's treated as invalid. For left-joins, it might make 
> sense to still accept a `null`, and add the left-hand record with an empty 
> right-hand-side to the result.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] guozhangwang merged pull request #13308: MINOR: update docs of 'replica.socket.receive.buffer.bytes'

2023-02-27 Thread via GitHub


guozhangwang merged PR #13308:
URL: https://github.com/apache/kafka/pull/13308


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hgeraldino commented on a diff in pull request #13193: KAFKA-14659 source-record-write-[rate|total] metrics include filtered records

2023-02-27 Thread via GitHub


hgeraldino commented on code in PR #13193:
URL: https://github.com/apache/kafka/pull/13193#discussion_r1119214853


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java:
##
@@ -187,7 +187,8 @@ public ConnectMetricsRegistry(Set tags) {
 sourceRecordWriteTotal = createTemplate("source-record-write-total", 
SOURCE_TASK_GROUP_NAME,
 "The number of records output 
from the transformations and written to Kafka for this" +
 " task belonging to the named 
source connector in this worker, since the task was " +
-"last restarted.",
+"last restarted. This is after 
transformations are applied and excludes any records " +
+"filtered out by the 
transformations.",

Review Comment:
   Fair question. The `source-record-write-rate` template declared just above  
(L181) includes this text, so I thought about adding it here as well to 
maintain uniformity between the two.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on pull request #13308: MINOR: update docs of 'replica.socket.receive.buffer.bytes'

2023-02-27 Thread via GitHub


guozhangwang commented on PR #13308:
URL: https://github.com/apache/kafka/pull/13308#issuecomment-1446938946

   LGTM. Merging to trunk.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13193: KAFKA-14659 source-record-write-[rate|total] metrics include filtered records

2023-02-27 Thread via GitHub


C0urante commented on code in PR #13193:
URL: https://github.com/apache/kafka/pull/13193#discussion_r1119219553


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java:
##
@@ -187,7 +187,8 @@ public ConnectMetricsRegistry(Set tags) {
 sourceRecordWriteTotal = createTemplate("source-record-write-total", 
SOURCE_TASK_GROUP_NAME,
 "The number of records output 
from the transformations and written to Kafka for this" +
 " task belonging to the named 
source connector in this worker, since the task was " +
-"last restarted.",
+"last restarted. This is after 
transformations are applied and excludes any records " +
+"filtered out by the 
transformations.",

Review Comment:
   Ah yeah, uniformity is probably worth striving for here. IMO adding this 
sentence makes some of the language earlier in the description redundant; WDYT 
about changing the descriptions of both to start with "The [average per-second] 
number of records written to Kafka for this"?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13193: KAFKA-14659 source-record-write-[rate|total] metrics include filtered records

2023-02-27 Thread via GitHub


C0urante commented on code in PR #13193:
URL: https://github.com/apache/kafka/pull/13193#discussion_r1119219553


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java:
##
@@ -187,7 +187,8 @@ public ConnectMetricsRegistry(Set tags) {
 sourceRecordWriteTotal = createTemplate("source-record-write-total", 
SOURCE_TASK_GROUP_NAME,
 "The number of records output 
from the transformations and written to Kafka for this" +
 " task belonging to the named 
source connector in this worker, since the task was " +
-"last restarted.",
+"last restarted. This is after 
transformations are applied and excludes any records " +
+"filtered out by the 
transformations.",

Review Comment:
   Ah yeah, uniformity is probably worth striving for here. IMO adding this 
sentence makes some of the language earlier in the description redundant; WDYT 
about changing the descriptions of both to start with "The [average per-second] 
number of records written to Kafka for this" and dropping the "output from the 
transformations and" phrase altogether?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hgeraldino commented on a diff in pull request #13193: KAFKA-14659 source-record-write-[rate|total] metrics include filtered records

2023-02-27 Thread via GitHub


hgeraldino commented on code in PR #13193:
URL: https://github.com/apache/kafka/pull/13193#discussion_r1119228442


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java:
##
@@ -187,7 +187,8 @@ public ConnectMetricsRegistry(Set tags) {
 sourceRecordWriteTotal = createTemplate("source-record-write-total", 
SOURCE_TASK_GROUP_NAME,
 "The number of records output 
from the transformations and written to Kafka for this" +
 " task belonging to the named 
source connector in this worker, since the task was " +
-"last restarted.",
+"last restarted. This is after 
transformations are applied and excludes any records " +
+"filtered out by the 
transformations.",

Review Comment:
   Ok, how about something like: 
   
   `
   The average per-second number of records written to Kafka for this task 
belonging to the named source connector in this worker. This is after 
transformations are applied, and excludes any records filtered out by the 
transformations.
   `



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-2967) Move Kafka documentation to ReStructuredText

2023-02-27 Thread Jorge Esteban Quilcate Otoya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-2967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17694156#comment-17694156
 ] 

Jorge Esteban Quilcate Otoya commented on KAFKA-2967:
-

Thanks [~mimaison]! Haven't tried it, but now that docs are in markdown it 
should be straightforward to use something like pandoc to migrate them to 
asciidoc.

I picked Markdown based on the latest comments, which seemed to agree on using it, 
while interest in asciidoc sparked in 2018 without much follow-up. Now that there 
is a tool that supports both, it may be worth revisiting this decision.

We already use markdown within the project for READMEs and the like, so adding 
another markup language would need to be weighed against whether it really adds value.

> Move Kafka documentation to ReStructuredText
> 
>
> Key: KAFKA-2967
> URL: https://issues.apache.org/jira/browse/KAFKA-2967
> Project: Kafka
>  Issue Type: Bug
>Reporter: Gwen Shapira
>Assignee: Gwen Shapira
>Priority: Major
>  Labels: documentation
>
> Storing documentation as HTML is kind of BS :)
> * Formatting is a pain, and making it look good is even worse
> * Its just HTML, can't generate PDFs
> * Reading and editting is painful
> * Validating changes is hard because our formatting relies on all kinds of 
> Apache Server features.
> I suggest:
> * Move to RST
> * Generate HTML and PDF during build using Sphinx plugin for Gradle.
> Lots of Apache projects are doing this.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] dajac commented on a diff in pull request #13240: KAFKA-14690: Add topic IDs to OffsetCommit API and propagate for request version >= 9

2023-02-27 Thread via GitHub


dajac commented on code in PR #13240:
URL: https://github.com/apache/kafka/pull/13240#discussion_r1119248386


##
clients/src/main/resources/common/message/OffsetCommitRequest.json:
##
@@ -47,8 +49,10 @@
   "about": "The time period in ms to retain the offset." },
 { "name": "Topics", "type": "[]OffsetCommitRequestTopic", "versions": "0+",
   "about": "The topics to commit offsets for.",  "fields": [
-  { "name": "Name", "type": "string", "versions": "0+", "entityType": 
"topicName",
+  { "name": "Name", "type": "string", "versions": "0+", 
"nullableVersions": "9+", "entityType": "topicName",
 "about": "The topic name." },
+  { "name": "TopicId", "type": "uuid", "versions": "9+",
+"about": "The unique topic ID." },

Review Comment:
   @Hangleton I discussed offline with a few committers and the consensus is 
that having both the topic name and the topic id in the same version is not the 
right way. They share the same concerns that we discussed last week. Could you 
update the PR to only have TopicId from version 9? We can also remove the 
nullableVersions for the Name and set the versions to 0-8. I suppose that both 
fields could be ignorable.
   
   Regarding the admin client, which does not support topic ids, it cannot use 
version 9 at the moment. We need to handle this in the Builder (we can set the 
maximum allowed version).
   
   Sorry for this late change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on pull request #13307: MINOR: Make ByteUtilsBenchmark deterministic

2023-02-27 Thread via GitHub


divijvaidya commented on PR #13307:
URL: https://github.com/apache/kafka/pull/13307#issuecomment-1447001588

   @showuon I am discarding this PR as I have included its changes in 
https://github.com/apache/kafka/pull/13312


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya closed pull request #13307: MINOR: Make ByteUtilsBenchmark deterministic

2023-02-27 Thread via GitHub


divijvaidya closed pull request #13307: MINOR: Make ByteUtilsBenchmark 
deterministic
URL: https://github.com/apache/kafka/pull/13307


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 opened a new pull request, #13313: KAFKA-14760: Move ThroughputThrottler from tools to clients, remove tools dependency from connect-runtime

2023-02-27 Thread via GitHub


gharris1727 opened a new pull request, #13313:
URL: https://github.com/apache/kafka/pull/13313

   Currently connect-runtime pulls in tools as a dependency in order to use the 
ThroughputThrottler.
   In the future it will be desirable to have tools pull in connect-runtime as 
a (test) dependency, so we must remove this dependency first.
   
   The last time this was tried 
(https://issues.apache.org/jira/browse/KAFKA-2752) it broke system tests and 
had to be reverted (https://issues.apache.org/jira/browse/KAFKA-2807).
   
   This was because the `dev` version of tools was being used with `0.8.2.x` 
clients jar in the system tests, because the test needs the VerifiableProducer 
which did not exist in `0.8.2.x`. In order to work around this restriction, 
this PR changes the system test to use the `0.9.x` version of tools with 
`0.8.2.x` instead of the dev version. This will relax the current compatibility 
requirement between tools and clients, and should allow this refactor to land 
without the upgrade test failing.
   
   This conflicts with #13302, which moves some of the classes that depend on 
`tools`.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13193: KAFKA-14659 source-record-write-[rate|total] metrics include filtered records

2023-02-27 Thread via GitHub


C0urante commented on code in PR #13193:
URL: https://github.com/apache/kafka/pull/13193#discussion_r1119262008


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java:
##
@@ -187,7 +187,8 @@ public ConnectMetricsRegistry(Set tags) {
 sourceRecordWriteTotal = createTemplate("source-record-write-total", 
SOURCE_TASK_GROUP_NAME,
 "The number of records output 
from the transformations and written to Kafka for this" +
 " task belonging to the named 
source connector in this worker, since the task was " +
-"last restarted.",
+"last restarted. This is after 
transformations are applied and excludes any records " +
+"filtered out by the 
transformations.",

Review Comment:
   👍



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hgeraldino commented on a diff in pull request #13193: KAFKA-14659 source-record-write-[rate|total] metrics include filtered records

2023-02-27 Thread via GitHub


hgeraldino commented on code in PR #13193:
URL: https://github.com/apache/kafka/pull/13193#discussion_r1119279751


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java:
##
@@ -187,7 +187,8 @@ public ConnectMetricsRegistry(Set tags) {
 sourceRecordWriteTotal = createTemplate("source-record-write-total", 
SOURCE_TASK_GROUP_NAME,
 "The number of records output 
from the transformations and written to Kafka for this" +
 " task belonging to the named 
source connector in this worker, since the task was " +
-"last restarted.",
+"last restarted. This is after 
transformations are applied and excludes any records " +
+"filtered out by the 
transformations.",

Review Comment:
   Updated



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on pull request #13291: KAFKA-14742: Throttle connectors in ExactlyOnceSourceIntegrationTest to fix flakey OOMEs

2023-02-27 Thread via GitHub


gharris1727 commented on PR #13291:
URL: https://github.com/apache/kafka/pull/13291#issuecomment-1447059390

   @edoardocomar 
   
   I pulled these values out into three separate constants with descriptive 
names; let me know if this is closer to what you had in mind!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13184: KAFKA-14671: Refactor PredicatedTransformation to not implement Transformation

2023-02-27 Thread via GitHub


C0urante commented on code in PR #13184:
URL: https://github.com/apache/kafka/pull/13184#discussion_r1119281474


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationStage.java:
##
@@ -16,63 +16,58 @@
  */
 package org.apache.kafka.connect.runtime;
 
-import java.util.Map;
 
-import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.connector.ConnectRecord;
-import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.transforms.Transformation;
 import org.apache.kafka.connect.transforms.predicates.Predicate;
 
 /**
- * Decorator for a {@link Transformation} which applies the delegate only when 
a
- * {@link Predicate} is true (or false, according to {@code negate}).
+ * Wrapper for a {@link Transformation} and corresponding optional {@link 
Predicate }
+ * which applies the transformation when the {@link Predicate} is true (or 
false, according to {@code negate}).
+ * If no {@link Predicate} is provided, the transformation will be 
unconditionally applied.
  * @param  The type of record (must be an implementation of {@link 
ConnectRecord})
  */
-public class PredicatedTransformation> implements 
Transformation {
+public class TransformationStage> implements 
AutoCloseable {
 
 static final String PREDICATE_CONFIG = "predicate";
 static final String NEGATE_CONFIG = "negate";
-final Predicate predicate;
-final Transformation delegate;
-final boolean negate;
+private final Predicate predicate;
+private final Transformation delegate;

Review Comment:
   Nit: this isn't a delegate anymore; maybe just `transform` or 
`transformation`?



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java:
##
@@ -268,30 +268,30 @@ public boolean includeRecordDetailsInErrorLog() {
 }
 
 /**
- * Returns the initialized list of {@link Transformation} which are 
specified in {@link #TRANSFORMS_CONFIG}.
+ * Returns the initialized list of {@link TransformationStage} which are 
specified in {@link #TRANSFORMS_CONFIG}.
  */
-public > List> 
transformations() {
+public > List> 
transformationStages() {
 final List transformAliases = getList(TRANSFORMS_CONFIG);
 
-final List> transformations = new 
ArrayList<>(transformAliases.size());
+final List> transformations = new 
ArrayList<>(transformAliases.size());
 for (String alias : transformAliases) {
 final String prefix = TRANSFORMS_CONFIG + "." + alias + ".";
 
 try {
 @SuppressWarnings("unchecked")
 final Transformation transformation = 
Utils.newInstance(getClass(prefix + "type"), Transformation.class);
 Map configs = originalsWithPrefix(prefix);
-Object predicateAlias = 
configs.remove(PredicatedTransformation.PREDICATE_CONFIG);
-Object negate = 
configs.remove(PredicatedTransformation.NEGATE_CONFIG);
+Object predicateAlias = 
configs.remove(TransformationStage.PREDICATE_CONFIG);
+Object negate = 
configs.remove(TransformationStage.NEGATE_CONFIG);
 transformation.configure(configs);
 if (predicateAlias != null) {
 String predicatePrefix = PREDICATES_PREFIX + 
predicateAlias + ".";
 @SuppressWarnings("unchecked")
 Predicate predicate = 
Utils.newInstance(getClass(predicatePrefix + "type"), Predicate.class);
 predicate.configure(originalsWithPrefix(predicatePrefix));
-transformations.add(new 
PredicatedTransformation<>(predicate, negate == null ? false : 
Boolean.parseBoolean(negate.toString()), transformation));
+transformations.add(new TransformationStage<>(predicate, 
negate != null && Boolean.parseBoolean(negate.toString()), transformation));

Review Comment:
   I know IntelliJ loves to suggest "simplifying" boolean expressions by 
replacing uses of the ternary operator with `&&` but in cases like this I find 
the former significantly more readable.



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationStage.java:
##
@@ -16,63 +16,58 @@
  */
 package org.apache.kafka.connect.runtime;
 
-import java.util.Map;
 
-import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.connector.ConnectRecord;
-import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.transforms.Transformation;
 import org.apache.kafka.connect.transforms.predicates.Predicate;
 
 /**
- * Decorator for a {@link Transformation} which applies the delegate only when 
a
- * {@link Predicate} is true (or false, according to {@code negate}).
+ * Wrapper for a {@link Transformation} and corresponding optional {@link 
Pred

[jira] [Commented] (KAFKA-14757) Kafka Cooperative Sticky Assignor results in significant duplicate consumption

2023-02-27 Thread Siddharth Anand (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17694182#comment-17694182
 ] 

Siddharth Anand commented on KAFKA-14757:
-

We follow the pattern below in each consumer (see the sketch below):
 * Read a batch of records from a partition
 * Process the batch of records, which includes persisting them to a data store
 * Manually commit the offset, and repeat these steps

If some partitions are reassigned, how could this result in loss? I expect a 
(small) amount of potential reconsumption when a partition is reassigned, but I 
don't see any loss, and we don't implement onPartitionRevoked() today.
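
A minimal, self-contained sketch of the poll / process / manual-commit loop described above (broker address, group id, topic name and the persist step are placeholders):

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ManualCommitLoop {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");   // offsets are committed manually
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("example-topic"));
            while (true) {
                ConsumerRecords<String, String> batch = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : batch) {
                    persist(record);                                    // write to the data store
                }
                if (!batch.isEmpty()) {
                    consumer.commitSync();                              // commit only after the batch is persisted
                }
            }
        }
    }

    private static void persist(ConsumerRecord<String, String> record) {
        // placeholder for the data-store write
    }
}
```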
 

> Kafka Cooperative Sticky Assignor results in significant duplicate consumption
> --
>
> Key: KAFKA-14757
> URL: https://issues.apache.org/jira/browse/KAFKA-14757
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 3.1.1
> Environment: AWS MSK (broker) and Spring Kafka (2.8.7) for use in 
> Spring Boot consumers.
>Reporter: Siddharth Anand
>Priority: Critical
>
> Details may be found within the linked document:
> [Kafka Cooperative Sticky Assignor Issue : Duplicate Consumption | 
> [https://docs.google.com/document/d/1E7qAwGOpF8jo_YhF4NwUx9CXxUGJmT8OhHEqIg7-GfI/edit?usp=sharing]]
> In a nutshell, we noticed that the Cooperative Sticky Assignor resulted in 
> significant duplicate message consumption. During last year's F1 Grand Prix 
> events and World Cup soccer events, our company's Kafka-based platform 
> received live-traffic. This live traffic, coupled with autoscaled consumers 
> resulted in as much as 70% duplicate message consumption at the Kafka 
> consumers. 
> In December 2022, we ran a synthetic load test to confirm that duplicate 
> message consumption occurs during consumer scale out/in and Kafka partition 
> rebalancing when using the Cooperative Sticky Assignor. This issue does not 
> occur when using the Range Assignor.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] rittikaadhikari opened a new pull request, #13314: [DO NOT MERGE] Sync apache/trunk to confluentinc/kafka master 27-Feb-2023

2023-02-27 Thread via GitHub


rittikaadhikari opened a new pull request, #13314:
URL: https://github.com/apache/kafka/pull/13314

   Conflicts in Jenkinsfile and build.gradle


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rittikaadhikari closed pull request #13314: [DO NOT MERGE] Sync apache/trunk to confluentinc/kafka master 27-Feb-2023

2023-02-27 Thread via GitHub


rittikaadhikari closed pull request #13314: [DO NOT MERGE] Sync apache/trunk to 
confluentinc/kafka master 27-Feb-2023
URL: https://github.com/apache/kafka/pull/13314


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14767) Gradle build fails with missing commitId after git gc

2023-02-27 Thread Greg Harris (Jira)
Greg Harris created KAFKA-14767:
---

 Summary: Gradle build fails with missing commitId after git gc
 Key: KAFKA-14767
 URL: https://issues.apache.org/jira/browse/KAFKA-14767
 Project: Kafka
  Issue Type: Bug
  Components: build
Reporter: Greg Harris


Reproduction steps:
1. `git gc`
2. `./gradlew jar`

Expected behavior: build completes successfully (or shows other build errors)
Actual behavior:
{noformat}
Task failed with an exception.
---
* What went wrong:
A problem was found with the configuration of task ':storage:createVersionFile' 
(type 'DefaultTask').
  - Property 'commitId' doesn't have a configured value.
    
    Reason: This property isn't marked as optional and no value has been 
configured.
    
    Possible solutions:
      1. Assign a value to 'commitId'.
      2. Mark property 'commitId' as optional.
    
    Please refer to 
https://docs.gradle.org/7.6/userguide/validation_problems.html#value_not_set 
for more details about this problem.{noformat}
This appears to be due to the fact that the build.gradle determineCommitId() 
function is unable to read the git commit hash for the current HEAD. This 
appears to happen after a `git gc` takes place, which causes the 
`.git/refs/heads/*` files to be moved to `.git/packed-refs`.

The determineCommitId() should be patched to also try reading from the 
packed-refs to determine the commit hash.
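
A rough sketch (an assumption about the shape of the fix, not the actual build.gradle change, and written in Java rather than Groovy for illustration) of falling back to `.git/packed-refs` when the loose ref file has been packed away:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Optional;
import java.util.stream.Stream;

public class PackedRefsLookup {
    static Optional<String> commitIdFor(Path gitDir, String refName) throws IOException {
        Path looseRef = gitDir.resolve(refName);                       // e.g. refs/heads/trunk
        if (Files.exists(looseRef)) {
            return Optional.of(Files.readAllLines(looseRef).get(0).trim());
        }
        Path packedRefs = gitDir.resolve("packed-refs");
        if (!Files.exists(packedRefs)) {
            return Optional.empty();
        }
        // packed-refs lines look like "<sha> refs/heads/trunk"; '#' comment lines
        // and '^' peeled-tag lines are skipped.
        try (Stream<String> lines = Files.lines(packedRefs)) {
            return lines.filter(l -> !l.startsWith("#") && !l.startsWith("^"))
                        .filter(l -> l.endsWith(" " + refName))
                        .map(l -> l.split(" ")[0])
                        .findFirst();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(commitIdFor(Paths.get(".git"), "refs/heads/trunk"));
    }
}
```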



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14767) Gradle build fails with missing commitId after git gc

2023-02-27 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17694207#comment-17694207
 ] 

Greg Harris commented on KAFKA-14767:
-

This happens occasionally when git performs an automatic `gc`, and often causes 
build failures when I merge `trunk` into my development branches. My build 
usually won't complete until I finish the merge commit, which I may have to go 
back and amend if I introduced any build breakages.

I know of one workaround: take the branch you're working on and move it back and 
forth to re-create the loose (non-gc'd) refs file:
{noformat}
branch=<your-branch-name>            # placeholder: the branch whose loose ref was packed away
git checkout -b placeholder
git branch -f $branch trunk          # force-moving the branch writes a loose ref again
git branch -f $branch placeholder    # move it back to its original commit
git checkout $branch
git branch -D placeholder{noformat}

> Gradle build fails with missing commitId after git gc
> -
>
> Key: KAFKA-14767
> URL: https://issues.apache.org/jira/browse/KAFKA-14767
> Project: Kafka
>  Issue Type: Bug
>  Components: build
>Reporter: Greg Harris
>Priority: Minor
>
> Reproduction steps:
> 1. `git gc`
> 2. `./gradlew jar`
> Expected behavior: build completes successfully (or shows other build errors)
> Actual behavior:
> {noformat}
> Task failed with an exception.
> ---
> * What went wrong:
> A problem was found with the configuration of task 
> ':storage:createVersionFile' (type 'DefaultTask').
>   - Property 'commitId' doesn't have a configured value.
>     
>     Reason: This property isn't marked as optional and no value has been 
> configured.
>     
>     Possible solutions:
>       1. Assign a value to 'commitId'.
>       2. Mark property 'commitId' as optional.
>     
>     Please refer to 
> https://docs.gradle.org/7.6/userguide/validation_problems.html#value_not_set 
> for more details about this problem.{noformat}
> This appears to be due to the fact that the build.gradle determineCommitId() 
> function is unable to read the git commit hash for the current HEAD. This 
> appears to happen after a `git gc` takes place, which causes the 
> `.git/refs/heads/*` files to be moved to `.git/packed-refs`.
> The determineCommitId() should be patched to also try reading from the 
> packed-refs to determine the commit hash.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] guozhangwang commented on a diff in pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping

2023-02-27 Thread via GitHub


guozhangwang commented on code in PR #13190:
URL: https://github.com/apache/kafka/pull/13190#discussion_r1119357976


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##
@@ -500,14 +500,22 @@ boolean joinGroupIfNeeded(final Timer timer) {
 requestRejoin(shortReason, fullReason);
 }
 
+// continue to retry as long as the timer hasn't expired

Review Comment:
   Could we simplify this multi-if logic as:
   
   ```
   if (!future.isRetriable()) { throw }
   else {
   if (timer.isExpired() { return false }
   else if (exception instance of.. ) { continue}
   else {timer.sleep(..)}
   }
   ```
   
   Also, could we add a comment on top clarifying that the order of precedence 
is deliberate, and that future changes should take care not to change it 
unnecessarily.



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##
@@ -1484,6 +1484,8 @@ public void testRebalanceWithMetadataChange() {
 Utils.mkMap(Utils.mkEntry(topic1, 1), Utils.mkEntry(topic2, 
1;
 client.respond(joinGroupFollowerResponse(1, consumerId, "leader", 
Errors.NOT_COORDINATOR));
 client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+coordinator.poll(time.timer(0)); // failing joinGroup request will 
require re-poll in order to retry

Review Comment:
   It's not very clear to me why here and on line 3403 below we need additional 
polls, since the test scenarios seem irrelevant to error cases.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on a diff in pull request #13184: KAFKA-14671: Refactor PredicatedTransformation to not implement Transformation

2023-02-27 Thread via GitHub


gharris1727 commented on code in PR #13184:
URL: https://github.com/apache/kafka/pull/13184#discussion_r1119388653


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java:
##
@@ -531,8 +543,17 @@ private Converter badConverter() {
 return converter;
 }
 
+private void mockSourceTransform() {
+FaultyPassthrough faultyPassthrough = new 
FaultyPassthrough<>();
+
doReturn(FaultyPassthrough.class).when(transformationStage).transformClass();
+when(transformationStage.apply(any())).thenAnswer(invocation -> 
faultyPassthrough.apply(invocation.getArgument(0)));
+}

Review Comment:
   I added this in preparation for a later commit, but I agree it doesn't make 
much sense in the context of this PR. I'll push this out to that other PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on a diff in pull request #13184: KAFKA-14671: Refactor PredicatedTransformation to not implement Transformation

2023-02-27 Thread via GitHub


gharris1727 commented on code in PR #13184:
URL: https://github.com/apache/kafka/pull/13184#discussion_r1119389854


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java:
##
@@ -268,30 +268,30 @@ public boolean includeRecordDetailsInErrorLog() {
 }
 
 /**
- * Returns the initialized list of {@link Transformation} which are 
specified in {@link #TRANSFORMS_CONFIG}.
+ * Returns the initialized list of {@link TransformationStage} which are 
specified in {@link #TRANSFORMS_CONFIG}.
  */
-public > List> 
transformations() {
+public > List> 
transformationStages() {
 final List transformAliases = getList(TRANSFORMS_CONFIG);
 
-final List> transformations = new 
ArrayList<>(transformAliases.size());
+final List> transformations = new 
ArrayList<>(transformAliases.size());
 for (String alias : transformAliases) {
 final String prefix = TRANSFORMS_CONFIG + "." + alias + ".";
 
 try {
 @SuppressWarnings("unchecked")
 final Transformation transformation = 
Utils.newInstance(getClass(prefix + "type"), Transformation.class);
 Map configs = originalsWithPrefix(prefix);
-Object predicateAlias = 
configs.remove(PredicatedTransformation.PREDICATE_CONFIG);
-Object negate = 
configs.remove(PredicatedTransformation.NEGATE_CONFIG);
+Object predicateAlias = 
configs.remove(TransformationStage.PREDICATE_CONFIG);
+Object negate = 
configs.remove(TransformationStage.NEGATE_CONFIG);
 transformation.configure(configs);
 if (predicateAlias != null) {
 String predicatePrefix = PREDICATES_PREFIX + 
predicateAlias + ".";
 @SuppressWarnings("unchecked")
 Predicate predicate = 
Utils.newInstance(getClass(predicatePrefix + "type"), Predicate.class);
 predicate.configure(originalsWithPrefix(predicatePrefix));
-transformations.add(new 
PredicatedTransformation<>(predicate, negate == null ? false : 
Boolean.parseBoolean(negate.toString()), transformation));
+transformations.add(new TransformationStage<>(predicate, 
negate != null && Boolean.parseBoolean(negate.toString()), transformation));

Review Comment:
   Since there's a disagreement over this among reviewers, I'll leave this as 
it was on trunk.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on a diff in pull request #13184: KAFKA-14671: Refactor PredicatedTransformation to not implement Transformation

2023-02-27 Thread via GitHub


gharris1727 commented on code in PR #13184:
URL: https://github.com/apache/kafka/pull/13184#discussion_r1119392537


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java:
##
@@ -280,25 +283,18 @@ public void abstractPredicate() {
 assertTrue(e.getMessage().contains("Predicate is abstract and cannot 
be created"));
 }
 
-private void assertPredicatedTransform(Map props, boolean 
expectedNegated) {
+private void assertTransformationStageWithPredicate(Map 
props, boolean expectedNegated) {
 final ConnectorConfig config = new ConnectorConfig(MOCK_PLUGINS, 
props);
-final List> transformations = 
config.transformations();
-assertEquals(1, transformations.size());
-assertTrue(transformations.get(0) instanceof PredicatedTransformation);
-PredicatedTransformation predicated = (PredicatedTransformation) 
transformations.get(0);
-
-assertEquals(expectedNegated, predicated.negate);
-
-assertTrue(predicated.delegate instanceof 
ConnectorConfigTest.SimpleTransformation);
-assertEquals(42, ((SimpleTransformation) 
predicated.delegate).magicNumber);
+final List> transformationStages = 
config.transformationStages();
+assertEquals(1, transformationStages.size());
+TransformationStage stage = transformationStages.get(0);
 
-assertTrue(predicated.predicate instanceof 
ConnectorConfigTest.TestPredicate);
-assertEquals(84, ((TestPredicate) predicated.predicate).param);
+assertEquals(expectedNegated ? 42 : 0, 
stage.apply(DUMMY_RECORD).kafkaPartition().intValue());
 
-predicated.close();
+SinkRecord matchingRecord = DUMMY_RECORD.newRecord(null, 84, null, 
null, null, null, 0L);
+assertEquals(expectedNegated ? 84 : 42, 
stage.apply(matchingRecord).kafkaPartition().intValue());
 
-assertEquals(0, ((SimpleTransformation) 
predicated.delegate).magicNumber);
-assertEquals(0, ((TestPredicate) predicated.predicate).param);

Review Comment:
   The value in the TestPredicate also affects the outward behavior in the 
other assertions here. We aren't asserting that the field is set, but we are 
asserting that the configuration affects the behavior.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping

2023-02-27 Thread via GitHub


philipnee commented on code in PR #13190:
URL: https://github.com/apache/kafka/pull/13190#discussion_r1119397792


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##
@@ -500,14 +500,22 @@ boolean joinGroupIfNeeded(final Timer timer) {
 requestRejoin(shortReason, fullReason);
 }
 
+// continue to retry as long as the timer hasn't expired

Review Comment:
   I think the `instanceof ...` exceptions are also non-retriable, and I think 
they need to be handled first.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping

2023-02-27 Thread via GitHub


philipnee commented on code in PR #13190:
URL: https://github.com/apache/kafka/pull/13190#discussion_r1119398074


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##
@@ -1484,6 +1484,8 @@ public void testRebalanceWithMetadataChange() {
 Utils.mkMap(Utils.mkEntry(topic1, 1), Utils.mkEntry(topic2, 
1;
 client.respond(joinGroupFollowerResponse(1, consumerId, "leader", 
Errors.NOT_COORDINATOR));
 client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+coordinator.poll(time.timer(0)); // failing joinGroup request will 
require re-poll in order to retry

Review Comment:
   good point... let me update the tests there. thanks



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping

2023-02-27 Thread via GitHub


philipnee commented on code in PR #13190:
URL: https://github.com/apache/kafka/pull/13190#discussion_r1119399381


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##
@@ -500,14 +500,22 @@ boolean joinGroupIfNeeded(final Timer timer) {
 requestRejoin(shortReason, fullReason);
 }
 
+// continue to retry as long as the timer hasn't expired

Review Comment:
   So the if/else blocks become a bit fragmented. Or we could do (IMO, a bit 
more difficult to read):
   ```
   if (!future.isRetriable()) {
 if ( ... instance of ... ) { continue; }
 throw ...
   }
   
   {rest of the logic there}
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping

2023-02-27 Thread via GitHub


philipnee commented on code in PR #13190:
URL: https://github.com/apache/kafka/pull/13190#discussion_r1119399381


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##
@@ -500,14 +500,22 @@ boolean joinGroupIfNeeded(final Timer timer) {
 requestRejoin(shortReason, fullReason);
 }
 
+// continue to retry as long as the timer hasn't expired

Review Comment:
   So the if/else blocks become a bit fragmented. Or we could do:
   ```
   if (!future.isRetriable()) {
 if ( ... instance of ... ) { continue; }
 throw ...
   }
   
   {rest of the logic there}
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14747) FK join should record discarded subscription responses

2023-02-27 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17694213#comment-17694213
 ] 

Guozhang Wang commented on KAFKA-14747:
---

Echoing that, I think we can piggy-back on the existing `dropped-records` sensor, 
as it has also replaced other old sensors like `expired-window-record-drop` 
in KIP-743.
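
An illustrative, self-contained sketch (not the actual Streams internals) of the idea: when the hash carried by a subscription response no longer matches the current left-hand-side record, bump a counter standing in for the dropped-records sensor instead of discarding the response silently:

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;

public class StaleResponseDropSketch {
    // stand-in for the existing dropped-records sensor
    static final AtomicLong droppedRecords = new AtomicLong();

    static String handleResponse(byte[] currentLeftValueHash, byte[] responseHash, String joinResult) {
        if (!Arrays.equals(currentLeftValueHash, responseHash)) {
            droppedRecords.incrementAndGet();   // record the discarded stale response
            return null;                        // no join result is emitted
        }
        return joinResult;
    }

    public static void main(String[] args) {
        byte[] current = {1, 2, 3};
        System.out.println(handleResponse(current, new byte[]{1, 2, 3}, "joined"));  // joined
        System.out.println(handleResponse(current, new byte[]{9, 9, 9}, "joined"));  // null, drop counted
        System.out.println("dropped=" + droppedRecords.get());
    }
}
```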

> FK join should record discarded subscription responses
> --
>
> Key: KAFKA-14747
> URL: https://issues.apache.org/jira/browse/KAFKA-14747
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Koma Zhang
>Priority: Minor
>  Labels: beginner, newbie
>
> FK-joins are subject to a race condition: If the left-hand side record is 
> updated, a subscription is sent to the right-hand side (including a hash 
> value of the left-hand side record), and the right-hand side might send back 
> join responses (also including the original hash). The left-hand side only 
> processes the responses if the returned hash matches the current hash of the 
> left-hand side record, because a different hash implies that the left-hand 
> side record was updated in the meantime (including sending a new 
> subscription to the right-hand side), and thus the data is stale and the 
> response should not be processed (joining the response to the new record 
> could lead to incorrect results).
> A similar thing can happen on a right-hand side update that triggers a 
> response that might be dropped if the left-hand side record was updated in 
> parallel.
> While the behavior is correct, we don't record if this happens. We should 
> consider recording this using the existing "dropped record" sensor or maybe 
> add a new sensor.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] philipnee commented on a diff in pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping

2023-02-27 Thread via GitHub


philipnee commented on code in PR #13190:
URL: https://github.com/apache/kafka/pull/13190#discussion_r1119422842


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##
@@ -1484,6 +1484,8 @@ public void testRebalanceWithMetadataChange() {
 Utils.mkMap(Utils.mkEntry(topic1, 1), Utils.mkEntry(topic2, 
1;
 client.respond(joinGroupFollowerResponse(1, consumerId, "leader", 
Errors.NOT_COORDINATOR));
 client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+coordinator.poll(time.timer(0)); // failing joinGroup request will 
require re-poll in order to retry

Review Comment:
   The NOT_COORDINATOR error should originally trigger retries; however, with the 
new code, the poll would exit due to the expired timer. Another way to do it is 
to use poll(time.timer(1)).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


