[GitHub] [kafka] dengziming commented on pull request #11910: KAFKA-13743: Prevent topics with conflicting metrics names from being created in KRaft mode

2022-03-17 Thread GitBox


dengziming commented on pull request #11910:
URL: https://github.com/apache/kafka/pull/11910#issuecomment-1072029922


   cc @cmccabe 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-13601) Add option to support sync offset commit in Kafka Connect Sink

2022-03-17 Thread Anil Dasari (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17508566#comment-17508566
 ] 

Anil Dasari edited comment on KAFKA-13601 at 3/18/22, 4:52 AM:
---

The field partitioner uses the starting offset of the batch in the file name, 
and that offset is the only varying parameter. If a worker dies before 
committing an offset, there will be only one parquet file per worker 
(consumer/partition) that is out of sync with the committed offsets in the 
destination (S3 in my case). So a new or restarted worker would overwrite that 
parquet file, since the starting offset of the partition remains the same. 

Please let me know if you have any questions. 
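
For illustration, a minimal sketch of why this overwrite is idempotent (the 
parquetKey helper and key format are hypothetical; the actual S3 connector 
naming scheme may differ):

{code:java}
public class FieldPartitionerKey {
    // The key depends only on topic, partition, and the batch's starting offset,
    // so a restarted worker re-flushing the same batch overwrites the same object.
    static String parquetKey(String topic, int partition, long startOffset) {
        return String.format("topics/%s/partition=%d/%s+%d+%010d.snappy.parquet",
            topic, partition, topic, partition, startOffset);
    }

    public static void main(String[] args) {
        // Same batch start offset before and after a restart -> same key -> overwrite.
        System.out.println(parquetKey("events", 3, 1200));
        System.out.println(parquetKey("events", 3, 1200));
    }
}
{code}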


was (Author: JIRAUSER283879):
The file name in the field partitioner uses the starting offset of the batch, 
and there will be only one parquet file per worker (consumer/partition) present 
in the destination (S3 in my case) if the worker dies before committing an 
offset. So a new or restarted worker would overwrite that parquet file, as the 
starting offset of the partition remains the same. 

Please let me know if you have any questions. 

> Add option to support sync offset commit in Kafka Connect Sink
> --
>
> Key: KAFKA-13601
> URL: https://issues.apache.org/jira/browse/KAFKA-13601
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Reporter: Anil Dasari
>Priority: Major
>
> Exactly-once in the S3 connector with scheduled rotation and field 
> partitioner can be achieved with a synchronous consumer offset commit after 
> the message batch is flushed to the sink successfully.
> Currently, WorkerSinkTask commits the consumer offsets asynchronously at 
> regular intervals of WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG:
> [https://github.com/apache/kafka/blob/371f14c3c12d2e341ac96bd52393b43a10acfa84/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L203]
> [https://github.com/apache/kafka/blob/371f14c3c12d2e341ac96bd52393b43a10acfa84/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L196]
> [https://github.com/apache/kafka/blob/371f14c3c12d2e341ac96bd52393b43a10acfa84/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L354]
>  
> Add a config to allow users to select synchronous commits over 
> WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13601) Add option to support sync offset commit in Kafka Connect Sink

2022-03-17 Thread Anil Dasari (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17508566#comment-17508566
 ] 

Anil Dasari commented on KAFKA-13601:
-

The file name in the field partitioner uses the starting offset of the batch, 
and there will be only one parquet file per worker (consumer/partition) present 
in the destination (S3 in my case) if the worker dies before committing an 
offset. So a new or restarted worker would overwrite that parquet file, as the 
starting offset of the partition remains the same. 

Please let me know if you have any questions. 
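
To make the request concrete, a minimal sketch of the two commit modes using a 
plain KafkaConsumer (an illustration only, not WorkerSinkTask's actual 
internals; the syncCommit flag stands in for the proposed config):

{code:java}
import java.time.Duration;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SyncCommitSketch {
    static void pollLoop(KafkaConsumer<byte[], byte[]> consumer, boolean syncCommit) {
        consumer.subscribe(List.of("input-topic"));
        while (true) {
            ConsumerRecords<byte[], byte[]> batch = consumer.poll(Duration.ofSeconds(1));
            flushToSink(batch);            // e.g. write the parquet file to S3
            if (syncCommit) {
                // Proposed: commit right after a successful flush, so the
                // committed offset tracks the sink contents.
                consumer.commitSync();
            }
            // Today: WorkerSinkTask instead commits asynchronously on a timer
            // (WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG).
        }
    }

    static void flushToSink(ConsumerRecords<byte[], byte[]> batch) { /* sink write */ }
}
{code}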

> Add option to support sync offset commit in Kafka Connect Sink
> --
>
> Key: KAFKA-13601
> URL: https://issues.apache.org/jira/browse/KAFKA-13601
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Reporter: Anil Dasari
>Priority: Major
>
> Exactly-once in the S3 connector with scheduled rotation and field 
> partitioner can be achieved with a synchronous consumer offset commit after 
> the message batch is flushed to the sink successfully.
> Currently, WorkerSinkTask commits the consumer offsets asynchronously at 
> regular intervals of WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG:
> [https://github.com/apache/kafka/blob/371f14c3c12d2e341ac96bd52393b43a10acfa84/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L203]
> [https://github.com/apache/kafka/blob/371f14c3c12d2e341ac96bd52393b43a10acfa84/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L196]
> [https://github.com/apache/kafka/blob/371f14c3c12d2e341ac96bd52393b43a10acfa84/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L354]
>  
> Add a config to allow users to select synchronous commits over 
> WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] guizmaii closed pull request #11904: MINOR: Fix `ConsumerConfig.ISOLATION_LEVEL_DOC`

2022-03-17 Thread GitBox


guizmaii closed pull request #11904:
URL: https://github.com/apache/kafka/pull/11904


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guizmaii commented on pull request #11904: MINOR: Fix `ConsumerConfig.ISOLATION_LEVEL_DOC`

2022-03-17 Thread GitBox


guizmaii commented on pull request #11904:
URL: https://github.com/apache/kafka/pull/11904#issuecomment-1071947794


   @dajac I opened a PR on trunk here: 
https://github.com/apache/kafka/pull/11915


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #11905: MINOR: Fix incorrect log for out-of-order KTable

2022-03-17 Thread GitBox


showuon commented on pull request #11905:
URL: https://github.com/apache/kafka/pull/11905#issuecomment-1071946919


   @tchiotludo , thanks for your contribution!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon merged pull request #11905: MINOR: Fix incorrect log for out-of-order KTable

2022-03-17 Thread GitBox


showuon merged pull request #11905:
URL: https://github.com/apache/kafka/pull/11905


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #11905: MINOR: Fix incorrect log for out-of-order KTable

2022-03-17 Thread GitBox


showuon commented on pull request #11905:
URL: https://github.com/apache/kafka/pull/11905#issuecomment-1071946079


   Failed tests are unrelated:
   ```
   Build / JDK 17 and Scala 2.13 / 
kafka.server.LogDirFailureTest.testReplicaFetcherThreadAfterLogDirFailureOnFollower()
   Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.connect.integration.SourceConnectorsIntegrationTest.testSwitchingToTopicCreationEnabled
   Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.shouldRestoreState
   
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-7077) KIP-318: Make Kafka Connect Source idempotent

2022-03-17 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7077?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17508513#comment-17508513
 ] 

Luke Chen commented on KAFKA-7077:
--

Everything uses the idempotent producer by default after v3.0.0 (or more 
specifically, v3.0.1). And this PR 
[https://github.com/apache/kafka/pull/11475] removes some constraints for it. 
Closing it now.
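
For reference, a producer created since 3.0 is idempotent without any explicit 
opt-in; the explicit setting below is equivalent to the default (a sketch, 
independent of the linked PR):

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

public class IdempotentProducerDefault {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.ByteArraySerializer");
        // Since 3.0, enable.idempotence defaults to true; setting it is redundant.
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            // Sends are now idempotent per partition.
        }
    }
}
{code}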

> KIP-318: Make Kafka Connect Source idempotent
> -
>
> Key: KAFKA-7077
> URL: https://issues.apache.org/jira/browse/KAFKA-7077
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 2.0.0
>Reporter: Stephane Maarek
>Assignee: Stephane Maarek
>Priority: Major
> Fix For: 3.2.0
>
>
> KIP Link: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-318%3A+Make+Kafka+Connect+Source+idempotent



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Resolved] (KAFKA-7077) KIP-318: Make Kafka Connect Source idempotent

2022-03-17 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7077?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-7077.
--
Resolution: Fixed

> KIP-318: Make Kafka Connect Source idempotent
> -
>
> Key: KAFKA-7077
> URL: https://issues.apache.org/jira/browse/KAFKA-7077
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 2.0.0
>Reporter: Stephane Maarek
>Assignee: Stephane Maarek
>Priority: Major
> Fix For: 3.2.0
>
>
> KIP Link: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-318%3A+Make+Kafka+Connect+Source+idempotent



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-7077) KIP-318: Make Kafka Connect Source idempotent

2022-03-17 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7077?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-7077:
-
Fix Version/s: 3.2.0

> KIP-318: Make Kafka Connect Source idempotent
> -
>
> Key: KAFKA-7077
> URL: https://issues.apache.org/jira/browse/KAFKA-7077
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 2.0.0
>Reporter: Stephane Maarek
>Assignee: Stephane Maarek
>Priority: Major
> Fix For: 3.2.0
>
>
> KIP Link: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-318%3A+Make+Kafka+Connect+Source+idempotent



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Resolved] (KAFKA-13742) Quota byte-rate/request metrics are loaded only when at least one quota is registered

2022-03-17 Thread Jorge Esteban Quilcate Otoya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13742?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jorge Esteban Quilcate Otoya resolved KAFKA-13742.
--
Resolution: Not A Problem

Given that I now have a better understanding of how quota metrics work 
(https://issues.apache.org/jira/browse/KAFKA-13744), I will close this one.

The only comment I would add is that there are no metrics at the moment to 
match clients/users with topics/partitions. As far as I know, this information 
is only captured on the client side. I found quota metrics to be a good proxy 
for this mapping, though it is still incomplete, as the mapping to the topic is 
currently implicit on the broker side.

It would be a nice addition to have this mapping available, but that should be 
discussed in another issue if there's interest.

> Quota byte-rate/request metrics are loaded only when at least one quota is 
> registered
> ---
>
> Key: KAFKA-13742
> URL: https://issues.apache.org/jira/browse/KAFKA-13742
> Project: Kafka
>  Issue Type: Bug
>  Components: core, metrics
>Reporter: Jorge Esteban Quilcate Otoya
>Priority: Major
>  Labels: quotas
>
> Quota metrics are loaded only when at least one quota is present:
>  * Metrics: 
> [https://github.com/apache/kafka/blob/0b9a8bac36f16b5397e9ec3a0441758e4b60a384/core/src/main/scala/kafka/server/ClientQuotaManager.scala#L552-L563]
>  * Reporting when quotas are enabled: 
> [https://github.com/apache/kafka/blob/0b9a8bac36f16b5397e9ec3a0441758e4b60a384/core/src/main/scala/kafka/server/ClientQuotaManager.scala#L249-L256]
>  * Quotas enabled: `def quotasEnabled: Boolean = quotaTypesEnabled != 
> QuotaTypes.NoQuotas`
> Even though throttling is specific to quotas, byte-rate/request per 
> user/client-id is a valid metric for any deployment.
>  
> The current workaround is to add _any_ quota, as this will enable metrics for 
> *all* client-ids/users.
> If these metrics are captured for all clients regardless of the quotas 
> created, it would be a better experience to have a config to opt in to these 
> metrics instead of creating meaningless quotas just to get them.
> For threshold metrics, it makes sense to me to enable them only when quotas 
> are enabled.
>  
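
A sketch of the workaround described above, via the Admin quota API (the 
byte-rate value is arbitrary, chosen high enough that throttling never 
triggers):

{code:java}
import java.util.Collections;
import java.util.List;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;

public class QuotaMetricsWorkaround {
    public static void enableByteRateMetrics(Admin admin) throws Exception {
        // Default client-id entity (a null value means "the default entity"),
        // so the quota applies to all client ids.
        ClientQuotaEntity entity = new ClientQuotaEntity(
            Collections.singletonMap(ClientQuotaEntity.CLIENT_ID, null));
        // An effectively unlimited producer byte-rate: throttling never kicks
        // in, but byte-rate metrics get registered for all clients.
        ClientQuotaAlteration.Op op =
            new ClientQuotaAlteration.Op("producer_byte_rate", 1e15);
        admin.alterClientQuotas(
            List.of(new ClientQuotaAlteration(entity, List.of(op)))).all().get();
    }
}
{code}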



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Resolved] (KAFKA-13744) Quota metric tags are inconsistent

2022-03-17 Thread Jorge Esteban Quilcate Otoya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13744?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jorge Esteban Quilcate Otoya resolved KAFKA-13744.
--
Resolution: Not A Problem

> Quota metric tags are inconsistent
> --
>
> Key: KAFKA-13744
> URL: https://issues.apache.org/jira/browse/KAFKA-13744
> Project: Kafka
>  Issue Type: Bug
>  Components: core, metrics
>Reporter: Jorge Esteban Quilcate Otoya
>Priority: Major
>  Labels: quotas
> Attachments: image-2022-03-15-16-57-12-583.png
>
>
> When enabling metrics for quotas the metrics apply to _all_ clients (see 
> https://issues.apache.org/jira/browse/KAFKA-13742).
> However, the tags are calculated depending on the quotas registered and 
> applied to all clients: 
> [https://github.com/apache/kafka/blob/0b9a8bac36f16b5397e9ec3a0441758e4b60a384/core/src/main/scala/kafka/server/ClientQuotaManager.scala#L649-L694]
> This causes different metric tag results depending on which quota is 
> registered first.
> For instance, if a quota is registered with both userId and clientId, then 
> metrics are tagged with both; but if a quota tagged only with clientId is 
> then registered, all metrics are tagged only by clientId — even though the 
> user principal is available.
> !image-2022-03-15-16-57-12-583.png|width=1034,height=415!
> I managed to reproduce this behavior here:
>  * From 10:30 to 10:45, there was a quota with both client-id and user-id
>  * It was removed by 10:45, so no metrics were exposed.
>  * After, a quota with client id was created, and metrics were collected only 
> with client id, even though the user was available.
> I'd expect metrics to always contain both, if available — and simplify the 
> logic here 
> [https://github.com/apache/kafka/blob/0b9a8bac36f16b5397e9ec3a0441758e4b60a384/core/src/main/scala/kafka/server/ClientQuotaManager.scala#L649-L694].



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13744) Quota metric tags are inconsistent

2022-03-17 Thread Jorge Esteban Quilcate Otoya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13744?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17508473#comment-17508473
 ] 

Jorge Esteban Quilcate Otoya commented on KAFKA-13744:
--

Thanks, [~junrao]! 

I think I got it now after looking deeper into the code and reading the KIPs.
So, depending on the types of quotas defined: if all of them are of the same 
type, then all clients are tagged with that type:

 

```
        case QuotaTypes.NoQuotas | QuotaTypes.ClientIdQuotaEnabled =>
          ("", clientId)
        case QuotaTypes.UserQuotaEnabled =>
          (sanitizedUser, "")
        case QuotaTypes.UserClientIdQuotaEnabled =>
          (sanitizedUser, clientId)

```


But as soon as there is a mix of types, each client is evaluated against the 
registered quotas:

 

```
        case _ =>
          val userEntity = Some(UserEntity(sanitizedUser))
          val clientIdEntity = Some(ClientIdEntity(clientId))

          var metricTags = (sanitizedUser, clientId)
          // 1) /config/users/<user>/clients/<client-id>
          if (!overriddenQuotas.containsKey(KafkaQuotaEntity(userEntity, clientIdEntity))) {
            // 2) /config/users/<user>/clients/<default>
            metricTags = (sanitizedUser, clientId)
            if (!overriddenQuotas.containsKey(KafkaQuotaEntity(userEntity, Some(DefaultClientIdEntity)))) {
              // 3) /config/users/<user>
              metricTags = (sanitizedUser, "")
              if (!overriddenQuotas.containsKey(KafkaQuotaEntity(userEntity, None))) {
                // 4) /config/users/<default>/clients/<client-id>
                metricTags = (sanitizedUser, clientId)
                if (!overriddenQuotas.containsKey(KafkaQuotaEntity(Some(DefaultUserEntity), clientIdEntity))) {
                  // 5) /config/users/<default>/clients/<default>
                  metricTags = (sanitizedUser, clientId)
                  if (!overriddenQuotas.containsKey(DefaultUserClientIdQuotaEntity)) {
                    // 6) /config/users/<default>
                    metricTags = (sanitizedUser, "")
                    if (!overriddenQuotas.containsKey(DefaultUserQuotaEntity)) {
                      // 7) /config/clients/<client-id>
                      // 8) /config/clients/<default>
                      metricTags = ("", clientId)
                    }
                  }
                }
              }
            }
          }

```


If the client doesn't match, it eventually falls back to the client id. If any 
quota for that client, or a default, matches, the tagging is applied according 
to that match.

This now explains why I'm seeing different behavior depending on what quota I 
define first.
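
Flattened into one method, the fallback order amounts to something like the 
following (a re-expression for readability, not the broker's code; hasQuota 
stands in for the overriddenQuotas.containsKey lookups and "<default>" marks a 
default entity):

```
public class QuotaTagResolution {
    // Hypothetical stand-in for overriddenQuotas.containsKey(KafkaQuotaEntity(...)).
    static boolean hasQuota(String user, String clientId) { return false; }

    // Most-specific matching entity wins.
    static String[] resolveTags(String user, String clientId) {
        if (hasQuota(user, clientId))           return new String[]{user, clientId}; // 1) <user>/<client-id>
        if (hasQuota(user, "<default>"))        return new String[]{user, clientId}; // 2) <user>/<default>
        if (hasQuota(user, null))               return new String[]{user, ""};       // 3) <user>
        if (hasQuota("<default>", clientId))    return new String[]{user, clientId}; // 4) <default>/<client-id>
        if (hasQuota("<default>", "<default>")) return new String[]{user, clientId}; // 5) <default>/<default>
        if (hasQuota("<default>", null))        return new String[]{user, ""};       // 6) <default>
        return new String[]{"", clientId};                                           // 7/8) client-id only
    }
}
```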

> Quota metric tags are inconsistent
> --
>
> Key: KAFKA-13744
> URL: https://issues.apache.org/jira/browse/KAFKA-13744
> Project: Kafka
>  Issue Type: Bug
>  Components: core, metrics
>Reporter: Jorge Esteban Quilcate Otoya
>Priority: Major
>  Labels: quotas
> Attachments: image-2022-03-15-16-57-12-583.png
>
>
> When enabling metrics for quotas the metrics apply to _all_ clients (see 
> https://issues.apache.org/jira/browse/KAFKA-13742).
> However, the tags are calculated depending on the quotas registered and 
> applied to all clients: 
> [https://github.com/apache/kafka/blob/0b9a8bac36f16b5397e9ec3a0441758e4b60a384/core/src/main/scala/kafka/server/ClientQuotaManager.scala#L649-L694]
> This causes different metric tag results depending on which quota is 
> registered first.
> For instance, if a quota is registered with both userId and clientId, then 
> metrics are tagged with both; but if a quota tagged only with clientId is 
> then registered, all metrics are tagged only by clientId — even though the 
> user principal is available.
> !image-2022-03-15-16-57-12-583.png|width=1034,height=415!
> I managed to reproduce this behavior here:
>  * From 10:30 to 10:45, there was a quota with both client-id and user-id
>  * It was removed by 10:45, so no metrics were exposed.
>  * After, a quota with client id was created, and metrics were collected only 
> with client id, even though the user was available.
> I'd expect metrics to always contain both, if available — and simplify the 
> logic here 
> [https://github.com/apache/kafka/blob/0b9a8bac36f16b5397e9ec3a0441758e4b60a384/core/src/main/scala/kafka/server/ClientQuotaManager.scala#L649-L694].



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] jsancio commented on a change in pull request #11893: KAFKA-13682; KRaft Controller auto preferred leader election

2022-03-17 Thread GitBox


jsancio commented on a change in pull request #11893:
URL: https://github.com/apache/kafka/pull/11893#discussion_r829589585



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
##
@@ -104,53 +117,94 @@ public PartitionChangeBuilder 
setTargetAdding(List<Integer> targetAdding) {
 return this;
 }
 
-boolean shouldTryElection() {
-// If the new isr doesn't have the current leader, we need to try to 
elect a new
-// one. Note: this also handles the case where the current leader is 
NO_LEADER,
-// since that value cannot appear in targetIsr.
-if (!targetIsr.contains(partition.leader)) return true;
+// VisibleForTesting
+static class ElectionResult {
+final int node;
+final boolean unclean;
+
+private ElectionResult(int node, boolean unclean) {
+this.node = node;
+this.unclean = unclean;
+}
+}
 
-// Check if we want to try to get away from a non-preferred leader.
-if (alwaysElectPreferredIfPossible && !partition.hasPreferredLeader()) 
return true;
+// VisibleForTesting
+/**
+ * Perform leader election based on the partition state and leader 
election type.
+ *
+ * See documentation for the Election type to see more details on the 
election types supported.
+ */
+ElectionResult electLeader() {
+if (election == Election.PREFERRED) {
+return electPreferredLeader();
+}
 
-return false;
+return electAnyLeader();
 }
 
-class BestLeader {
-final int node;
-final boolean unclean;
+/**
+ * Assumes that the election type is Election.PREFERRED
+ */
+private ElectionResult electPreferredLeader() {
+int preferredReplica = targetReplicas.get(0);
+if (targetIsr.contains(preferredReplica) && 
isAcceptableLeader.apply(preferredReplica)) {
+return new ElectionResult(preferredReplica, false);
+}
 
-BestLeader() {
-for (int replica : targetReplicas) {
-if (targetIsr.contains(replica) && 
isAcceptableLeader.apply(replica)) {
-this.node = replica;
-this.unclean = false;
-return;
-}
-}
-if (uncleanElectionOk.get()) {
-for (int replica : targetReplicas) {
-if (isAcceptableLeader.apply(replica)) {
-this.node = replica;
-this.unclean = true;
-return;
-}
-}
+if (targetIsr.contains(partition.leader) && 
isAcceptableLeader.apply(partition.leader)) {
+// Don't consider a new leader since the current leader meets all 
the constraints
+return new ElectionResult(partition.leader, false);
+}
+
+Optional<Integer> onlineLeader = targetReplicas.stream()

Review comment:
   Yes. I think you are correct: based on how we use it in the replication 
control manager, this code should always return at either line 151, line 156, 
or line 167.
   
   I would still like to keep this code, as it is attempting to keep the 
partition online.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on pull request #11912: KAFKA-13752: Uuid compare using equals in java

2022-03-17 Thread GitBox


jolshan commented on pull request #11912:
URL: https://github.com/apache/kafka/pull/11912#issuecomment-1071816320


   Ah yeah, that could be useful? I think it's less necessary since the server 
should be building the response, but it doesn't hurt.
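
For context, a minimal illustration of the bug class this PR fixes (reference 
comparison on org.apache.kafka.common.Uuid):

```
import org.apache.kafka.common.Uuid;

public class UuidCompare {
    public static void main(String[] args) {
        Uuid a = new Uuid(123L, 456L);
        Uuid b = new Uuid(123L, 456L);
        System.out.println(a == b);      // false: compares references
        System.out.println(a.equals(b)); // true: compares the 128-bit value
    }
}
```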


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan removed a comment on pull request #11912: KAFKA-13752: Uuid compare using equals in java

2022-03-17 Thread GitBox


jolshan removed a comment on pull request #11912:
URL: https://github.com/apache/kafka/pull/11912#issuecomment-1071812161


   I was thinking of adding the test from the ticket and verifying it 
_**doesn't**_ return an error.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on pull request #11912: KAFKA-13752: Uuid compare using equals in java

2022-03-17 Thread GitBox


jolshan commented on pull request #11912:
URL: https://github.com/apache/kafka/pull/11912#issuecomment-1071812161


   I was thinking of adding the test from the ticket and verifying it 
_**doesn't**_ return an error.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-13753) Log cleaner should retain transaction metadata in index until corresponding marker is removed

2022-03-17 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-13753:

Summary: Log cleaner should retain transaction metadata in index until 
corresponding marker is removed  (was: Log cleaner should transaction metadata 
in index until corresponding marker is removed)

> Log cleaner should retain transaction metadata in index until corresponding 
> marker is removed
> -
>
> Key: KAFKA-13753
> URL: https://issues.apache.org/jira/browse/KAFKA-13753
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> Currently the log cleaner will remove aborted transactions from the index as 
> soon as it detects that the data from the transaction is gone. It does not 
> wait until the corresponding marker has also been removed. Although it is 
> extremely unlikely, it seems possible today that a Fetch might fail to return 
> the aborted transaction metadata correctly if a log cleaning occurs 
> concurrently. This is because the collection of aborted transactions is only 
> done after reading the data from the log. It would be safer to preserve the 
> aborted transaction metadata in the index until the marker is also removed.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] dengziming commented on pull request #11173: KAFKA-13509: Support max timestamp in GetOffsetShell

2022-03-17 Thread GitBox


dengziming commented on pull request #11173:
URL: https://github.com/apache/kafka/pull/11173#issuecomment-1071767117


   @dajac @showuon @jolshan Thank you for your patience. 🤝


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wsun-confluent commented on a change in pull request #11874: Fix typos in configuration docs

2022-03-17 Thread GitBox


wsun-confluent commented on a change in pull request #11874:
URL: https://github.com/apache/kafka/pull/11874#discussion_r829578541



##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
##
@@ -216,8 +216,10 @@
 private static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC = 
"The maximum number of unacknowledged requests the client will send on a single 
connection before blocking."
 + 
" Note that if this config is set to be greater than 1 and 
enable.idempotence is set to false, there is a risk of"
 + 
" message re-ordering after a failed send due to retries (i.e., if retries are 
enabled)."
-+ 
" Additionally, enabling idempotence requires this config value to be less than 
or equal to " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE + "."
-+ 
" If conflicting configurations are set and idempotence is not explicitly 
enabled, idempotence is disabled.";
++ 
" Additionally, enabling idempotence requires the value of this configuration 
to be less than or equal to " + 
MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE + "."
++ 
" If conflicting configurations are set and idempotence is not explicitly 
enabled, idempotence is disabled. "
++ 
" Record ordering is preserved when enable.idempotence is set to 
true for idempotent "
++ 
" producer (or transactional producer), even when max in-flight requests are 
greater than 1 (supported up to 5).";

Review comment:
   Hello and thanks for reviewing!  The callout about record ordering was 
originally requested within Confluent; the same is mentioned at this page: 
https://developer.confluent.io/tutorials/message-ordering/kafka.html.  We think 
this would also benefit AK docs.
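
A producer configuration exercising the documented guarantee, as a sketch 
(values are illustrative):

```
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class OrderingWithIdempotence {
    static Properties config() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // With idempotence enabled, per-partition ordering is preserved even
        // with multiple in-flight requests (up to 5 per connection).
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        return props;
    }
}
```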




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #11893: KAFKA-13682; KRaft Controller auto preferred leader election

2022-03-17 Thread GitBox


junrao commented on a change in pull request #11893:
URL: https://github.com/apache/kafka/pull/11893#discussion_r829550657



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
##
@@ -104,53 +117,94 @@ public PartitionChangeBuilder 
setTargetAdding(List<Integer> targetAdding) {
 return this;
 }
 
-boolean shouldTryElection() {
-// If the new isr doesn't have the current leader, we need to try to 
elect a new
-// one. Note: this also handles the case where the current leader is 
NO_LEADER,
-// since that value cannot appear in targetIsr.
-if (!targetIsr.contains(partition.leader)) return true;
+// VisibleForTesting
+static class ElectionResult {
+final int node;
+final boolean unclean;
+
+private ElectionResult(int node, boolean unclean) {
+this.node = node;
+this.unclean = unclean;
+}
+}
 
-// Check if we want to try to get away from a non-preferred leader.
-if (alwaysElectPreferredIfPossible && !partition.hasPreferredLeader()) 
return true;
+// VisibleForTesting
+/**
+ * Perform leader election based on the partition state and leader 
election type.
+ *
+ * See documentation for the Election type to see more details on the 
election types supported.
+ */
+ElectionResult electLeader() {
+if (election == Election.PREFERRED) {
+return electPreferredLeader();
+}
 
-return false;
+return electAnyLeader();
 }
 
-class BestLeader {
-final int node;
-final boolean unclean;
+/**
+ * Assumes that the election type is Election.PREFERRED
+ */
+private ElectionResult electPreferredLeader() {
+int preferredReplica = targetReplicas.get(0);
+if (targetIsr.contains(preferredReplica) && 
isAcceptableLeader.apply(preferredReplica)) {
+return new ElectionResult(preferredReplica, false);
+}
 
-BestLeader() {
-for (int replica : targetReplicas) {
-if (targetIsr.contains(replica) && 
isAcceptableLeader.apply(replica)) {
-this.node = replica;
-this.unclean = false;
-return;
-}
-}
-if (uncleanElectionOk.get()) {
-for (int replica : targetReplicas) {
-if (isAcceptableLeader.apply(replica)) {
-this.node = replica;
-this.unclean = true;
-return;
-}
-}
+if (targetIsr.contains(partition.leader) && 
isAcceptableLeader.apply(partition.leader)) {
+// Don't consider a new leader since the current leader meets all 
the constraints
+return new ElectionResult(partition.leader, false);
+}
+
+Optional<Integer> onlineLeader = targetReplicas.stream()

Review comment:
   Preferred leader election is an optimization. If we can't move the 
leader to the preferred one, it seems there is no need to do anything extra.

##
File path: 
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
##
@@ -104,53 +117,94 @@ public PartitionChangeBuilder 
setTargetAdding(List<Integer> targetAdding) {
 return this;
 }
 
-boolean shouldTryElection() {
-// If the new isr doesn't have the current leader, we need to try to 
elect a new
-// one. Note: this also handles the case where the current leader is 
NO_LEADER,
-// since that value cannot appear in targetIsr.
-if (!targetIsr.contains(partition.leader)) return true;
+// VisibleForTesting
+static class ElectionResult {
+final int node;
+final boolean unclean;
+
+private ElectionResult(int node, boolean unclean) {
+this.node = node;
+this.unclean = unclean;
+}
+}
 
-// Check if we want to try to get away from a non-preferred leader.
-if (alwaysElectPreferredIfPossible && !partition.hasPreferredLeader()) 
return true;
+// VisibleForTesting
+/**
+ * Perform leader election based on the partition state and leader 
election type.
+ *
+ * See documentation for the Election type to see more details on the 
election types supported.
+ */
+ElectionResult electLeader() {
+if (election == Election.PREFERRED) {
+return electPreferredLeader();
+}
 
-return false;
+return electAnyLeader();
 }
 
-class BestLeader {
-final int node;
-final boolean unclean;
+/**
+ * Assumes that the election type is Election.PREFERRED
+ */
+private ElectionResult electPreferredLeader() {
+int preferredReplica = targetReplicas.get(0);
+if (targetIsr.contains(preferredReplica) && 
isAcceptableLeader.apply(preferredReplica))

[GitHub] [kafka] jolshan commented on a change in pull request #11909: KAFKA-13750: Client Compatability KafkaTest uses invalid idempotency configs

2022-03-17 Thread GitBox


jolshan commented on a change in pull request #11909:
URL: https://github.com/apache/kafka/pull/11909#discussion_r829541377



##
File path: 
tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
##
@@ -172,6 +174,13 @@ public static void main(String[] args) throws Exception {
 .dest("describeConfigsSupported")
 .metavar("DESCRIBE_CONFIGS_SUPPORTED")
 .help("Whether describeConfigs is supported in the AdminClient.");
+parser.addArgument("--idempotent-producer-supported")
+.action(store())
+.required(true)
+.type(Boolean.class)
+.dest("idempotentProducerSupported")
+.metavar("IDEMPOTENT_PRODUCER_SUPPORTED")
+.help("Whether the producer supports idempotency.");

Review comment:
   Ah. How did I miss this 🤦‍♀️
   Thanks




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #11893: KAFKA-13682; KRaft Controller auto preferred leader election

2022-03-17 Thread GitBox


jsancio commented on a change in pull request #11893:
URL: https://github.com/apache/kafka/pull/11893#discussion_r829532685



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
##
@@ -104,53 +117,73 @@ public PartitionChangeBuilder 
setTargetAdding(List<Integer> targetAdding) {
 return this;
 }
 
-boolean shouldTryElection() {
-// If the new isr doesn't have the current leader, we need to try to 
elect a new
-// one. Note: this also handles the case where the current leader is 
NO_LEADER,
-// since that value cannot appear in targetIsr.
-if (!targetIsr.contains(partition.leader)) return true;
-
-// Check if we want to try to get away from a non-preferred leader.
-if (alwaysElectPreferredIfPossible && !partition.hasPreferredLeader()) 
return true;
+// VisibleForTesting
+static class ElectionResult {
+final int node;
+final boolean unclean;
 
-return false;
+private ElectionResult(int node, boolean unclean) {
+this.node = node;
+this.unclean = unclean;
+}
 }
 
-class BestLeader {
-final int node;
-final boolean unclean;
+// VisibleForTesting
+/**
+ * Perform leader election based on the partition state and leader 
election type.
+ *
+ * See documentation for the Election type to see more details on the 
election types supported.
+ */
+ElectionResult electLeader() {
+// 1. Check if the election is not PREFERRED and we already have a 
valid leader
+if (election != Election.PREFERRED && 
targetIsr.contains(partition.leader) && 
isAcceptableLeader.apply(partition.leader)) {
+// Don't consider a new leader since the current leader meets all 
the constraints
+return new ElectionResult(partition.leader, false);
+}
 
-BestLeader() {
-for (int replica : targetReplicas) {
-if (targetIsr.contains(replica) && 
isAcceptableLeader.apply(replica)) {
-this.node = replica;
-this.unclean = false;
-return;
-}
-}
-if (uncleanElectionOk.get()) {
-for (int replica : targetReplicas) {
-if (isAcceptableLeader.apply(replica)) {
-this.node = replica;
-this.unclean = true;
-return;
-}
-}
+// 2. Attempt preferred replica election
+int preferredReplica = targetReplicas.get(0);
+if (targetIsr.contains(preferredReplica) && 
isAcceptableLeader.apply(preferredReplica)) {
+return new ElectionResult(preferredReplica, false);
+}
+
+// 3. Preferred replica was not elected, only continue if the current 
leader is not a valid leader
+if (targetIsr.contains(partition.leader) && 
isAcceptableLeader.apply(partition.leader)) {

Review comment:
   @junrao how about this algorithm: 
https://github.com/apache/kafka/pull/11893/commits/4ef0b161c2f44933139989ec7e680f2c05839cc5
   
   Is it easier to read and understand?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #11893: KAFKA-13682; KRaft Controller auto preferred leader election

2022-03-17 Thread GitBox


jsancio commented on a change in pull request #11893:
URL: https://github.com/apache/kafka/pull/11893#discussion_r829522253



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##
@@ -953,6 +972,56 @@ private void cancelMaybeFenceReplicas() {
 queue.cancelDeferred(MAYBE_FENCE_REPLICAS);
 }
 
+private static final String MAYBE_BALANCE_PARTITION_LEADERS = 
"maybeBalancePartitionLeaders";
+
+private static final int MAX_ELECTIONS_PER_IMBALANCE = 1_000;
+
+private void maybeScheduleNextBalancePartitionLeaders() {
+if (imbalancedScheduled != ImbalanceSchedule.SCHEDULED &&
+leaderImbalanceCheckIntervalNs.isPresent() &&
+replicationControl.arePartitionLeadersImbalanced()) {
+
+log.debug(
+"Scheduling write event for {} because scheduled ({}), 
checkIntervalNs ({}) and isImbalanced ({})",
+MAYBE_BALANCE_PARTITION_LEADERS,
+imbalancedScheduled,
+leaderImbalanceCheckIntervalNs,
+replicationControl.arePartitionLeadersImbalanced()
+);
+
+ControllerWriteEvent<Boolean> event = new 
ControllerWriteEvent<>(MAYBE_BALANCE_PARTITION_LEADERS, () -> {
+ControllerResult<Boolean> result = 
replicationControl.maybeBalancePartitionLeaders();
+
+// reschedule the operation after the 
leaderImbalanceCheckIntervalNs interval.
+// Mark the imbalance event as completed and reschedule if 
necessary
+if (result.response()) {
+imbalancedScheduled = ImbalanceSchedule.IMMEDIATELY;
+} else {
+imbalancedScheduled = ImbalanceSchedule.DEFERRED;
+}
+
+// Note that rescheduling this event here is not required 
because MAYBE_BALANCE_PARTITION_LEADERS
+// is a ControllerWriteEvent. ControllerWriteEvent always 
calls this method after the records
+// generated by a ControllerWriteEvent have been applied.
+
+return result;
+});
+
+long delayNs = time.nanoseconds();
+if (imbalancedScheduled == ImbalanceSchedule.DEFERRED) {
+delayNs += leaderImbalanceCheckIntervalNs.getAsLong();
+}
+
+queue.scheduleDeferred(MAYBE_BALANCE_PARTITION_LEADERS, new 
EarliestDeadlineFunction(delayNs), event);
+
+imbalancedScheduled = ImbalanceSchedule.SCHEDULED;
+}
+}
+
+private void cancelMaybeBalancePartitionLeaders() {
+queue.cancelDeferred(MAYBE_BALANCE_PARTITION_LEADERS);

Review comment:
   Yes. Fixed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #11893: KAFKA-13682; KRaft Controller auto preferred leader election

2022-03-17 Thread GitBox


jsancio commented on a change in pull request #11893:
URL: https://github.com/apache/kafka/pull/11893#discussion_r829521940



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
##
@@ -104,53 +117,73 @@ public PartitionChangeBuilder 
setTargetAdding(List<Integer> targetAdding) {
 return this;
 }
 
-boolean shouldTryElection() {
-// If the new isr doesn't have the current leader, we need to try to 
elect a new
-// one. Note: this also handles the case where the current leader is 
NO_LEADER,
-// since that value cannot appear in targetIsr.
-if (!targetIsr.contains(partition.leader)) return true;
-
-// Check if we want to try to get away from a non-preferred leader.
-if (alwaysElectPreferredIfPossible && !partition.hasPreferredLeader()) 
return true;
+// VisibleForTesting
+static class ElectionResult {
+final int node;
+final boolean unclean;
 
-return false;
+private ElectionResult(int node, boolean unclean) {
+this.node = node;
+this.unclean = unclean;
+}
 }
 
-class BestLeader {
-final int node;
-final boolean unclean;
+// VisibleForTesting
+/**
+ * Perform leader election based on the partition state and leader 
election type.
+ *
+ * See documentation for the Election type to see more details on the 
election types supported.
+ */
+ElectionResult electLeader() {
+// 1. Check if the election is not PREFERRED and we already have a 
valid leader
+if (election != Election.PREFERRED && 
targetIsr.contains(partition.leader) && 
isAcceptableLeader.apply(partition.leader)) {
+// Don't consider a new leader since the current leader meets all 
the constraints
+return new ElectionResult(partition.leader, false);
+}
 
-BestLeader() {
-for (int replica : targetReplicas) {
-if (targetIsr.contains(replica) && 
isAcceptableLeader.apply(replica)) {
-this.node = replica;
-this.unclean = false;
-return;
-}
-}
-if (uncleanElectionOk.get()) {
-for (int replica : targetReplicas) {
-if (isAcceptableLeader.apply(replica)) {
-this.node = replica;
-this.unclean = true;
-return;
-}
-}
+// 2. Attempt preferred replica election
+int preferredReplica = targetReplicas.get(0);
+if (targetIsr.contains(preferredReplica) && 
isAcceptableLeader.apply(preferredReplica)) {
+return new ElectionResult(preferredReplica, false);
+}
+
+// 3. Preferred replica was not elected, only continue if the current 
leader is not a valid leader
+if (targetIsr.contains(partition.leader) && 
isAcceptableLeader.apply(partition.leader)) {
+// Don't consider a new leader since the current leader meets all 
the constraints
+return new ElectionResult(partition.leader, false);
+}
+
+// 4. Attempt to keep the partition online based on the ISR
+Optional<Integer> bestLeader = targetReplicas.stream()
+.skip(1)
+.filter(replica -> targetIsr.contains(replica) && 
isAcceptableLeader.apply(replica))
+.findFirst();
+if (bestLeader.isPresent()) {
+return new ElectionResult(bestLeader.get(), false);
+}
+
+if (election == Election.UNCLEAN) {
+// 5. Attempt unclean leader election
+Optional<Integer> uncleanLeader = targetReplicas.stream()
+.filter(replica -> isAcceptableLeader.apply(replica))
+.findFirst();
+if (uncleanLeader.isPresent()) {
+return new ElectionResult(uncleanLeader.get(), true);
 }
-this.node = NO_LEADER;
-this.unclean = false;
 }
+
+return new ElectionResult(NO_LEADER, false);
 }
 
 private void tryElection(PartitionChangeRecord record) {
-BestLeader bestLeader = new BestLeader();
-if (bestLeader.node != partition.leader) {
-log.debug("Setting new leader for topicId {}, partition {} to {}", 
topicId, partitionId, bestLeader.node);
-record.setLeader(bestLeader.node);
-if (bestLeader.unclean) {
+ElectionResult electionResult = electLeader();
+if (electionResult.node != partition.leader) {
+log.debug("Setting new leader for topicId {}, partition {} to {}", 
topicId, partitionId, electionResult.node);
+record.setLeader(electionResult.node);
+if (electionResult.unclean) {
 // If the election was unclean, we have to forcibly set the 
ISR to just the
 // new leader. This can re

[GitHub] [kafka] tchiotludo commented on pull request #11905: MINOR: Fix incorrect log for out-of-order KTable

2022-03-17 Thread GitBox


tchiotludo commented on pull request #11905:
URL: https://github.com/apache/kafka/pull/11905#issuecomment-1071509762


   @showuon must be done I think ;) 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-13460) Issue reporting

2022-03-17 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-13460:
--
Component/s: (was: KafkaConnect)

> Issue reporting
> ---
>
> Key: KAFKA-13460
> URL: https://issues.apache.org/jira/browse/KAFKA-13460
> Project: Kafka
>  Issue Type: Wish
>Affects Versions: 1.1.1
>Reporter: Mikolaj Ryll
>Priority: Critical
> Fix For: 1.0.3
>
>
> I would like to be able to report issues using GitHub. Plx.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13601) Add option to support sync offset commit in Kafka Connect Sink

2022-03-17 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17508423#comment-17508423
 ] 

Chris Egerton commented on KAFKA-13601:
---

[~dasarianil] I don't see how synchronous offset commits would guarantee 
exactly once. What if the worker dies in between the task writing data and its 
consumer committing an offset?

> Add option to support sync offset commit in Kafka Connect Sink
> --
>
> Key: KAFKA-13601
> URL: https://issues.apache.org/jira/browse/KAFKA-13601
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Reporter: Anil Dasari
>Priority: Major
>
> Exactly-once in the S3 connector with scheduled rotation and field 
> partitioner can be achieved with a synchronous consumer offset commit after 
> the message batch is flushed to the sink successfully.
> Currently, WorkerSinkTask commits the consumer offsets asynchronously at 
> regular intervals of WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG:
> [https://github.com/apache/kafka/blob/371f14c3c12d2e341ac96bd52393b43a10acfa84/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L203]
> [https://github.com/apache/kafka/blob/371f14c3c12d2e341ac96bd52393b43a10acfa84/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L196]
> [https://github.com/apache/kafka/blob/371f14c3c12d2e341ac96bd52393b43a10acfa84/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L354]
>  
> Add a config to allow users to select synchronous commits over 
> WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Comment Edited] (KAFKA-13714) Flaky test IQv2StoreIntegrationTest

2022-03-17 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17508386#comment-17508386
 ] 

John Roesler edited comment on KAFKA-13714 at 3/17/22, 7:32 PM:


Another local repro:

 
{code:java}
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest > 
verifyStore[cache=true, log=true, supplier=TIME_ROCKS_KV, kind=PAPI] FAILED
    java.lang.AssertionError: Result:StateQueryResult{partitionResults={

0=SucceededQueryResult{

result=org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueTimestampedIterator@3f702946,
 
executionInfo=[
  Handled in class 
org.apache.kafka.streams.state.internals.RocksDBTimestampedStore in 1153925ns, 
  Handled in class 
org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedKeyValueBytesStore
 via WrappedStateStore in 1165952ns, 
  Handled in class 
org.apache.kafka.streams.state.internals.CachingKeyValueStore in 1181616ns, 
  Handled in class 
org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore with 
serdes org.apache.kafka.streams.state.StateSerdes@278667fd in 1260365ns
], 
position=Position{position={input-topic={0=1, 

1=SucceededQueryResult{

result=org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueTimestampedIterator@42b6d0cc,

executionInfo=[
  Handled in class 
org.apache.kafka.streams.state.internals.RocksDBTimestampedStore in 109311ns, 
  Handled in class 
org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedKeyValueBytesStore
 via WrappedStateStore in 116767ns, 
  Handled in class 
org.apache.kafka.streams.state.internals.CachingKeyValueStore in 128961ns, 
  Handled in class 
org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore with 
serdes org.apache.kafka.streams.state.StateSerdes@684b31de in 185521ns
],
position=Position{position={input-topic={1=1}, 

globalResult=null}
    Expected: is <[1, 2, 3]>
         but: was <[1, 2]>
        at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
        at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQuery(IQv2StoreIntegrationTest.java:1129)
        at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQueries(IQv2StoreIntegrationTest.java:807)
        at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore(IQv2StoreIntegrationTest.java:776)
 

{code}
 

logs:
{code:java}
[2022-03-17 07:31:56,286] INFO stream-client 
[app-org.apache.kafka.streams.integration.IQv2StoreIntegrationTest-true-true-TIME_ROCKS_KV-PAPI-1159953973-83d014b6-61fa-4ba5-a7b3-f035b42c2138]
 Kafka Streams version: test-version (org.apache.kafka.streams.KafkaStreams:912)
[2022-03-17 07:31:56,286] INFO stream-client 
[app-org.apache.kafka.streams.integration.IQv2StoreIntegrationTest-true-true-TIME_ROCKS_KV-PAPI-1159953973-83d014b6-61fa-4ba5-a7b3-f035b42c2138]
 Kafka Streams commit ID: test-commit-ID 
(org.apache.kafka.streams.KafkaStreams:913)
[2022-03-17 07:31:56,288] INFO stream-thread 
[app-org.apache.kafka.streams.integration.IQv2StoreIntegrationTest-true-true-TIME_ROCKS_KV-PAPI-1159953973-83d014b6-61fa-4ba5-a7b3-f035b42c2138-StreamThread-1]
 Creating restore consumer client 
(org.apache.kafka.streams.processor.internals.StreamThread:346)
[2022-03-17 07:31:56,295] INFO stream-thread 
[app-org.apache.kafka.streams.integration.IQv2StoreIntegrationTest-true-true-TIME_ROCKS_KV-PAPI-1159953973-83d014b6-61fa-4ba5-a7b3-f035b42c2138-StreamThread-1]
 Creating thread producer client 
(org.apache.kafka.streams.processor.internals.StreamThread:105)
[2022-03-17 07:31:56,297] INFO [Producer 
clientId=app-org.apache.kafka.streams.integration.IQv2StoreIntegrationTest-true-true-TIME_ROCKS_KV-PAPI-1159953973-83d014b6-61fa-4ba5-a7b3-f035b42c2138-StreamThread-1-producer]
 Instantiated an idempotent producer. 
(org.apache.kafka.clients.producer.KafkaProducer:532)
[2022-03-17 07:31:56,304] INFO stream-thread 
[app-org.apache.kafka.streams.integration.IQv2StoreIntegrationTest-true-true-TIME_ROCKS_KV-PAPI-1159953973-83d014b6-61fa-4ba5-a7b3-f035b42c2138-StreamThread-1]
 Creating consumer client 
(org.apache.kafka.streams.processor.internals.StreamThread:397)
[2022-03-17 07:31:56,308] INFO stream-thread 
[app-org.apache.kafka.streams.integration.IQv2StoreIntegrationTest-true-true-TIME_ROCKS_KV-PAPI-1159953973-83d014b6-61fa-4ba5-a7b3-f035b42c2138-StreamThread-1-consumer]
 Cooperative rebalancing protocol is enabled now 
(org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration:126)
[2022-03-17 07:31:56,308] INFO [Producer 
clientId=app-org.apache.kafka.streams.integration.IQv2StoreIntegrationTest-true-true-TIME_ROCKS_KV-PAPI-1159953973-83d014b6-61fa-4ba5-a7b3-f035b42c2138-StreamThread-1-producer]
 Cluster ID: iZBZzURBQr6rMZEB6oxg7g (org.apache.kafka.clients.Metadata:287)
[2022-03-17 07:31:56,30

[GitHub] [kafka] jsancio commented on a change in pull request #11893: KAFKA-13682; KRaft Controller auto preferred leader election

2022-03-17 Thread GitBox


jsancio commented on a change in pull request #11893:
URL: https://github.com/apache/kafka/pull/11893#discussion_r829419392



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
##
@@ -104,53 +117,73 @@ public PartitionChangeBuilder 
setTargetAdding(List<Integer> targetAdding) {
 return this;
 }
 
-boolean shouldTryElection() {
-// If the new isr doesn't have the current leader, we need to try to 
elect a new
-// one. Note: this also handles the case where the current leader is 
NO_LEADER,
-// since that value cannot appear in targetIsr.
-if (!targetIsr.contains(partition.leader)) return true;
-
-// Check if we want to try to get away from a non-preferred leader.
-if (alwaysElectPreferredIfPossible && !partition.hasPreferredLeader()) 
return true;
+// VisibleForTesting
+static class ElectionResult {
+final int node;
+final boolean unclean;
 
-return false;
+private ElectionResult(int node, boolean unclean) {
+this.node = node;
+this.unclean = unclean;
+}
 }
 
-class BestLeader {
-final int node;
-final boolean unclean;
+// VisibleForTesting
+/**
+ * Perform leader election based on the partition state and leader 
election type.
+ *
+ * See documentation for the Election type to see more details on the 
election types supported.
+ */
+ElectionResult electLeader() {
+// 1. Check if the election is not PREFERRED and we already have a 
valid leader
+if (election != Election.PREFERRED && 
targetIsr.contains(partition.leader) && 
isAcceptableLeader.apply(partition.leader)) {
+// Don't consider a new leader since the current leader meets all 
the constraints
+return new ElectionResult(partition.leader, false);
+}
 
-BestLeader() {
-for (int replica : targetReplicas) {
-if (targetIsr.contains(replica) && 
isAcceptableLeader.apply(replica)) {
-this.node = replica;
-this.unclean = false;
-return;
-}
-}
-if (uncleanElectionOk.get()) {
-for (int replica : targetReplicas) {
-if (isAcceptableLeader.apply(replica)) {
-this.node = replica;
-this.unclean = true;
-return;
-}
-}
+// 2. Attempt preferred replica election
+int preferredReplica = targetReplicas.get(0);
+if (targetIsr.contains(preferredReplica) && 
isAcceptableLeader.apply(preferredReplica)) {
+return new ElectionResult(preferredReplica, false);
+}
+
+// 3. Preferred replica was not elected, only continue if the current 
leader is not a valid leader
+if (targetIsr.contains(partition.leader) && 
isAcceptableLeader.apply(partition.leader)) {
+// Don't consider a new leader since the current leader meets all 
the constraints
+return new ElectionResult(partition.leader, false);
+}
+
+// 4. Attempt to keep the partition online based on the ISR
+Optional<Integer> bestLeader = targetReplicas.stream()
+.skip(1)
+.filter(replica -> targetIsr.contains(replica) && 
isAcceptableLeader.apply(replica))
+.findFirst();
+if (bestLeader.isPresent()) {
+return new ElectionResult(bestLeader.get(), false);
+}
+
+if (election == Election.UNCLEAN) {
+// 5. Attempt unclean leader election
+Optional<Integer> uncleanLeader = targetReplicas.stream()
+.filter(replica -> isAcceptableLeader.apply(replica))
+.findFirst();
+if (uncleanLeader.isPresent()) {
+return new ElectionResult(uncleanLeader.get(), true);
 }
-this.node = NO_LEADER;
-this.unclean = false;
 }
+
+return new ElectionResult(NO_LEADER, false);
 }
 
 private void tryElection(PartitionChangeRecord record) {
-BestLeader bestLeader = new BestLeader();
-if (bestLeader.node != partition.leader) {
-log.debug("Setting new leader for topicId {}, partition {} to {}", 
topicId, partitionId, bestLeader.node);
-record.setLeader(bestLeader.node);
-if (bestLeader.unclean) {
+ElectionResult electionResult = electLeader();
+if (electionResult.node != partition.leader) {
+log.debug("Setting new leader for topicId {}, partition {} to {}", 
topicId, partitionId, electionResult.node);
+record.setLeader(electionResult.node);
+if (electionResult.unclean) {
 // If the election was unclean, we have to forcibly set the 
ISR to just the
 // new leader. This can re

[GitHub] [kafka] jsancio commented on a change in pull request #11893: KAFKA-13682; KRaft Controller auto preferred leader election

2022-03-17 Thread GitBox


jsancio commented on a change in pull request #11893:
URL: https://github.com/apache/kafka/pull/11893#discussion_r829414517



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
##
@@ -104,53 +117,73 @@ public PartitionChangeBuilder 
setTargetAdding(List<Integer> targetAdding) {
 return this;
 }
 
-boolean shouldTryElection() {
-// If the new isr doesn't have the current leader, we need to try to 
elect a new
-// one. Note: this also handles the case where the current leader is 
NO_LEADER,
-// since that value cannot appear in targetIsr.
-if (!targetIsr.contains(partition.leader)) return true;
-
-// Check if we want to try to get away from a non-preferred leader.
-if (alwaysElectPreferredIfPossible && !partition.hasPreferredLeader()) 
return true;
+// VisibleForTesting
+static class ElectionResult {
+final int node;
+final boolean unclean;
 
-return false;
+private ElectionResult(int node, boolean unclean) {
+this.node = node;
+this.unclean = unclean;
+}
 }
 
-class BestLeader {
-final int node;
-final boolean unclean;
+// VisibleForTesting
+/**
+ * Perform leader election based on the partition state and leader 
election type.
+ *
+ * See documentation for the Election type to see more details on the 
election types supported.
+ */
+ElectionResult electLeader() {
+// 1. Check if the election is not PREFERRED and we already have a 
valid leader
+if (election != Election.PREFERRED && 
targetIsr.contains(partition.leader) && 
isAcceptableLeader.apply(partition.leader)) {
+// Don't consider a new leader since the current leader meets all 
the constraints
+return new ElectionResult(partition.leader, false);
+}
 
-BestLeader() {
-for (int replica : targetReplicas) {
-if (targetIsr.contains(replica) && 
isAcceptableLeader.apply(replica)) {
-this.node = replica;
-this.unclean = false;
-return;
-}
-}
-if (uncleanElectionOk.get()) {
-for (int replica : targetReplicas) {
-if (isAcceptableLeader.apply(replica)) {
-this.node = replica;
-this.unclean = true;
-return;
-}
-}
+// 2. Attempt preferred replica election
+int preferredReplica = targetReplicas.get(0);
+if (targetIsr.contains(preferredReplica) && 
isAcceptableLeader.apply(preferredReplica)) {
+return new ElectionResult(preferredReplica, false);
+}
+
+// 3. Preferred replica was not elected, only continue if the current 
leader is not a valid leader
+if (targetIsr.contains(partition.leader) && 
isAcceptableLeader.apply(partition.leader)) {
+// Don't consider a new leader since the current leader meets all 
the constraints
+return new ElectionResult(partition.leader, false);
+}
+
+// 4. Attempt to keep the partition online based on the ISR
+Optional<Integer> bestLeader = targetReplicas.stream()
+.skip(1)

Review comment:
   We can skip the first replica because we already considered it in step 2 
(lines 144-148). This is a small optimization.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #11893: KAFKA-13682; KRaft Controller auto preferred leader election

2022-03-17 Thread GitBox


jsancio commented on a change in pull request #11893:
URL: https://github.com/apache/kafka/pull/11893#discussion_r829411491



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
##
@@ -104,53 +117,73 @@ public PartitionChangeBuilder 
setTargetAdding(List<Integer> targetAdding) {
 return this;
 }
 
-boolean shouldTryElection() {
-// If the new isr doesn't have the current leader, we need to try to 
elect a new
-// one. Note: this also handles the case where the current leader is 
NO_LEADER,
-// since that value cannot appear in targetIsr.
-if (!targetIsr.contains(partition.leader)) return true;
-
-// Check if we want to try to get away from a non-preferred leader.
-if (alwaysElectPreferredIfPossible && !partition.hasPreferredLeader()) 
return true;
+// VisibleForTesting
+static class ElectionResult {
+final int node;
+final boolean unclean;
 
-return false;
+private ElectionResult(int node, boolean unclean) {
+this.node = node;
+this.unclean = unclean;
+}
 }
 
-class BestLeader {
-final int node;
-final boolean unclean;
+// VisibleForTesting
+/**
+ * Perform leader election based on the partition state and leader 
election type.
+ *
+ * See documentation for the Election type to see more details on the 
election types supported.
+ */
+ElectionResult electLeader() {
+// 1. Check if the election is not PREFERRED and we already have a 
valid leader
+if (election != Election.PREFERRED && 
targetIsr.contains(partition.leader) && 
isAcceptableLeader.apply(partition.leader)) {
+// Don't consider a new leader since the current leader meets all 
the constraints
+return new ElectionResult(partition.leader, false);
+}
 
-BestLeader() {
-for (int replica : targetReplicas) {
-if (targetIsr.contains(replica) && 
isAcceptableLeader.apply(replica)) {
-this.node = replica;
-this.unclean = false;
-return;
-}
-}
-if (uncleanElectionOk.get()) {
-for (int replica : targetReplicas) {
-if (isAcceptableLeader.apply(replica)) {
-this.node = replica;
-this.unclean = true;
-return;
-}
-}
+// 2. Attempt preferred replica election
+int preferredReplica = targetReplicas.get(0);
+if (targetIsr.contains(preferredReplica) && 
isAcceptableLeader.apply(preferredReplica)) {
+return new ElectionResult(preferredReplica, false);
+}
+
+// 3. Preferred replica was not elected, only continue if the current 
leader is not a valid leader
+if (targetIsr.contains(partition.leader) && 
isAcceptableLeader.apply(partition.leader)) {

Review comment:
   Yeah, this code may be difficult to read, but I convinced myself that we 
need it. At this point we know this is true: the preferred replica is either 
unacceptable or not in the ISR.
   
   This check is saying that to minimize leader change we don't need to elect a 
new leader if the current leader is acceptable and in the ISR.
   
   Let me try to write an algorithm that is easier to read and see what we 
think.
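   
   In the meantime, here is a minimal, self-contained sketch of the precedence the new `electLeader` follows on the PREFERRED path (the names mirror the diff, but this is not the actual PartitionChangeBuilder code, and the unclean fallback is simplified):
   
   ```java
   import java.util.List;
   import java.util.Optional;
   import java.util.function.IntPredicate;
   
   // Sketch only: preferred replica first, then the current leader, then any
   // in-sync acceptable replica, then (optionally) an unclean choice.
   final class ElectionSketch {
       static final int NO_LEADER = -1;
   
       static int electLeader(int currentLeader,
                              List<Integer> targetReplicas,
                              List<Integer> targetIsr,
                              IntPredicate isAcceptable,
                              boolean uncleanOk) {
           int preferred = targetReplicas.get(0);
           // 1. The preferred replica wins whenever it is in the ISR and acceptable.
           if (targetIsr.contains(preferred) && isAcceptable.test(preferred)) return preferred;
           // 2. Otherwise keep the current leader if it is still valid, minimizing churn.
           if (targetIsr.contains(currentLeader) && isAcceptable.test(currentLeader)) return currentLeader;
           // 3. Any other in-sync, acceptable replica keeps the partition online.
           Optional<Integer> clean = targetReplicas.stream()
               .skip(1)
               .filter(r -> targetIsr.contains(r) && isAcceptable.test(r))
               .findFirst();
           if (clean.isPresent()) return clean.get();
           // 4. Unclean election as a last resort, if allowed.
           if (uncleanOk) {
               return targetReplicas.stream().filter(isAcceptable::test).findFirst().orElse(NO_LEADER);
           }
           return NO_LEADER;
       }
   }
   ```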




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13752) Using `equals` instead of `==` when comparing Uuid in Java

2022-03-17 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17508389#comment-17508389
 ] 

Ismael Juma commented on KAFKA-13752:
-

What is the impact of this bug?

> Using `equals` instead of `==` when comparing Uuid in Java
> 
>
> Key: KAFKA-13752
> URL: https://issues.apache.org/jira/browse/KAFKA-13752
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Xiaobing Fang
>Priority: Minor
>
> {code:java}
> Uuid.ZERO_UUID == new Uuid(0L, 0L){code}
> is true in Scala, but false in Java.
>  
> So this test runs successfully. Is this the expected behavior?
> {code:java}
> @Test
> public void testTopicIdAndNullTopicNameRequests() {
> // Construct invalid MetadataRequestTopics. We will build each one 
> separately and ensure the error is thrown.
> List<MetadataRequestData.MetadataRequestTopic> topics = Arrays.asList(
> new 
> MetadataRequestData.MetadataRequestTopic().setName("topic").setTopicId(new 
> Uuid(0L, 0L)));
> // if version is 10 or 11, the invalid topic metadata should return an 
> error
> List<Short> invalidVersions = Arrays.asList((short) 10, (short) 11);
> invalidVersions.forEach(version ->
> topics.forEach(topic -> {
> MetadataRequestData metadataRequestData = new 
> MetadataRequestData().setTopics(Collections.singletonList(topic));
> MetadataRequest.Builder builder = new 
> MetadataRequest.Builder(metadataRequestData);
> assertThrows(UnsupportedVersionException.class, () -> 
> builder.build(version));
> })
> );
> }{code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13714) Flaky test IQv2StoreIntegrationTest

2022-03-17 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17508386#comment-17508386
 ] 

John Roesler commented on KAFKA-13714:
--

Another local repro:

 
{code:java}
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest > 
verifyStore[cache=true, log=true, supplier=TIME_ROCKS_KV, kind=PAPI] FAILED
    java.lang.AssertionError: Result:StateQueryResult{partitionResults={

0=SucceededQueryResult{

result=org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueTimestampedIterator@3f702946,
 
executionInfo=[
  Handled in class 
org.apache.kafka.streams.state.internals.RocksDBTimestampedStore in 1153925ns, 
  Handled in class 
org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedKeyValueBytesStore
 via WrappedStateStore in 1165952ns, 
  Handled in class 
org.apache.kafka.streams.state.internals.CachingKeyValueStore in 1181616ns, 
  Handled in class 
org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore with 
serdes org.apache.kafka.streams.state.StateSerdes@278667fd in 1260365ns
], 
position=Position{position={input-topic={0=1, 

1=SucceededQueryResult{

result=org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueTimestampedIterator@42b6d0cc,

executionInfo=[
  Handled in class 
org.apache.kafka.streams.state.internals.RocksDBTimestampedStore in 109311ns, 
  Handled in class 
org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedKeyValueBytesStore
 via WrappedStateStore in 116767ns, 
  Handled in class 
org.apache.kafka.streams.state.internals.CachingKeyValueStore in 128961ns, 
  Handled in class 
org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore with 
serdes org.apache.kafka.streams.state.StateSerdes@684b31de in 185521ns
],
position=Position{position={input-topic={1=1}, 

globalResult=null}
    Expected: is <[1, 2, 3]>
         but: was <[1, 2]>
        at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
        at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQuery(IQv2StoreIntegrationTest.java:1129)
        at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQueries(IQv2StoreIntegrationTest.java:807)
        at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore(IQv2StoreIntegrationTest.java:776)
 {code}
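
For context, the failing assertion comes from a range query issued through the 
new IQv2 API. A minimal sketch of such a query (store name and value types are 
simplified and hypothetical; {{kafkaStreams}} is assumed to be a running 
instance):
{code:java}
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.query.RangeQuery;
import org.apache.kafka.streams.query.StateQueryRequest;
import org.apache.kafka.streams.query.StateQueryResult;
import org.apache.kafka.streams.state.KeyValueIterator;

// Query keys 1..3 from a key-value store named "kv-store" (hypothetical name).
RangeQuery<Integer, Integer> query = RangeQuery.withRange(1, 3);
StateQueryRequest<KeyValueIterator<Integer, Integer>> request =
    StateQueryRequest.inStore("kv-store").withQuery(query);
StateQueryResult<KeyValueIterator<Integer, Integer>> result = kafkaStreams.query(request);
// The flakiness above corresponds to this iterator missing one of the expected keys.
{code}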

> Flaky test IQv2StoreIntegrationTest
> ---
>
> Key: KAFKA-13714
> URL: https://issues.apache.org/jira/browse/KAFKA-13714
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.2.0
>Reporter: John Roesler
>Priority: Blocker
>
> I have observed multiple consistency violations in the 
> IQv2StoreIntegrationTest. Since this is the first release of IQv2, and it's 
> apparently a major flaw in the feature, we should not release with this bug 
> outstanding. Depending on the time-table, we may want to block the release or 
> pull the feature until the next release.
>  
> The first observation I have is from 23 Feb 2022. So far all observations 
> point to the range query in particular, and all observations have been for 
> RocksDB stores, including RocksDBStore, TimestampedRocksDBStore, and the 
> windowed store built on RocksDB segments.
> For reference, range queries were implemented on 16 Feb 2022: 
> [https://github.com/apache/kafka/commit/b38f6ba5cc989702180f5d5f8e55ba20444ea884]
> The window-specific range query test has also failed once that I have seen. 
> That feature was implemented on 2 Jan 2022: 
> [https://github.com/apache/kafka/commit/b8f1cf14c396ab04b8968a8fa04d8cf67dd3254c]
>  
> Here are some stack traces I have seen:
> {code:java}
> verifyStore[cache=true, log=true, supplier=ROCKS_KV, kind=PAPI]
> java.lang.AssertionError: 
> Expected: is <[1, 2, 3]>
>  but: was <[1, 2]>
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
>   at 
> org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQuery(IQv2StoreIntegrationTest.java:1125)
>   at 
> org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQueries(IQv2StoreIntegrationTest.java:803)
>   at 
> org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore(IQv2StoreIntegrationTest.java:776)
>  {code}
> {code:java}
> verifyStore[cache=true, log=true, supplier=TIME_ROCKS_KV, kind=PAPI]
> java.lang.AssertionError: 
> Expected: is <[1, 2, 3]>
>  but: was <[1, 3]>
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
>   at 
> org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQu

[GitHub] [kafka] jsancio commented on pull request #11733: KAFKA-13587; Implement leader recovery for KIP-704

2022-03-17 Thread GitBox


jsancio commented on pull request #11733:
URL: https://github.com/apache/kafka/pull/11733#issuecomment-1071229122


   > The PR looks good overall. I think there is one problem with the fetch 
validation. We are expecting that followers will detect the RECOVERED state 
through a `LeaderAndIsr` request from the controller. However, 
leaders/followers only accept`LeaderAndIsr` requests if there is an epoch bump, 
and that does not happen for `AlterPartition` requests. For KRaft, I think it 
is not a problem.
   
   You are correct. I removed the FETCH request validation looking at the 
leader recovery state and filed this issue: 
https://issues.apache.org/jira/browse/KAFKA-13754
   
   This is okay because at the moment the topic partition leader immediately 
marks the partition as recovered.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-13754) Follower should reject Fetch request while the leader is recovering

2022-03-17 Thread Jose Armando Garcia Sancio (Jira)
Jose Armando Garcia Sancio created KAFKA-13754:
--

 Summary: Follower should reject Fetch request while the leader is 
recovering
 Key: KAFKA-13754
 URL: https://issues.apache.org/jira/browse/KAFKA-13754
 Project: Kafka
  Issue Type: Task
Reporter: Jose Armando Garcia Sancio
Assignee: Jose Armando Garcia Sancio


In the PR for KIP-704 we removed leader recovery state validation from the 
FETCH. This is okay because the leader immediately recovers the partition.

We should enable this validation before implementing log recovery from unclean 
leader election.

The old implementation and test is in this commit: 
https://github.com/apache/kafka/pull/11733/commits/c7e54b8f6cef087deac119d61a46d3586ead72b9



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] C0urante commented on a change in pull request #11903: KAFKA-13497: Add trace logging to RegexRouter

2022-03-17 Thread GitBox


C0urante commented on a change in pull request #11903:
URL: https://github.com/apache/kafka/pull/11903#discussion_r829392255



##
File path: 
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/RegexRouter.java
##
@@ -57,7 +61,10 @@ public R apply(R record) {
 final Matcher matcher = regex.matcher(record.topic());
 if (matcher.matches()) {
 final String topic = matcher.replaceFirst(replacement);
+log.trace("Rerouting from topic '{}' to new topic '{}'", 
record.topic(), topic);

Review comment:
   I don't see a reason to.
   
   If trace-level logging is disabled, the cost of this line will be two method 
invocations (one for `Record::topic`, which has a [trivial 
implementation](https://github.com/apache/kafka/blob/7afdb069bf5539ec404d9305239849ac35ad2d82/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java#L69-L71),
 and one for `Logger::trace`), plus a check to see if trace-level logging is 
enabled. No message formatting will take place.
   
   If we wrap this with `Logger::isTraceEnabled`, we'll reduce that to a single 
method invocation and check to see if trace-level logging is enabled. But if 
trace-level logging is enabled, we'd end up doubling the checks to see if the 
logger is enabled, and add an extra method invocation. There's a good writeup 
on the performance implications of this style of checking 
[here](https://logging.apache.org/log4j/1.2/manual.html#performance).
   
   It seems unlikely that any of this will have a significant impact on 
performance, especially considering that this same code path already contains a 
regex check a few lines above. There's also the existing logging in `Cast` 
that's mentioned in the description that takes place at trace level and doesn't 
have a guard for `Logger::isTraceEnabled`.
   
   Ultimately, if we're concerned about performance here, it'd probably be much 
more effective to cache the mappings of input topic -> output topic than to add 
guards around trace-level log statements that don't involve any string 
concatenation or expensive `toString` calls.
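   
   To make the trade-off concrete, here is a small sketch of the two styles (SLF4J; `record` and `topic` come from the surrounding code, and `expensiveDebugString` is a hypothetical helper):
   
   ```java
   // Parameterized form: no message formatting happens unless TRACE is enabled;
   // when disabled, the cost is the enabled-check inside trace() plus cheap
   // argument evaluation (one trivial getter and a local variable).
   log.trace("Rerouting from topic '{}' to new topic '{}'", record.topic(), topic);
   
   // Guarded form: only worthwhile when computing an argument is itself expensive.
   if (log.isTraceEnabled()) {
       log.trace("Record payload: {}", expensiveDebugString(record)); // hypothetical helper
   }
   ```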




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #11893: KAFKA-13682; KRaft Controller auto preferred leader election

2022-03-17 Thread GitBox


junrao commented on a change in pull request #11893:
URL: https://github.com/apache/kafka/pull/11893#discussion_r829355001



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
##
@@ -104,53 +117,73 @@ public PartitionChangeBuilder 
setTargetAdding(List<Integer> targetAdding) {
 return this;
 }
 
-boolean shouldTryElection() {
-// If the new isr doesn't have the current leader, we need to try to 
elect a new
-// one. Note: this also handles the case where the current leader is 
NO_LEADER,
-// since that value cannot appear in targetIsr.
-if (!targetIsr.contains(partition.leader)) return true;
-
-// Check if we want to try to get away from a non-preferred leader.
-if (alwaysElectPreferredIfPossible && !partition.hasPreferredLeader()) 
return true;
+// VisibleForTesting
+static class ElectionResult {
+final int node;
+final boolean unclean;
 
-return false;
+private ElectionResult(int node, boolean unclean) {
+this.node = node;
+this.unclean = unclean;
+}
 }
 
-class BestLeader {
-final int node;
-final boolean unclean;
+// VisibleForTesting
+/**
+ * Perform leader election based on the partition state and leader 
election type.
+ *
+ * See documentation for the Election type to see more details on the 
election types supported.
+ */
+ElectionResult electLeader() {
+// 1. Check if the election is not PREFERRED and we already have a 
valid leader
+if (election != Election.PREFERRED && 
targetIsr.contains(partition.leader) && 
isAcceptableLeader.apply(partition.leader)) {
+// Don't consider a new leader since the current leader meets all 
the constraints
+return new ElectionResult(partition.leader, false);
+}
 
-BestLeader() {
-for (int replica : targetReplicas) {
-if (targetIsr.contains(replica) && 
isAcceptableLeader.apply(replica)) {
-this.node = replica;
-this.unclean = false;
-return;
-}
-}
-if (uncleanElectionOk.get()) {
-for (int replica : targetReplicas) {
-if (isAcceptableLeader.apply(replica)) {
-this.node = replica;
-this.unclean = true;
-return;
-}
-}
+// 2. Attempt preferred replica election
+int preferredReplica = targetReplicas.get(0);
+if (targetIsr.contains(preferredReplica) && 
isAcceptableLeader.apply(preferredReplica)) {
+return new ElectionResult(preferredReplica, false);
+}
+
+// 3. Preferred replica was not elected, only continue if the current 
leader is not a valid leader
+if (targetIsr.contains(partition.leader) && 
isAcceptableLeader.apply(partition.leader)) {

Review comment:
   Is this step necessary given steps 1 and 2? The election != 
Election.PREFERRED case is covered in step 1, and the other case seems covered 
by step 2.

##
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##
@@ -1197,6 +1266,25 @@ private void resetState() {
  */
 private long newBytesSinceLastSnapshot = 0;
 
+/**
+ * How long to delay partition leader balancing operations.
+ */
+private final OptionalLong leaderImbalanceCheckIntervalNs;
+
+private enum ImbalanceSchedule {
+// The leader balancing operation has been scheduled
+SCHEDULED,
+// If the leader balancing operation should be schedued, schedule it 
with a delay
+DEFERRED,
+// If the leader balancing operation should be schedued, schdule it 
immediately

Review comment:
   typo schedued and schdule

##
File path: 
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##
@@ -953,6 +972,56 @@ private void cancelMaybeFenceReplicas() {
 queue.cancelDeferred(MAYBE_FENCE_REPLICAS);
 }
 
+private static final String MAYBE_BALANCE_PARTITION_LEADERS = 
"maybeBalancePartitionLeaders";
+
+private static final int MAX_ELECTIONS_PER_IMBALANCE = 1_000;
+
+private void maybeScheduleNextBalancePartitionLeaders() {
+if (imbalancedScheduled != ImbalanceSchedule.SCHEDULED &&
+leaderImbalanceCheckIntervalNs.isPresent() &&
+replicationControl.arePartitionLeadersImbalanced()) {
+
+log.debug(
+"Scheduling write event for {} because scheduled ({}), 
checkIntervalNs ({}) and isImbalanced ({})",
+MAYBE_BALANCE_PARTITION_LEADERS,
+imbalancedScheduled,
+leaderImbalanceCheckIntervalNs,
+replicationControl.arePartitionLeadersImbalanced()
+);
+
+

[jira] [Created] (KAFKA-13753) Log cleaner should retain transaction metadata in index until corresponding marker is removed

2022-03-17 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-13753:
---

 Summary: Log cleaner should retain transaction metadata in index until 
corresponding marker is removed
 Key: KAFKA-13753
 URL: https://issues.apache.org/jira/browse/KAFKA-13753
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson
Assignee: Jason Gustafson


Currently the log cleaner will remove aborted transactions from the index as 
soon as it detects that the data from the transaction is gone. It does not wait 
until the corresponding marker has also been removed. Although it is extremely 
unlikely, it seems possible today that a Fetch might fail to return the aborted 
transaction metadata correctly if a log cleaning occurs concurrently. This is 
because the collection of aborted transactions is only done after reading the 
data from the log. It would be safer to preserve the aborted transaction 
metadata in the index until the marker is also removed.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] dajac commented on a change in pull request #11903: KAFKA-13497: Add trace logging to RegexRouter

2022-03-17 Thread GitBox


dajac commented on a change in pull request #11903:
URL: https://github.com/apache/kafka/pull/11903#discussion_r82930



##
File path: 
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/RegexRouter.java
##
@@ -57,7 +61,10 @@ public R apply(R record) {
 final Matcher matcher = regex.matcher(record.topic());
 if (matcher.matches()) {
 final String topic = matcher.replaceFirst(replacement);
+log.trace("Rerouting from topic '{}' to new topic '{}'", 
record.topic(), topic);

Review comment:
   Should we gate it with `log.isTraceEnabled()`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on pull request #11904: MINOR: Fix `ConsumerConfig.ISOLATION_LEVEL_DOC`

2022-03-17 Thread GitBox


dajac commented on pull request #11904:
URL: https://github.com/apache/kafka/pull/11904#issuecomment-1071105004


   @guizmaii Good catch! Why did you target the 3.1 branch? We should fix it in 
trunk and I can backport it to older branches.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-13750) Client Compatibility KafkaTest uses invalid idempotency configs

2022-03-17 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13750?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-13750.
-
Fix Version/s: 3.2.0
   3.1.1
   3.0.2
   Resolution: Fixed

> Client Compatibility KafkaTest uses invalid idempotency configs
> ---
>
> Key: KAFKA-13750
> URL: https://issues.apache.org/jira/browse/KAFKA-13750
> Project: Kafka
>  Issue Type: Bug
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Major
> Fix For: 3.2.0, 3.1.1, 3.0.2
>
>
> With the switch to idempotency as a default, some of our tests broke 
> including 
> ClientCompatibilityFeaturesTest.run_compatibility_test for versions prior to 
> 0.11 where EOS was enabled. We need to configure the producer correctly for 
> these earlier versions.
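
As a sketch of the kind of fix involved (not the actual system test change; 
values are illustrative), the producer can be configured explicitly for 
pre-0.11 brokers:
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Idempotence requires broker support added in 0.11, so disable the new
// default explicitly when testing compatibility against older brokers.
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");
props.put(ProducerConfig.ACKS_CONFIG, "1");
{code}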



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] dajac merged pull request #11909: KAFKA-13750: Client Compatibility KafkaTest uses invalid idempotency configs

2022-03-17 Thread GitBox


dajac merged pull request #11909:
URL: https://github.com/apache/kafka/pull/11909


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on pull request #11909: KAFKA-13750: Client Compatibility KafkaTest uses invalid idempotency configs

2022-03-17 Thread GitBox


dajac commented on pull request #11909:
URL: https://github.com/apache/kafka/pull/11909#issuecomment-1071090738


   We don't need to wait for the build as this is a python change. Merging to 
trunk, 3.1 and 3.0.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on pull request #11906: MINOR: Doc updates for Kafka 3.0.1

2022-03-17 Thread GitBox


dajac commented on pull request #11906:
URL: https://github.com/apache/kafka/pull/11906#issuecomment-1071088775


   @mimaison Thanks. Do we need to update the doc in the 3.0 branch as well? 
Otherwise, we will miss the update that you made if we ever release 3.0.2.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-13509) Support max timestamp in GetOffsetShell

2022-03-17 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13509?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-13509.
-
  Reviewer: David Jacot
Resolution: Fixed

> Support max timestamp in GetOffsetShell
> ---
>
> Key: KAFKA-13509
> URL: https://issues.apache.org/jira/browse/KAFKA-13509
> Project: Kafka
>  Issue Type: Sub-task
>  Components: tools
>Reporter: dengziming
>Assignee: dengziming
>Priority: Major
> Fix For: 3.2.0
>
>
> We would like to list the offset with the max timestamp using `kafka.tools.GetOffsetShell`:
> ```
> bin/kafka-run-class.sh kafka.tools.GetOffsetShell --bootstrap-server 
> localhost:9092 --topic topic1 --time -3
> ```
>  
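
For reference, the same lookup is available programmatically through the Admin 
client (sketch; topic, partition, and bootstrap values are illustrative, and 
the snippet belongs in a method declared to throw Exception):
{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.TopicPartition;

Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
try (Admin admin = Admin.create(props)) {
    TopicPartition tp = new TopicPartition("topic1", 0);
    // OffsetSpec.maxTimestamp() (KIP-734) asks for the offset of the record
    // with the largest timestamp in the partition.
    ListOffsetsResult result =
        admin.listOffsets(Collections.singletonMap(tp, OffsetSpec.maxTimestamp()));
    System.out.println(result.partitionResult(tp).get().offset());
}
{code}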



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] dajac merged pull request #11173: KAFKA-13509: Support max timestamp in GetOffsetShell

2022-03-17 Thread GitBox


dajac merged pull request #11173:
URL: https://github.com/apache/kafka/pull/11173


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante opened a new pull request #11914: MINOR: Correct Connect docs on connector/task states

2022-03-17 Thread GitBox


C0urante opened a new pull request #11914:
URL: https://github.com/apache/kafka/pull/11914


   The `DESTROYED` state is represented internally as a tombstone record when 
running in distributed mode ([1]) and by the removal of the connector/task from 
the in-memory status map when running in standalone mode ([2], [3]). As a 
result, it will never appear to users of the REST API, and we should remove 
mention of it from our docs so that developers creating tooling against the 
REST API don't write unnecessary logic to account for that state.
   
   [1] - 
https://github.com/apache/kafka/blob/3dacdc5694da5db283524889d2270695defebbaa/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java#L318
   
   [2] - 
https://github.com/apache/kafka/blob/3dacdc5694da5db283524889d2270695defebbaa/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryStatusBackingStore.java#L64-L65
   
   [3] - 
https://github.com/apache/kafka/blob/3dacdc5694da5db283524889d2270695defebbaa/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryStatusBackingStore.java#L77-L78
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on pull request #11909: KAFKA-13750: Client Compatibility KafkaTest uses invalid idempotency configs

2022-03-17 Thread GitBox


dajac commented on pull request #11909:
URL: https://github.com/apache/kafka/pull/11909#issuecomment-1071077188


   @jolshan Thanks. Do we need to backport it to older branches? I guess that 
we need it in 3.1 and 3.0, right?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on pull request #11173: KAFKA-13509: Support max timestamp in GetOffsetShell

2022-03-17 Thread GitBox


dajac commented on pull request #11173:
URL: https://github.com/apache/kafka/pull/11173#issuecomment-1071074769


   The build looks good 
[here](https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-11173/18/pipeline).
 I am not sure why GitHub still shows it in progress. It might be due to the 
outage that they had today.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13404) Kafka sink connectors do not commit offset correctly if messages are produced in transaction

2022-03-17 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13404?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17508282#comment-17508282
 ] 

Chris Egerton commented on KAFKA-13404:
---

[~yujhe.li] can you clarify the impact of this behavior? Initially I was 
wondering if this might lead to duplicate record delivery by resuming from an 
earlier offset than where the consumer last read from, but if the only record 
between that offset and where the consumer read from is a control record, it 
won't be delivered to the sink task anyway.

The only other practical consequence I can think of is that the consumer lag 
for the connector may be off by one (see KAFKA-6607 for a similar issue in 
Kafka Streams). Is that the problem here, or is there something else?
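
For anyone trying to confirm the lag observation, here is a sketch of the check 
with a read_committed consumer (topic name, partition, and bootstrap address 
are illustrative):
{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "lag-check");
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringDeserializer");

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    TopicPartition tp = new TopicPartition("test", 0);
    consumer.assign(Collections.singletonList(tp));
    while (!consumer.poll(Duration.ofSeconds(1)).isEmpty()) { }
    // After draining a topic that ends with a transaction marker, position()
    // points past the marker, while the offset committed from the last record
    // is one lower, hence the apparent lag of 1.
    System.out.println("position = " + consumer.position(tp));
    System.out.println("log-end  = " + consumer.endOffsets(Collections.singleton(tp)).get(tp));
}
{code}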

> Kafka sink connectors do not commit offset correctly if messages are produced 
> in transaction
> 
>
> Key: KAFKA-13404
> URL: https://issues.apache.org/jira/browse/KAFKA-13404
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.6.1
>Reporter: Yu-Jhe Li
>Priority: Major
> Attachments: Main.scala
>
>
> The Kafka sink connectors don't commit offsets up to the latest log-end offset if 
> the messages are produced in a transaction.
> From the code of [WorkerSinkTask.java|#L467], we found that the sink 
> connector gets offsets from messages and commits them to Kafka after the 
> messages are processed successfully. But for messages produced in a 
> transaction, there are additional [control 
> batches|http://kafka.apache.org/documentation/#controlbatch] records that 
> indicate whether the transaction was committed or aborted.
>  
> You can reproduce it by running `connect-file-sink` with the following 
> properties:
> {noformat}
> /opt/kafka/bin/connect-standalone.sh /connect-standalone.properties 
> /connect-file-sink.properties{noformat}
> {code:java}
> # connect-standalone.properties
> bootstrap.servers=localhost:9092
> key.converter=org.apache.kafka.connect.storage.StringConverter
> value.converter=org.apache.kafka.connect.storage.StringConverter
> key.converter.schemas.enable=true
> value.converter.schemas.enable=true
> # for testing
> offset.flush.interval.ms=1
> consumer.isolation.level=read_committed
> consumer.auto.offset.reset=none
> {code}
> {code:java}
> # connect-file-sink.properties
> name=local-file-sink
> connector.class=FileStreamSink
> tasks.max=1
> file=/tmp/test.sink.txt
> topics=test{code}
> And use the attached Java producer ([^Main.scala] to produce 10 messages to 
> the `test` topic in a transaction.
> You can see that the topic log-end offset is 11 now and the last record in 
> the segment file is control batches. But the consumer group offset is still 
> in 10. (If the record is deleted by topic retention, you will get 
> OffsetOutOfRange exception after restart the connector)
> {code:java}
> bash-5.1# /opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 
> kafka1:9092 --group connect-local-file-sink --describe 
> GROUP   TOPIC   PARTITION  CURRENT-OFFSET  
> LOG-END-OFFSET  LAG CONSUMER-ID   
> HOSTCLIENT-ID 
>
> connect-local-file-sink test0  10  11 
>  1   
> connector-consumer-local-file-sink-0-10777adb-72c2-4fd3-8773-4f5a0498903d 
> /172.21.0.3 connector-consumer-local-file-sink-0
> bash-5.1# /opt/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments 
> --files /kafka/test-0/.log --print-data-log
> Dumping /kafka/test-0/.log
> Starting offset: 0
> baseOffset: 0 lastOffset: 9 count: 10 baseSequence: 0 lastSequence: 9 
> producerId: 4000 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: 
> true isControl: false position: 0 CreateTime: 1634805907230 size: 208 magic: 
> 2 compresscodec: GZIP crc: 2170304005 isvalid: true
> | offset: 0 CreateTime: 1634805907210 keysize: -1 valuesize: 39 sequence: 0 
> headerKeys: [] payload: {"value": "banana", "time": 1634805907}
> | offset: 1 CreateTime: 1634805907230 keysize: -1 valuesize: 39 sequence: 1 
> headerKeys: [] payload: {"value": "banana", "time": 1634805907}
> | offset: 2 CreateTime: 1634805907230 keysize: -1 valuesize: 36 sequence: 2 
> headerKeys: [] payload: {"value": "ice", "time": 1634805907}
> | offset: 3 CreateTime: 1634805907230 keysize: -1 valuesize: 38 sequence: 3 
> headerKeys: [] payload: {"value": "apple", "time": 1634805907}
> | offset: 4 CreateTime: 1634805907230 keysize: -1 valuesize: 37 sequence: 4 
> headerKeys: [] payload: {"value": "home", "time": 1634805907}
> | offset: 5 Cre

[GitHub] [kafka] mimaison merged pull request #11471: MINOR: Replace EasyMock with Mockito in connect:file

2022-03-17 Thread GitBox


mimaison merged pull request #11471:
URL: https://github.com/apache/kafka/pull/11471


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-13136) kafka-connect task.max : active task in consumer group is limited by the bigger topic to consume

2022-03-17 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-13136.
---
Resolution: Fixed

> kafka-connect task.max : active task in consumer group is limited by the 
> bigger topic to consume
> 
>
> Key: KAFKA-13136
> URL: https://issues.apache.org/jira/browse/KAFKA-13136
> Project: Kafka
>  Issue Type: Bug
>Reporter: raphaelauv
>Priority: Major
>
> In kafka-connect 2.7
> *The maximum number of active tasks for a sink connector is equal to the 
> number of partitions of the topic with the most partitions to consume*
> An active task is a task with partitions assigned in the consumer group of 
> the sink connector.
> Example:
> With 2 topics that each have 10 partitions (20 partitions in total), 
> the maximum number of active tasks is 10 (if I set task.max to 12, there are 
> 10 members of the consumer group consuming partitions and 2 members in the 
> consumer group that do not have partitions to consume).
> If I add a third topic with 15 partitions to the connector config, then the 
> 12 members of the consumer group are consuming partitions, and if I then set 
> task.max to 17, only 15 members are active in the consumer group.
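
The described ceiling matches how the default range assignor works: partitions 
are distributed per topic rather than across the union of all partitions, so a 
member can only be active if at least one topic has a partition left for it. A 
minimal sketch of that per-topic math (illustrative, not the actual 
RangeAssignor code):
{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class RangeAssignSketch {
    // For each topic independently, split its partitions among all members;
    // members beyond a topic's partition count receive nothing from that topic.
    static Map<Integer, List<String>> assign(int numConsumers, Map<String, Integer> partitionsByTopic) {
        Map<Integer, List<String>> byConsumer = new HashMap<>();
        for (Map.Entry<String, Integer> topic : partitionsByTopic.entrySet()) {
            int parts = topic.getValue();
            int per = parts / numConsumers;   // 0 when there are more consumers than partitions
            int extra = parts % numConsumers; // the first `extra` consumers get one more
            int next = 0;
            for (int c = 0; c < numConsumers; c++) {
                int count = per + (c < extra ? 1 : 0);
                for (int i = 0; i < count; i++) {
                    byConsumer.computeIfAbsent(c, k -> new ArrayList<>())
                              .add(topic.getKey() + "-" + next++);
                }
            }
        }
        // With two 10-partition topics and 12 consumers, members 10 and 11 stay empty.
        return byConsumer;
    }
}
{code}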



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13752) Using `equals` instead of `==` when comparing Uuid in Java

2022-03-17 Thread Xiaobing Fang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17508270#comment-17508270
 ] 

Xiaobing Fang commented on KAFKA-13752:
---

[~jolshan] I have done it in the PR, PTAL.

> Using `equals` instead of `==` when comparing Uuid in Java
> 
>
> Key: KAFKA-13752
> URL: https://issues.apache.org/jira/browse/KAFKA-13752
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Xiaobing Fang
>Priority: Minor
>
> {code:java}
> Uuid.ZERO_UUID == new Uuid(0L, 0L){code}
> is true in Scala, but false in Java.
>  
> So this test runs successfully. Is this the expected behavior?
> {code:java}
> @Test
> public void testTopicIdAndNullTopicNameRequests() {
> // Construct invalid MetadataRequestTopics. We will build each one 
> separately and ensure the error is thrown.
> List<MetadataRequestData.MetadataRequestTopic> topics = Arrays.asList(
> new 
> MetadataRequestData.MetadataRequestTopic().setName("topic").setTopicId(new 
> Uuid(0L, 0L)));
> // if version is 10 or 11, the invalid topic metadata should return an 
> error
> List<Short> invalidVersions = Arrays.asList((short) 10, (short) 11);
> invalidVersions.forEach(version ->
> topics.forEach(topic -> {
> MetadataRequestData metadataRequestData = new 
> MetadataRequestData().setTopics(Collections.singletonList(topic));
> MetadataRequest.Builder builder = new 
> MetadataRequest.Builder(metadataRequestData);
> assertThrows(UnsupportedVersionException.class, () -> 
> builder.build(version));
> })
> );
> }{code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] fxbing commented on pull request #11912: KAFKA-13752: Uuid compare using equals in java

2022-03-17 Thread GitBox


fxbing commented on pull request #11912:
URL: https://github.com/apache/kafka/pull/11912#issuecomment-1071008981


   > 
   
   done, PTAL. @dengziming 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-13752) Using `equals` instead of `==` when comparing Uuid in Java

2022-03-17 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17508255#comment-17508255
 ] 

Justine Olshan edited comment on KAFKA-13752 at 3/17/22, 3:49 PM:
--

Ah, so I think the line that is causing an issue is:
{code:java}
if (topic.topicId() != Uuid.ZERO_UUID && version < 12)
throw new UnsupportedVersionException("MetadataRequest version " + version +
" does not support non-zero topic IDs.");{code}
We should change this to !equals.


was (Author: jolshan):
Ah, so I think the line that is causing an issue is:
```

if (topic.topicId() != Uuid.ZERO_UUID && version < 12)
throw new UnsupportedVersionException("MetadataRequest version " + version +
" does not support non-zero topic IDs.");
```

We should change this to !equals.

> Using `equals` instead of `==` when comparing Uuid in Java
> 
>
> Key: KAFKA-13752
> URL: https://issues.apache.org/jira/browse/KAFKA-13752
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Xiaobing Fang
>Priority: Minor
>
> {code:java}
> Uuid.ZERO_UUID == new Uuid(0L, 0L){code}
> is true in Scala, but false in Java.
>  
> So this test runs successfully. Is this the expected behavior?
> {code:java}
> @Test
> public void testTopicIdAndNullTopicNameRequests() {
> // Construct invalid MetadataRequestTopics. We will build each one 
> separately and ensure the error is thrown.
> List<MetadataRequestData.MetadataRequestTopic> topics = Arrays.asList(
> new 
> MetadataRequestData.MetadataRequestTopic().setName("topic").setTopicId(new 
> Uuid(0L, 0L)));
> // if version is 10 or 11, the invalid topic metadata should return an 
> error
> List<Short> invalidVersions = Arrays.asList((short) 10, (short) 11);
> invalidVersions.forEach(version ->
> topics.forEach(topic -> {
> MetadataRequestData metadataRequestData = new 
> MetadataRequestData().setTopics(Collections.singletonList(topic));
> MetadataRequest.Builder builder = new 
> MetadataRequest.Builder(metadataRequestData);
> assertThrows(UnsupportedVersionException.class, () -> 
> builder.build(version));
> })
> );
> }{code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-9366) Upgrade log4j to log4j2

2022-03-17 Thread Ron Craig (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17508256#comment-17508256
 ] 

Ron Craig commented on KAFKA-9366:
--

Perfect!  Thanks, [~dongjin] .

> Upgrade log4j to log4j2
> ---
>
> Key: KAFKA-9366
> URL: https://issues.apache.org/jira/browse/KAFKA-9366
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.2.0, 2.1.1, 2.3.0, 2.4.0
>Reporter: leibo
>Assignee: Dongjin Lee
>Priority: Critical
>  Labels: needs-kip
> Fix For: 3.2.0
>
>
> h2. CVE-2019-17571 Detail
> Included in Log4j 1.2 is a SocketServer class that is vulnerable to 
> deserialization of untrusted data which can be exploited to remotely execute 
> arbitrary code when combined with a deserialization gadget when listening to 
> untrusted network traffic for log data. This affects Log4j versions up to 1.2 
> up to 1.2.17.
>  
> [https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2019-17571]
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13752) Using `equals` instead of `==` when comparing Uuid in Java

2022-03-17 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17508255#comment-17508255
 ] 

Justine Olshan commented on KAFKA-13752:


Ah, so I think the line that is causing an issue is:
```

if (topic.topicId() != Uuid.ZERO_UUID && version < 12)
throw new UnsupportedVersionException("MetadataRequest version " + version +
" does not support non-zero topic IDs.");
```

We should change this to !equals.
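
A minimal demonstration of the difference between reference and value equality 
for Uuid:
{code:java}
import org.apache.kafka.common.Uuid;

public class UuidCompareDemo {
    public static void main(String[] args) {
        Uuid zero = new Uuid(0L, 0L);
        // Reference comparison: false, because `zero` is a distinct instance.
        System.out.println(Uuid.ZERO_UUID == zero);
        // Value comparison: true, which is what the version check intends.
        System.out.println(Uuid.ZERO_UUID.equals(zero));
    }
}
{code}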

> Using `equals` instead of `==` when comparing Uuid in Java
> 
>
> Key: KAFKA-13752
> URL: https://issues.apache.org/jira/browse/KAFKA-13752
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Xiaobing Fang
>Priority: Minor
>
> {code:java}
> Uuid.ZERO_UUID == new Uuid(0L, 0L){code}
> is true in Scala, but false in Java.
>  
> So this test runs successfully. Is this the expected behavior?
> {code:java}
> @Test
> public void testTopicIdAndNullTopicNameRequests() {
> // Construct invalid MetadataRequestTopics. We will build each one 
> separately and ensure the error is thrown.
> List<MetadataRequestData.MetadataRequestTopic> topics = Arrays.asList(
> new 
> MetadataRequestData.MetadataRequestTopic().setName("topic").setTopicId(new 
> Uuid(0L, 0L)));
> // if version is 10 or 11, the invalid topic metadata should return an 
> error
> List<Short> invalidVersions = Arrays.asList((short) 10, (short) 11);
> invalidVersions.forEach(version ->
> topics.forEach(topic -> {
> MetadataRequestData metadataRequestData = new 
> MetadataRequestData().setTopics(Collections.singletonList(topic));
> MetadataRequest.Builder builder = new 
> MetadataRequest.Builder(metadataRequestData);
> assertThrows(UnsupportedVersionException.class, () -> 
> builder.build(version));
> })
> );
> }{code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] jolshan commented on pull request #11909: KAFKA-13750: Client Compatibility KafkaTest uses invalid idempotency configs

2022-03-17 Thread GitBox


jolshan commented on pull request #11909:
URL: https://github.com/apache/kafka/pull/11909#issuecomment-1070978591


   Here is a link to a successful run:
   
   
http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2022-03-17--001.system-test-kafka-branch-builder--1647476641--jolshan--KAFKA-13750--4447ebdb4c/report.html


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9366) Upgrade log4j to log4j2

2022-03-17 Thread Dongjin Lee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17508199#comment-17508199
 ] 

Dongjin Lee commented on KAFKA-9366:


[~roncraig] Sorry for being late. The PR is now updated with log4j2 2.17.2.

> Upgrade log4j to log4j2
> ---
>
> Key: KAFKA-9366
> URL: https://issues.apache.org/jira/browse/KAFKA-9366
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.2.0, 2.1.1, 2.3.0, 2.4.0
>Reporter: leibo
>Assignee: Dongjin Lee
>Priority: Critical
>  Labels: needs-kip
> Fix For: 3.2.0
>
>
> h2. CVE-2019-17571 Detail
> Included in Log4j 1.2 is a SocketServer class that is vulnerable to 
> deserialization of untrusted data which can be exploited to remotely execute 
> arbitrary code when combined with a deserialization gadget when listening to 
> untrusted network traffic for log data. This affects Log4j versions up to 1.2 
> up to 1.2.17.
>  
> [https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2019-17571]
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] dengziming commented on pull request #11912: KAFKA-13752: Uuid compare using equals in java

2022-03-17 Thread GitBox


dengziming commented on pull request #11912:
URL: https://github.com/apache/kafka/pull/11912#issuecomment-1070947835


   Thank you for this PR, can you also add a test case in 
`MetadataRequestTest.testTopicIdAndNullTopicNameRequests`?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming commented on pull request #11913: MINOR: Remove scala KafkaException

2022-03-17 Thread GitBox


dengziming commented on pull request #11913:
URL: https://github.com/apache/kafka/pull/11913#issuecomment-1070870414


   We can safely remove this class since we have removed 
kafka.security.auth.Authorizer in #10450. Pinging @ijuma to have a look.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming opened a new pull request #11913: MINOR: Remove scala KafkaException

2022-03-17 Thread GitBox


dengziming opened a new pull request #11913:
URL: https://github.com/apache/kafka/pull/11913


   *More detailed description of your change*
   We use org.apache.kafka.common.KafkaException instead of 
kafka.common.KafkaException
   
   *Summary of testing strategy (including rationale)*
   QA
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9542) ZSTD Compression Not Working

2022-03-17 Thread Olumide Ajiboye (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17508163#comment-17508163
 ] 

Olumide Ajiboye commented on KAFKA-9542:


Hi,

I just upgraded to `quay.io/strimzi/kafka:0.28.0-kafka-3.1.0` from 
`quay.io/strimzi/kafka:0.27.1-kafka-3.0.0` and I am observing this same issue.

Other compression formats are okay except zstd.

Best regards

> ZSTD Compression Not Working
> 
>
> Key: KAFKA-9542
> URL: https://issues.apache.org/jira/browse/KAFKA-9542
> Project: Kafka
>  Issue Type: Bug
>  Components: compression
>Affects Versions: 2.3.0
> Environment: Linux, CentOS
>Reporter: Prashant
>Priority: Critical
>
> I enabled zstd compression at producer by adding  "compression.type=zstd" in 
> producer config. When try to run it, producer fails with 
> "org.apache.kafka.common.errors.UnknownServerException: The server 
> experienced an unexpected error when processing the request"
> In Broker Logs, I could find following exception:
>  
> [2020-02-12 11:48:04,623] ERROR [ReplicaManager broker=1] Error processing 
> append operation on partition load_logPlPts-6 (kafka.server.ReplicaManager)
> org.apache.kafka.common.KafkaException: java.lang.NoClassDefFoundError: Could 
> not initialize class 
> org.apache.kafka.common.record.CompressionType$ZstdConstructors
>        at 
> org.apache.kafka.common.record.CompressionType$5.wrapForInput(CompressionType.java:133)
>        at 
> org.apache.kafka.common.record.DefaultRecordBatch.compressedIterator(DefaultRecordBatch.java:257)
>        at 
> org.apache.kafka.common.record.DefaultRecordBatch.iterator(DefaultRecordBatch.java:324)
>        at 
> scala.collection.convert.Wrappers$JIterableWrapper.iterator(Wrappers.scala:54)
>        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>        at 
> kafka.log.LogValidator$$anonfun$validateMessagesAndAssignOffsetsCompressed$1.apply(LogValidator.scala:269)
>        at 
> kafka.log.LogValidator$$anonfun$validateMessagesAndAssignOffsetsCompressed$1.apply(LogValidator.scala:261)
>        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>        at 
> kafka.log.LogValidator$.validateMessagesAndAssignOffsetsCompressed(LogValidator.scala:261)
>        at 
> kafka.log.LogValidator$.validateMessagesAndAssignOffsets(LogValidator.scala:72)
>        at kafka.log.Log$$anonfun$append$2.liftedTree1$1(Log.scala:869)
>        at kafka.log.Log$$anonfun$append$2.apply(Log.scala:868)
>        at kafka.log.Log$$anonfun$append$2.apply(Log.scala:850)
>        at kafka.log.Log.maybeHandleIOException(Log.scala:2065)
>        at kafka.log.Log.append(Log.scala:850)
>        at kafka.log.Log.appendAsLeader(Log.scala:819)
>        at kafka.cluster.Partition$$anonfun$14.apply(Partition.scala:771)
>        at kafka.cluster.Partition$$anonfun$14.apply(Partition.scala:759)
>        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:253) 
>  
> This is a fresh broker installed on "CentOS Linux" v7. This doesn't seem to be 
> a classpath issue, as the same package works on MacOS. 
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
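
For anyone trying to reproduce the report, a minimal producer sketch (the broker address and topic name are assumptions, not from the ticket); the only non-default setting is the compression.type from the reporter's config:
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ZstdProduceSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd"); // the setting from the report
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Triggers broker-side decompression during log validation, the code
            // path that fails with NoClassDefFoundError in the stack trace above.
            producer.send(new ProducerRecord<>("test-topic", "key", "value")); // topic is an assumption
        }
    }
}
{code}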


[GitHub] [kafka] dengziming commented on pull request #11173: KAFKA-13509: Support max timestamp in GetOffsetShell

2022-03-17 Thread GitBox


dengziming commented on pull request #11173:
URL: https://github.com/apache/kafka/pull/11173#issuecomment-1070856842


   Thank you @dajac for your efforts. I reworded the error message according to 
your suggestions, PTAL.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming commented on a change in pull request #11173: KAFKA-13509: Support max timestamp in GetOffsetShell

2022-03-17 Thread GitBox


dengziming commented on a change in pull request #11173:
URL: https://github.com/apache/kafka/pull/11173#discussion_r829057799



##
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##
@@ -103,59 +104,77 @@ object GetOffsetShell {
   throw new IllegalArgumentException("--topic-partitions cannot be used 
with --topic or --partitions")
 }
 
-val listOffsetsTimestamp = options.valueOf(timeOpt).longValue
+val offsetSpec = parseOffsetSpec(options.valueOf(timeOpt))
 
 val topicPartitionFilter = if (options.has(topicPartitionsOpt)) {
-  
createTopicPartitionFilterWithPatternList(options.valueOf(topicPartitionsOpt), 
excludeInternalTopics)
+  
createTopicPartitionFilterWithPatternList(options.valueOf(topicPartitionsOpt))
 } else {
-  val partitionIdsRequested = 
createPartitionSet(options.valueOf(partitionsOpt))
-
   createTopicPartitionFilterWithTopicAndPartitionPattern(
 if (options.has(topicOpt)) Some(options.valueOf(topicOpt)) else None,
-excludeInternalTopics,
-partitionIdsRequested
+options.valueOf(partitionsOpt)
   )
 }
 
 val config = if (options.has(commandConfigOpt))
   Utils.loadProps(options.valueOf(commandConfigOpt))
 else
   new Properties
-config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
-config.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId)
-val consumer = new KafkaConsumer(config, new ByteArrayDeserializer, new 
ByteArrayDeserializer)
+config.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+config.setProperty(AdminClientConfig.CLIENT_ID_CONFIG, clientId)
+val adminClient = Admin.create(config)
 
 try {
-  val partitionInfos = listPartitionInfos(consumer, topicPartitionFilter)
+  val partitionInfos = listPartitionInfos(adminClient, 
topicPartitionFilter, excludeInternalTopics)
 
   if (partitionInfos.isEmpty) {
 throw new IllegalArgumentException("Could not match any 
topic-partitions with the specified filters")
   }
 
-  val topicPartitions = partitionInfos.flatMap { p =>
-if (p.leader == null) {
-  System.err.println(s"Error: topic-partition 
${p.topic}:${p.partition} does not have a leader. Skip getting offsets")
-  None
-} else
-  Some(new TopicPartition(p.topic, p.partition))
-  }
+  val timestampsToSearch = partitionInfos.map(tp => tp -> 
offsetSpec).toMap.asJava
 
-  /* Note that the value of the map can be null */
-  val partitionOffsets: collection.Map[TopicPartition, java.lang.Long] = 
listOffsetsTimestamp match {
-case ListOffsetsRequest.EARLIEST_TIMESTAMP => 
consumer.beginningOffsets(topicPartitions.asJava).asScala
-case ListOffsetsRequest.LATEST_TIMESTAMP => 
consumer.endOffsets(topicPartitions.asJava).asScala
-case _ =>
-  val timestampsToSearch = topicPartitions.map(tp => tp -> 
(listOffsetsTimestamp: java.lang.Long)).toMap.asJava
-  consumer.offsetsForTimes(timestampsToSearch).asScala.map { case (k, 
x) =>
-if (x == null) (k, null) else (k, x.offset: java.lang.Long)
-  }
+  val listOffsetsResult = adminClient.listOffsets(timestampsToSearch)
+  val partitionOffsets = partitionInfos.flatMap { tp =>
+try {
+  val partitionInfo = listOffsetsResult.partitionResult(tp).get
+  Some((tp, partitionInfo.offset))
+} catch {
+  case e: ExecutionException =>
+e.getCause match {
+  case _: LeaderNotAvailableException =>
+System.err.println(s"Skip getting offsets for: topic-partition 
${tp.topic}:${tp.partition} since it does not have a leader right now.")
+  case _ =>
+System.err.println(s"Error while getting offset for 
topic-partition ${tp.topic}:${tp.partition}")
+e.printStackTrace()

Review comment:
   Yeah, for AdminClient, LeaderNotAvailableException is the same as any other 
KafkaException, so the two branches can be merged.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
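
For context, the change above replaces the consumer with the Admin API; a minimal standalone sketch of the same listOffsets call (broker address and topic are assumptions), using the OffsetSpec.maxTimestamp() spec that this PR wires into GetOffsetShell:
{code:java}
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.TopicPartition;

public class MaxTimestampOffsetSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption
        try (Admin admin = Admin.create(props)) {
            TopicPartition tp = new TopicPartition("topic1", 0); // assumption
            // Ask for the offset of the record with the largest timestamp.
            Map<TopicPartition, OffsetSpec> query =
                Collections.singletonMap(tp, OffsetSpec.maxTimestamp());
            ListOffsetsResult.ListOffsetsResultInfo info =
                admin.listOffsets(query).partitionResult(tp).get();
            System.out.println(tp + " -> offset=" + info.offset() + ", timestamp=" + info.timestamp());
        }
    }
}
{code}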




[jira] [Updated] (KAFKA-13752) Using `equals` instead of `==` when Uuid compare in Java

2022-03-17 Thread Xiaobing Fang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13752?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiaobing Fang updated KAFKA-13752:
--
Description: 
{code:java}
Uuid.ZERO_UUID == new Uuid(0L, 0L){code}
is true in Scala, but false in Java.

 

So this test runs successfully. Is this the expected behavior?
{code:java}
@Test
public void testTopicIdAndNullTopicNameRequests() {
// Construct invalid MetadataRequestTopics. We will build each one 
separately and ensure the error is thrown.
List<MetadataRequestData.MetadataRequestTopic> topics = Arrays.asList(
new 
MetadataRequestData.MetadataRequestTopic().setName("topic").setTopicId(new 
Uuid(0L, 0L)));

// if version is 10 or 11, the invalid topic metadata should return an error
List<Short> invalidVersions = Arrays.asList((short) 10, (short) 11);
invalidVersions.forEach(version ->
topics.forEach(topic -> {
MetadataRequestData metadataRequestData = new 
MetadataRequestData().setTopics(Collections.singletonList(topic));
MetadataRequest.Builder builder = new 
MetadataRequest.Builder(metadataRequestData);
assertThrows(UnsupportedVersionException.class, () -> 
builder.build(version));
})
);
}{code}

  was:
{code:java}
Uuid.ZERO_UUID == new Uuid(0L, 0L){code}
is true in scala, but in java is false.

 

So this test run sccessfully. Is this the expected situation??
{code:java}
@Test
public void testTopicIdAndNullTopicNameRequests() {
// Construct invalid MetadataRequestTopics. We will build each one 
separately and ensure the error is thrown.
List topics = Arrays.asList(
new 
MetadataRequestData.MetadataRequestTopic().setName(null).setTopicId(new 
Uuid(0L, 0L)));

// if version is 10 or 11, the invalid topic metadata should return an error
List invalidVersions = Arrays.asList((short) 10, (short) 11);
invalidVersions.forEach(version ->
topics.forEach(topic -> {
MetadataRequestData metadataRequestData = new 
MetadataRequestData().setTopics(Collections.singletonList(topic));
MetadataRequest.Builder builder = new 
MetadataRequest.Builder(metadataRequestData);
assertThrows(UnsupportedVersionException.class, () -> 
builder.build(version));
})
);
}{code}


> Using `equals` instead of `==` when Uuid compare in Java
> 
>
> Key: KAFKA-13752
> URL: https://issues.apache.org/jira/browse/KAFKA-13752
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Xiaobing Fang
>Priority: Minor
>
> {code:java}
> Uuid.ZERO_UUID == new Uuid(0L, 0L){code}
> is true in Scala, but false in Java.
>  
> So this test runs successfully. Is this the expected behavior?
> {code:java}
> @Test
> public void testTopicIdAndNullTopicNameRequests() {
> // Construct invalid MetadataRequestTopics. We will build each one 
> separately and ensure the error is thrown.
> List<MetadataRequestData.MetadataRequestTopic> topics = Arrays.asList(
> new 
> MetadataRequestData.MetadataRequestTopic().setName("topic").setTopicId(new 
> Uuid(0L, 0L)));
> // if version is 10 or 11, the invalid topic metadata should return an 
> error
> List<Short> invalidVersions = Arrays.asList((short) 10, (short) 11);
> invalidVersions.forEach(version ->
> topics.forEach(topic -> {
> MetadataRequestData metadataRequestData = new 
> MetadataRequestData().setTopics(Collections.singletonList(topic));
> MetadataRequest.Builder builder = new 
> MetadataRequest.Builder(metadataRequestData);
> assertThrows(UnsupportedVersionException.class, () -> 
> builder.build(version));
> })
> );
> }{code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] fxbing opened a new pull request #11912: KAFKA-13752: Uuid compare using equals in java

2022-03-17 Thread GitBox


fxbing opened a new pull request #11912:
URL: https://github.com/apache/kafka/pull/11912


   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13752) Using `equals` instead of `==` when Uuid compare in Java

2022-03-17 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17508157#comment-17508157
 ] 

Ismael Juma commented on KAFKA-13752:
-

cc [~jolshan] 

> Using `equals` instead of `==` when Uuid compare in Java
> 
>
> Key: KAFKA-13752
> URL: https://issues.apache.org/jira/browse/KAFKA-13752
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Xiaobing Fang
>Priority: Minor
>
> {code:java}
> Uuid.ZERO_UUID == new Uuid(0L, 0L){code}
> is true in Scala, but false in Java.
>  
> So this test runs successfully. Is this the expected behavior?
> {code:java}
> @Test
> public void testTopicIdAndNullTopicNameRequests() {
> // Construct invalid MetadataRequestTopics. We will build each one 
> separately and ensure the error is thrown.
> List<MetadataRequestData.MetadataRequestTopic> topics = Arrays.asList(
> new 
> MetadataRequestData.MetadataRequestTopic().setName(null).setTopicId(new 
> Uuid(0L, 0L)));
> // if version is 10 or 11, the invalid topic metadata should return an 
> error
> List<Short> invalidVersions = Arrays.asList((short) 10, (short) 11);
> invalidVersions.forEach(version ->
> topics.forEach(topic -> {
> MetadataRequestData metadataRequestData = new 
> MetadataRequestData().setTopics(Collections.singletonList(topic));
> MetadataRequest.Builder builder = new 
> MetadataRequest.Builder(metadataRequestData);
> assertThrows(UnsupportedVersionException.class, () -> 
> builder.build(version));
> })
> );
> }{code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13752) Using `equals` instead of `==` when Uuid compare in Java

2022-03-17 Thread Xiaobing Fang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13752?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiaobing Fang updated KAFKA-13752:
--
Description: 
{code:java}
Uuid.ZERO_UUID == new Uuid(0L, 0L){code}
is true in Scala, but false in Java.

 

So this test runs successfully. Is this the expected behavior?
{code:java}
@Test
public void testTopicIdAndNullTopicNameRequests() {
// Construct invalid MetadataRequestTopics. We will build each one 
separately and ensure the error is thrown.
List<MetadataRequestData.MetadataRequestTopic> topics = Arrays.asList(
new 
MetadataRequestData.MetadataRequestTopic().setName(null).setTopicId(new 
Uuid(0L, 0L)));

// if version is 10 or 11, the invalid topic metadata should return an error
List<Short> invalidVersions = Arrays.asList((short) 10, (short) 11);
invalidVersions.forEach(version ->
topics.forEach(topic -> {
MetadataRequestData metadataRequestData = new 
MetadataRequestData().setTopics(Collections.singletonList(topic));
MetadataRequest.Builder builder = new 
MetadataRequest.Builder(metadataRequestData);
assertThrows(UnsupportedVersionException.class, () -> 
builder.build(version));
})
);
}{code}

  was:
`Uuid.ZERO_UUID == new Uuid(0L, 0L)` is true in scala, but in java is false.

 

So this test run sccessfully. Is this the expected situation??

```

@Test
public void testTopicIdAndNullTopicNameRequests() {
// Construct invalid MetadataRequestTopics. We will build each one separately 
and ensure the error is thrown.
List topics = Arrays.asList(
new MetadataRequestData.MetadataRequestTopic().setName(null).setTopicId(new 
Uuid(0L, 0L)));

// if version is 10 or 11, the invalid topic metadata should return an error
List invalidVersions = Arrays.asList((short) 10, (short) 11);
invalidVersions.forEach(version ->
topics.forEach(topic ->

{ MetadataRequestData metadataRequestData = new 
MetadataRequestData().setTopics(Collections.singletonList(topic)); 
MetadataRequest.Builder builder = new 
MetadataRequest.Builder(metadataRequestData); 
assertThrows(UnsupportedVersionException.class, () -> builder.build(version)); }

)
);
}

```


> Using `equals` instead of `==` when Uuid compare in Java
> 
>
> Key: KAFKA-13752
> URL: https://issues.apache.org/jira/browse/KAFKA-13752
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Xiaobing Fang
>Priority: Minor
>
> {code:java}
> Uuid.ZERO_UUID == new Uuid(0L, 0L){code}
> is true in Scala, but false in Java.
>  
> So this test runs successfully. Is this the expected behavior?
> {code:java}
> @Test
> public void testTopicIdAndNullTopicNameRequests() {
> // Construct invalid MetadataRequestTopics. We will build each one 
> separately and ensure the error is thrown.
> List<MetadataRequestData.MetadataRequestTopic> topics = Arrays.asList(
> new 
> MetadataRequestData.MetadataRequestTopic().setName(null).setTopicId(new 
> Uuid(0L, 0L)));
> // if version is 10 or 11, the invalid topic metadata should return an 
> error
> List<Short> invalidVersions = Arrays.asList((short) 10, (short) 11);
> invalidVersions.forEach(version ->
> topics.forEach(topic -> {
> MetadataRequestData metadataRequestData = new 
> MetadataRequestData().setTopics(Collections.singletonList(topic));
> MetadataRequest.Builder builder = new 
> MetadataRequest.Builder(metadataRequestData);
> assertThrows(UnsupportedVersionException.class, () -> 
> builder.build(version));
> })
> );
> }{code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13752) Using `equals` instead of `==` when Uuid compare in Java

2022-03-17 Thread Xiaobing Fang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13752?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiaobing Fang updated KAFKA-13752:
--
Description: 
`Uuid.ZERO_UUID == new Uuid(0L, 0L)` is true in Scala, but false in Java.

 

So this test runs successfully. Is this the expected behavior?

```
@Test
public void testTopicIdAndNullTopicNameRequests() {
// Construct invalid MetadataRequestTopics. We will build each one separately and ensure the error is thrown.
List<MetadataRequestData.MetadataRequestTopic> topics = Arrays.asList(
new MetadataRequestData.MetadataRequestTopic().setName(null).setTopicId(new Uuid(0L, 0L)));

// if version is 10 or 11, the invalid topic metadata should return an error
List<Short> invalidVersions = Arrays.asList((short) 10, (short) 11);
invalidVersions.forEach(version ->
topics.forEach(topic -> {
MetadataRequestData metadataRequestData = new MetadataRequestData().setTopics(Collections.singletonList(topic));
MetadataRequest.Builder builder = new MetadataRequest.Builder(metadataRequestData);
assertThrows(UnsupportedVersionException.class, () -> builder.build(version));
})
);
}
```

  was:
`Uuid.ZERO_UUID == new Uuid(0L, 0L)` is true in scala, but in java is false.

 

So this test run sccessfully. Is it excepted ?

```

@Test
public void testTopicIdAndNullTopicNameRequests() {
// Construct invalid MetadataRequestTopics. We will build each one separately 
and ensure the error is thrown.
List topics = Arrays.asList(
new MetadataRequestData.MetadataRequestTopic().setName(null).setTopicId(new 
Uuid(0L, 0L)));

// if version is 10 or 11, the invalid topic metadata should return an error
List invalidVersions = Arrays.asList((short) 10, (short) 11);
invalidVersions.forEach(version ->
topics.forEach(topic -> {
MetadataRequestData metadataRequestData = new 
MetadataRequestData().setTopics(Collections.singletonList(topic));
MetadataRequest.Builder builder = new 
MetadataRequest.Builder(metadataRequestData);
assertThrows(UnsupportedVersionException.class, () -> builder.build(version));
})
);
}


> Using `equals` instead of `==` when Uuid compare in Java
> 
>
> Key: KAFKA-13752
> URL: https://issues.apache.org/jira/browse/KAFKA-13752
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Xiaobing Fang
>Priority: Minor
>
> `Uuid.ZERO_UUID == new Uuid(0L, 0L)` is true in Scala, but false in Java.
>  
> So this test runs successfully. Is this the expected behavior?
> ```
> @Test
> public void testTopicIdAndNullTopicNameRequests() {
> // Construct invalid MetadataRequestTopics. We will build each one separately and ensure the error is thrown.
> List<MetadataRequestData.MetadataRequestTopic> topics = Arrays.asList(
> new MetadataRequestData.MetadataRequestTopic().setName(null).setTopicId(new Uuid(0L, 0L)));
> // if version is 10 or 11, the invalid topic metadata should return an error
> List<Short> invalidVersions = Arrays.asList((short) 10, (short) 11);
> invalidVersions.forEach(version ->
> topics.forEach(topic -> {
> MetadataRequestData metadataRequestData = new MetadataRequestData().setTopics(Collections.singletonList(topic));
> MetadataRequest.Builder builder = new MetadataRequest.Builder(metadataRequestData);
> assertThrows(UnsupportedVersionException.class, () -> builder.build(version));
> })
> );
> }
> ```



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13752) Using `equals` instead of `==` when Uuid compare in Java

2022-03-17 Thread Xiaobing Fang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13752?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiaobing Fang updated KAFKA-13752:
--
Description: 
`Uuid.ZERO_UUID == new Uuid(0L, 0L)` is true in Scala, but false in Java.

 

So this test runs successfully. Is it expected?

```
@Test
public void testTopicIdAndNullTopicNameRequests() {
// Construct invalid MetadataRequestTopics. We will build each one separately and ensure the error is thrown.
List<MetadataRequestData.MetadataRequestTopic> topics = Arrays.asList(
new MetadataRequestData.MetadataRequestTopic().setName(null).setTopicId(new Uuid(0L, 0L)));

// if version is 10 or 11, the invalid topic metadata should return an error
List<Short> invalidVersions = Arrays.asList((short) 10, (short) 11);
invalidVersions.forEach(version ->
topics.forEach(topic -> {
MetadataRequestData metadataRequestData = new MetadataRequestData().setTopics(Collections.singletonList(topic));
MetadataRequest.Builder builder = new MetadataRequest.Builder(metadataRequestData);
assertThrows(UnsupportedVersionException.class, () -> builder.build(version));
})
);
}
```

  was:`Uuid.ZERO_UUID == new Uuid(0L, 0L)` is true in scala, but in java is 
false.


> Using `equals` instead of `==` when Uuid compare in Java
> 
>
> Key: KAFKA-13752
> URL: https://issues.apache.org/jira/browse/KAFKA-13752
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Xiaobing Fang
>Priority: Minor
>
> `Uuid.ZERO_UUID == new Uuid(0L, 0L)` is true in Scala, but false in Java.
>  
> So this test runs successfully. Is it expected?
> ```
> @Test
> public void testTopicIdAndNullTopicNameRequests() {
> // Construct invalid MetadataRequestTopics. We will build each one separately and ensure the error is thrown.
> List<MetadataRequestData.MetadataRequestTopic> topics = Arrays.asList(
> new MetadataRequestData.MetadataRequestTopic().setName(null).setTopicId(new Uuid(0L, 0L)));
> // if version is 10 or 11, the invalid topic metadata should return an error
> List<Short> invalidVersions = Arrays.asList((short) 10, (short) 11);
> invalidVersions.forEach(version ->
> topics.forEach(topic -> {
> MetadataRequestData metadataRequestData = new MetadataRequestData().setTopics(Collections.singletonList(topic));
> MetadataRequest.Builder builder = new MetadataRequest.Builder(metadataRequestData);
> assertThrows(UnsupportedVersionException.class, () -> builder.build(version));
> })
> );
> }
> ```



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] dengziming commented on a change in pull request #11471: MINOR: Replace EasyMock with Mockito in connect:file

2022-03-17 Thread GitBox


dengziming commented on a change in pull request #11471:
URL: https://github.com/apache/kafka/pull/11471#discussion_r829040670



##
File path: 
connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java
##
@@ -73,7 +75,6 @@ public void teardown() {
 }
 
 private void replay() {

Review comment:
   Yeah, we had been stuck in the old code; this is the simplest way 😂. Done!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-13752) Using `equals` instead of `==` when Uuid compare in Java

2022-03-17 Thread Xiaobing Fang (Jira)
Xiaobing Fang created KAFKA-13752:
-

 Summary: Using `equals` instead of `==` when Uuid compare in Java
 Key: KAFKA-13752
 URL: https://issues.apache.org/jira/browse/KAFKA-13752
 Project: Kafka
  Issue Type: Improvement
  Components: clients
Reporter: Xiaobing Fang


`Uuid.ZERO_UUID == new Uuid(0L, 0L)` is true in Scala, but false in Java.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
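
A minimal sketch of the behavior the ticket describes: Java's `==` compares Uuid references, while Scala's `==` delegates to `equals`, so the same expression reads differently in the two languages.
{code:java}
import org.apache.kafka.common.Uuid;

public class UuidCompareSketch {
    public static void main(String[] args) {
        Uuid zero = new Uuid(0L, 0L);
        System.out.println(Uuid.ZERO_UUID == zero);      // false: two distinct references
        System.out.println(Uuid.ZERO_UUID.equals(zero)); // true: same 128-bit value
    }
}
{code}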


[GitHub] [kafka] mimaison commented on a change in pull request #11471: MINOR: Replace EasyMock with Mockito in connect:file

2022-03-17 Thread GitBox


mimaison commented on a change in pull request #11471:
URL: https://github.com/apache/kafka/pull/11471#discussion_r829001380



##
File path: 
connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java
##
@@ -73,7 +75,6 @@ public void teardown() {
 }
 
 private void replay() {

Review comment:
   I wonder if it would be simpler to get rid of the `replay()` method and 
just call `verifyAll()` in the tests that need it. WDYT?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
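
A sketch of the direction discussed above (the mocked type and calls are illustrative, not the actual test): Mockito has no record/replay phase, so a shared replay() helper has nothing left to do, and verification can be called only in the tests that need it.
{code:java}
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.Collections;
import org.apache.kafka.connect.storage.OffsetStorageReader;

public class MockitoStyleSketch {
    public static void main(String[] args) {
        OffsetStorageReader reader = mock(OffsetStorageReader.class);
        when(reader.offset(anyMap())).thenReturn(null); // stub up front, no replay()
        reader.offset(Collections.emptyMap());          // exercise the mock
        verify(reader).offset(anyMap());                // verify only where needed
    }
}
{code}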




[GitHub] [kafka] dajac commented on pull request #11909: KAFKA-13750: Client Compatability KafkaTest uses invalid idempotency configs

2022-03-17 Thread GitBox


dajac commented on pull request #11909:
URL: https://github.com/apache/kafka/pull/11909#issuecomment-1070707701


   @jolshan Would you have a link to a successful run of the tests with your 
change?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11173: KAFKA-13509: Support max timestamp in GetOffsetShell

2022-03-17 Thread GitBox


dajac commented on a change in pull request #11173:
URL: https://github.com/apache/kafka/pull/11173#discussion_r828928713



##
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##
@@ -103,59 +104,77 @@ object GetOffsetShell {
   throw new IllegalArgumentException("--topic-partitions cannot be used 
with --topic or --partitions")
 }
 
-val listOffsetsTimestamp = options.valueOf(timeOpt).longValue
+val offsetSpec = parseOffsetSpec(options.valueOf(timeOpt))
 
 val topicPartitionFilter = if (options.has(topicPartitionsOpt)) {
-  
createTopicPartitionFilterWithPatternList(options.valueOf(topicPartitionsOpt), 
excludeInternalTopics)
+  
createTopicPartitionFilterWithPatternList(options.valueOf(topicPartitionsOpt))
 } else {
-  val partitionIdsRequested = 
createPartitionSet(options.valueOf(partitionsOpt))
-
   createTopicPartitionFilterWithTopicAndPartitionPattern(
 if (options.has(topicOpt)) Some(options.valueOf(topicOpt)) else None,
-excludeInternalTopics,
-partitionIdsRequested
+options.valueOf(partitionsOpt)
   )
 }
 
 val config = if (options.has(commandConfigOpt))
   Utils.loadProps(options.valueOf(commandConfigOpt))
 else
   new Properties
-config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
-config.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId)
-val consumer = new KafkaConsumer(config, new ByteArrayDeserializer, new 
ByteArrayDeserializer)
+config.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+config.setProperty(AdminClientConfig.CLIENT_ID_CONFIG, clientId)
+val adminClient = Admin.create(config)
 
 try {
-  val partitionInfos = listPartitionInfos(consumer, topicPartitionFilter)
+  val partitionInfos = listPartitionInfos(adminClient, 
topicPartitionFilter, excludeInternalTopics)
 
   if (partitionInfos.isEmpty) {
 throw new IllegalArgumentException("Could not match any 
topic-partitions with the specified filters")
   }
 
-  val topicPartitions = partitionInfos.flatMap { p =>
-if (p.leader == null) {
-  System.err.println(s"Error: topic-partition 
${p.topic}:${p.partition} does not have a leader. Skip getting offsets")
-  None
-} else
-  Some(new TopicPartition(p.topic, p.partition))
-  }
+  val timestampsToSearch = partitionInfos.map(tp => tp -> 
offsetSpec).toMap.asJava
 
-  /* Note that the value of the map can be null */
-  val partitionOffsets: collection.Map[TopicPartition, java.lang.Long] = 
listOffsetsTimestamp match {
-case ListOffsetsRequest.EARLIEST_TIMESTAMP => 
consumer.beginningOffsets(topicPartitions.asJava).asScala
-case ListOffsetsRequest.LATEST_TIMESTAMP => 
consumer.endOffsets(topicPartitions.asJava).asScala
-case _ =>
-  val timestampsToSearch = topicPartitions.map(tp => tp -> 
(listOffsetsTimestamp: java.lang.Long)).toMap.asJava
-  consumer.offsetsForTimes(timestampsToSearch).asScala.map { case (k, 
x) =>
-if (x == null) (k, null) else (k, x.offset: java.lang.Long)
-  }
+  val listOffsetsResult = adminClient.listOffsets(timestampsToSearch)
+  val partitionOffsets = partitionInfos.flatMap { tp =>
+try {
+  val partitionInfo = listOffsetsResult.partitionResult(tp).get
+  Some((tp, partitionInfo.offset))
+} catch {
+  case e: ExecutionException =>
+e.getCause match {
+  case _: LeaderNotAvailableException =>
+System.err.println(s"Skip getting offsets for: topic-partition 
${tp.topic}:${tp.partition} since it does not have a leader right now.")
+  case _ =>
+System.err.println(s"Error while getting offset for 
topic-partition ${tp.topic}:${tp.partition}")
+e.printStackTrace()

Review comment:
   We should not print the stack trace like this with 
`e.printStackTrace()`. It would be better to include the error name or the 
error message in the `System.err.println`.
   
   How about having a generic error for all `KafkaException`? Something like 
`Error: Skip getting offsets for topic-partition ${p.topic}:${p.partition} due 
to: ${e.getMessage}`.
   
   If we get any non-`KafkaException` in the cause, we should throw it 
further, in my opinion. I am not sure whether it can really happen, but let's 
be on the safe side.

##
File path: core/src/test/scala/kafka/tools/GetOffsetShellTest.scala
##
@@ -109,6 +111,61 @@ class GetOffsetShellTest extends KafkaServerTestHarness 
with Logging {
 )
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array("-1", "latest"))
+  def testGetLatestOffsets(time: String): Unit = {
+val offsets = executeAndParse(Array("--topic-partitions", "topic.*:0", 
"--time", time))
+assertEquals(
+  List(
+("topic1", 0, Some(1)),
+("topic2", 0, 

[jira] [Updated] (KAFKA-13751) On the broker side, OAUTHBEARER is not compatible with other SASL mechanisms

2022-03-17 Thread RivenSun (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

RivenSun updated KAFKA-13751:
-
Reviewer: Ismael Juma  (was: Luke Chen)

> On the broker side, OAUTHBEARER is not compatible with other SASL mechanisms
> 
>
> Key: KAFKA-13751
> URL: https://issues.apache.org/jira/browse/KAFKA-13751
> Project: Kafka
>  Issue Type: Bug
>  Components: security
>Affects Versions: 3.0.1
>Reporter: RivenSun
>Priority: Critical
>
> h1. Phenomenon:
>  SASL/OAUTHBEARER, whether implemented by default or customized by the user, 
> is not compatible with other SASL mechanisms.
> h3. case1:
> kafka_server_jaas_oauth.conf
> {code:java}
> KafkaServer {
>   org.apache.kafka.common.security.plain.PlainLoginModule required
>   username="admin"
>   password="admin"
>   user_admin="admin"
>   user_alice="alice"; 
>org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule 
> required;
>org.apache.kafka.common.security.scram.ScramLoginModule required
>   username="admin"
>   password="admin_scram";
> }; {code}
>  server.properties
> {code:java}
> advertised.listeners=SASL_PLAINTEXT://publicIp:8779,SASL_SSL://publicIp:8889,OAUTH://publicIp:8669
>  
> listener.security.protocol.map=INTERNAL_SSL:SASL_SSL,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,OAUTH:SASL_SSL
> sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512,OAUTHBEARER{code}
> Error when starting kafka:
> server.log
> {code:java}
> [2022-03-16 13:18:42,658] ERROR [KafkaServer id=1] Fatal error during 
> KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
> org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: 
> Must supply exactly 1 non-null JAAS mechanism configuration (size was 3)
>         at 
> org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:172)
>         at kafka.network.Processor.(SocketServer.scala:724)
>         at kafka.network.SocketServer.newProcessor(SocketServer.scala:367)
>         at 
> kafka.network.SocketServer.$anonfun$addDataPlaneProcessors$1(SocketServer.scala:252)
>         at 
> kafka.network.SocketServer.addDataPlaneProcessors(SocketServer.scala:251)
>         at 
> kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:214)
>         at 
> kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:211)
>         at 
> scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>         at 
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>         at 
> kafka.network.SocketServer.createDataPlaneAcceptorsAndProcessors(SocketServer.scala:211)
>         at kafka.network.SocketServer.startup(SocketServer.scala:122)
>         at kafka.server.KafkaServer.startup(KafkaServer.scala:266)
>         at 
> kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
>         at kafka.Kafka$.main(Kafka.scala:82)
>         at kafka.Kafka.main(Kafka.scala)
> Caused by: java.lang.IllegalArgumentException: Must supply exactly 1 non-null 
> JAAS mechanism configuration (size was 3)
>         at 
> org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredValidatorCallbackHandler.configure(OAuthBearerUnsecuredValidatorCallbackHandler.java:117)
>         at 
> org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:139)
>         ... 17 more
> [2022-03-16 13:18:42,662] INFO [KafkaServer id=1] shutting down 
> (kafka.server.KafkaServer)
> [2022-03-16 13:18:42,664] INFO [SocketServer brokerId=1] Stopping socket 
> server request processors (kafka.network.SocketServer) {code}
> The default implementation class of oauthbearer's 
> `sasl.server.callback.handler.class` is 
> OAuthBearerUnsecuredValidatorCallbackHandler. 
> In the OAuthBearerUnsecuredValidatorCallbackHandler#configure(...) method, 
> the jaasConfigEntries parameter is verified.
> What I want to say is that {*}the verification logic here is completely 
> reasonable{*}, but the jaasConfigEntries passed in from the upper layer 
> should not contain the AppConfigurationEntry of other login modules. Several 
> other places perform the same check with the keyword *"Must supply exactly 1 
> non-null JAAS mechanism configuration".*
> The root cause is elaborated on later.
> By the way, at present, KafkaServer allows {*}the same LoginModule to be 
> configured multiple times in kafkaJaasConfigFile{*}, which will also lead to 
> the phenomenon of case1.
> kafka_server_jaas_oauth.conf eg:
> {code:java}
> KafkaServer {
>org.apache.kafka.common.security.oauthbearer.OAu
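
A configuration sketch of the usual way around this (the listener names below are assumptions): scope each mechanism's JAAS entry to its listener with listener.name.<listener>.<mechanism>.sasl.jaas.config, so the OAUTHBEARER callback handler only ever sees its own single entry.
{code:java}
# server.properties sketch: one JAAS entry per listener/mechanism pair
listener.name.sasl_plaintext.plain.sasl.jaas.config=\
  org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="admin" password="admin" user_admin="admin" user_alice="alice";
listener.name.oauth.oauthbearer.sasl.jaas.config=\
  org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=\
  org.apache.kafka.common.security.scram.ScramLoginModule required \
  username="admin" password="admin_scram";
{code}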

[jira] [Commented] (KAFKA-13689) AbstractConfig log print information is incorrect

2022-03-17 Thread RivenSun (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17508083#comment-17508083
 ] 

RivenSun commented on KAFKA-13689:
--

Hi [~showuon] [~guozhang] 
After rethinking the discussion in PR-11800: Luke, I think it might be simpler, 
as you said at the beginning, to make the log output a bit more informative, 
and we don't need a KIP for that.

AbstractConfig#logUnused()
{code:java}
public void logUnused() {
for (String key : unused())
log.warn("The configuration '{}' was supplied but is not used. It could 
because it is not a known config," +
" or some components are not enabled.", key);
} {code}
WDYT?
Thanks.

> AbstractConfig log print information is incorrect
> -
>
> Key: KAFKA-13689
> URL: https://issues.apache.org/jira/browse/KAFKA-13689
> Project: Kafka
>  Issue Type: Bug
>  Components: config
>Affects Versions: 3.0.0
>Reporter: RivenSun
>Assignee: RivenSun
>Priority: Major
> Fix For: 3.2.0
>
>
> h1. 1.Example
> KafkaClient version is 3.1.0, KafkaProducer init properties:
>  
> {code:java}
> Properties props = new Properties();
> props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false);
> props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60003);{code}
>  
>  
> Partial log of KafkaProducer initialization:
> {code:java}
>     ssl.truststore.location = C:\Personal 
> File\documents\KafkaSSL\client.truststore.jks
>     ssl.truststore.password = [hidden]
>     ssl.truststore.type = JKS
>     transaction.timeout.ms = 60003
>     transactional.id = null
>     value.serializer = class org.apache.kafka.common.serialization.StringSerializer
> [main] INFO org.apache.kafka.common.security.authenticator.AbstractLogin - 
> Successfully logged in.
> [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The 
> configuration 'transaction.timeout.ms' was supplied but isn't a known config.
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.1.0
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 
> 37edeed0777bacb3
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 
> 1645602332999 {code}
> From the above log, you can see that KafkaProducer has applied the user's 
> configuration, {*}transaction.timeout.ms=60003{*}, the default value of this 
> configuration is 60000.
> But we can see another line of log:
> [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The 
> configuration *'transaction.timeout.ms'* was supplied but isn't a 
> *{color:#ff}known{color}* config.
>  
> h1. 2.RootCause:
> 1) ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG is set to {*}false{*}.
> So the configurations related to the KafkaProducer transaction will not be 
> requested.
> See the source code: KafkaProducer#configureTransactionState(...) .
> 2) AbstractConfig#logUnused() -> AbstractConfig#unused()
> {code:java}
> public Set<String> unused() {
> Set<String> keys = new HashSet<>(originals.keySet());
> keys.removeAll(used);
> return keys;
> } {code}
> If a configuration has not been requested, the configuration will not be put 
> into the used variable. SourceCode see as below:
> AbstractConfig#get(String key)
>  
> {code:java}
> protected Object get(String key) {
> if (!values.containsKey(key))
> throw new ConfigException(String.format("Unknown configuration '%s'", 
> key));
> used.add(key);
> return values.get(key);
> } {code}
> h1. Solution:
> 1. AbstractConfig#logUnused() method
> Modify the log printing information of this method,and the unused 
> configuration log print level can be changed to {*}INFO{*}, what do you think?
> {code:java}
> /**
>  * Log infos for any unused configurations
>  */
> public void logUnused() {
>     for (String key : unused())
>         log.info("The configuration '{}' was supplied but isn't a used 
> config.", key);
> }{code}
>  
>  
> 2. AbstractConfig provides two new methods: logUnknown() and unknown()
> {code:java}
> /**
>  * Log warnings for any unknown configurations
>  */
> public void logUnknown() {
> for (String key : unknown())
> log.warn("The configuration '{}' was supplied but isn't a known 
> config.", key);
> } {code}
>  
> {code:java}
> public Set<String> unknown() {
> Set<String> keys = new HashSet<>(originals.keySet());
> keys.removeAll(values.keySet());
> return keys;
> } {code}
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
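
A toy model (not Kafka's actual class) of the unused() computation described in the root cause above: a key that is never requested through get() stays out of the used set and is therefore reported, even though the user supplied it deliberately.
{code:java}
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class UnusedConfigSketch {
    public static void main(String[] args) {
        Map<String, Object> originals = new HashMap<>();
        originals.put("transaction.timeout.ms", 60003);
        Set<String> used = new HashSet<>(); // get() was never called for the key
        Set<String> unused = new HashSet<>(originals.keySet());
        unused.removeAll(used);
        System.out.println(unused); // [transaction.timeout.ms]
    }
}
{code}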


[GitHub] [kafka] RivenSun2 commented on pull request #11911: KAFKA-13463: Make pause behavior consistent between cooperative and eager protocols

2022-03-17 Thread GitBox


RivenSun2 commented on pull request #11911:
URL: https://github.com/apache/kafka/pull/11911#issuecomment-1070603378


   Hi @guozhangwang and @showuon
   Please help review the PR.
   Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] RivenSun2 opened a new pull request #11911: KAFKA-13463: Make pause behavior consistent between cooperative and eager protocols

2022-03-17 Thread GitBox


RivenSun2 opened a new pull request #11911:
URL: https://github.com/apache/kafka/pull/11911


   Make pause behavior consistent between cooperative and eager protocols
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
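
For context on what the PR makes consistent, a minimal sketch (broker address, topic, and group are assumptions): pause() is purely a client-side flag, and under the eager protocol every rebalance revokes all partitions, so re-assigned partitions come back unpaused, while a cooperative rebalance leaves untouched partitions paused.
{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class PauseAcrossRebalanceSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "pause-demo");              // assumption
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("demo-topic"));      // assumption
            consumer.poll(Duration.ofSeconds(1));  // obtain an assignment
            consumer.pause(consumer.assignment()); // client-side flag only
            consumer.poll(Duration.ofSeconds(5));  // a rebalance during this poll may
            System.out.println(consumer.paused()); // leave paused() empty again
        }
    }
}
{code}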




[GitHub] [kafka] dengziming opened a new pull request #11910: KAFKA-13743: Prevent topics with conflicting metrics names from being created in KRaft mode

2022-03-17 Thread GitBox


dengziming opened a new pull request #11910:
URL: https://github.com/apache/kafka/pull/11910


   *More detailed description of your change*
   In ZooKeeper mode, the topic "foo_bar" conflicts with "foo.bar" because of 
limitations in metric names; we should implement the same check in KRaft mode.
   Add an integration test case in TopicCommandIntegrationTest and change that 
class to support KRaft mode.
   
   *Summary of testing strategy (including rationale)*
   Added a unit test, and also an integration test case.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
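
A minimal sketch of the collision rule being ported here (the helper is illustrative; the broker derives metric names by mapping '.' to '_'): two topic names that sanitize to the same metric name must be rejected at creation time.
{code:java}
public class TopicCollisionSketch {
    // Two names collide when they are identical after '.' is replaced with '_'.
    static boolean collides(String a, String b) {
        return a.replace('.', '_').equals(b.replace('.', '_'));
    }

    public static void main(String[] args) {
        System.out.println(collides("foo_bar", "foo.bar")); // true  -> creation rejected
        System.out.println(collides("foo_bar", "foo-bar")); // false -> allowed
    }
}
{code}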




[jira] [Closed] (KAFKA-13740) kafka-stream-client-shutdown

2022-03-17 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13740?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna closed KAFKA-13740.
-

> kafka-stream-client-shutdown
> 
>
> Key: KAFKA-13740
> URL: https://issues.apache.org/jira/browse/KAFKA-13740
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.1.0
>Reporter: Prashanth Joseph Babu
>Priority: Major
>
> I have an Apache Kafka Streams application. I notice that it sometimes 
> shuts down when a rebalance occurs, with no apparent reason for the shutdown. It 
> doesn't even throw an exception.
> Here are some logs of the issue: 
> {code:java}
> [2022-03-08 17:13:37,024] INFO [Consumer 
> clientId=svc-stream-collector-StreamThread-1-consumer, 
> groupId=svc-stream-collector] Adding newly assigned partitions:  
> (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
> [2022-03-08 17:13:37,024] ERROR stream-thread 
> [svc-stream-collector-StreamThread-1] A Kafka Streams client in this Kafka 
> Streams application is requesting to shutdown the application 
> (org.apache.kafka.streams.processor.internals.StreamThread)
> [2022-03-08 17:13:37,030] INFO stream-client [svc-stream-collector] State 
> transition from REBALANCING to PENDING_ERROR 
> (org.apache.kafka.streams.KafkaStreams)
> old state:REBALANCING new state:PENDING_ERROR
> [2022-03-08 17:13:37,031] INFO [Consumer 
> clientId=svc-stream-collector-StreamThread-1-consumer, 
> groupId=svc-stream-collector] (Re-)joining group 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> [2022-03-08 17:13:37,032] INFO stream-thread 
> [svc-stream-collector-StreamThread-1] Informed to shut down 
> (org.apache.kafka.streams.processor.internals.StreamThread)
> [2022-03-08 17:13:37,032] INFO stream-thread 
> [svc-stream-collector-StreamThread-1] State transition from 
> PARTITIONS_REVOKED to PENDING_SHUTDOWN 
> (org.apache.kafka.streams.processor.internals.StreamThread)
> [2022-03-08 17:13:37,067] INFO stream-thread 
> [svc-stream-collector-StreamThread-1] Thread state is already 
> PENDING_SHUTDOWN, skipping the run once call after poll request 
> (org.apache.kafka.streams.processor.internals.StreamThread)
> [2022-03-08 17:13:37,067] WARN stream-thread 
> [svc-stream-collector-StreamThread-1] Detected that shutdown was requested. 
> All clients in this app will now begin to shutdown 
> (org.apache.kafka.streams.processor.internals.StreamThread)
> {code}
> I suspect it's because there are no newly assigned partitions, as in the log 
> below:
> {code:java}
> [2022-03-08 17:13:37,024] INFO [Consumer 
> clientId=svc-stream-collector-StreamThread-1-consumer, 
> groupId=svc-stream-collector] Adding newly assigned partitions:  
> (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
> {code}
> However, I'm not exactly sure why this error occurs. Any help would be 
> appreciated.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
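
For readers wondering where such a shutdown request comes from, one common source is an uncaught exception handler, shown here as a hedged sketch (topics, serdes, and broker address are assumptions, not the reporter's code): returning SHUTDOWN_APPLICATION produces exactly the "requesting to shutdown the application" transition in the logs above.
{code:java}
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;

public class ShutdownHandlerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "svc-stream-collector"); // name from the logs
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");    // assumption
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic").to("output-topic"); // placeholder topology

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        // Any uncaught exception in a StreamThread now asks every client in the
        // application to shut down, producing the PENDING_ERROR transition above.
        streams.setUncaughtExceptionHandler(e -> StreamThreadExceptionResponse.SHUTDOWN_APPLICATION);
        streams.start();
    }
}
{code}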


[jira] [Commented] (KAFKA-13740) kafka-stream-client-shutdown

2022-03-17 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17508043#comment-17508043
 ] 

Bruno Cadonna commented on KAFKA-13740:
---

[~prashanthjbabu] Glad that you could sort it out!

> kafka-stream-client-shutdown
> 
>
> Key: KAFKA-13740
> URL: https://issues.apache.org/jira/browse/KAFKA-13740
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.1.0
>Reporter: Prashanth Joseph Babu
>Priority: Major
>
> I have an Apache Kafka Streams application. I notice that it sometimes 
> shuts down when a rebalance occurs, with no apparent reason for the shutdown. It 
> doesn't even throw an exception.
> Here are some logs of the issue: 
> {code:java}
> [2022-03-08 17:13:37,024] INFO [Consumer 
> clientId=svc-stream-collector-StreamThread-1-consumer, 
> groupId=svc-stream-collector] Adding newly assigned partitions:  
> (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
> [2022-03-08 17:13:37,024] ERROR stream-thread 
> [svc-stream-collector-StreamThread-1] A Kafka Streams client in this Kafka 
> Streams application is requesting to shutdown the application 
> (org.apache.kafka.streams.processor.internals.StreamThread)
> [2022-03-08 17:13:37,030] INFO stream-client [svc-stream-collector] State 
> transition from REBALANCING to PENDING_ERROR 
> (org.apache.kafka.streams.KafkaStreams)
> old state:REBALANCING new state:PENDING_ERROR
> [2022-03-08 17:13:37,031] INFO [Consumer 
> clientId=svc-stream-collector-StreamThread-1-consumer, 
> groupId=svc-stream-collector] (Re-)joining group 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> [2022-03-08 17:13:37,032] INFO stream-thread 
> [svc-stream-collector-StreamThread-1] Informed to shut down 
> (org.apache.kafka.streams.processor.internals.StreamThread)
> [2022-03-08 17:13:37,032] INFO stream-thread 
> [svc-stream-collector-StreamThread-1] State transition from 
> PARTITIONS_REVOKED to PENDING_SHUTDOWN 
> (org.apache.kafka.streams.processor.internals.StreamThread)
> [2022-03-08 17:13:37,067] INFO stream-thread 
> [svc-stream-collector-StreamThread-1] Thread state is already 
> PENDING_SHUTDOWN, skipping the run once call after poll request 
> (org.apache.kafka.streams.processor.internals.StreamThread)
> [2022-03-08 17:13:37,067] WARN stream-thread 
> [svc-stream-collector-StreamThread-1] Detected that shutdown was requested. 
> All clients in this app will now begin to shutdown 
> (org.apache.kafka.streams.processor.internals.StreamThread)
> {code}
> I suspect it's because there are no newly assigned partitions, as in the log 
> below:
> {code:java}
> [2022-03-08 17:13:37,024] INFO [Consumer 
> clientId=svc-stream-collector-StreamThread-1-consumer, 
> groupId=svc-stream-collector] Adding newly assigned partitions:  
> (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
> {code}
> However, I'm not exactly sure why this error occurs. Any help would be 
> appreciated.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Comment Edited] (KAFKA-13136) kafka-connect task.max : active task in consumer group is limited by the bigger topic to consume

2022-03-17 Thread raphaelauv (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13136?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17508032#comment-17508032
 ] 

raphaelauv edited comment on KAFKA-13136 at 3/17/22, 8:01 AM:
--

thank you very much for that explanation, it's very clear :+1

yes we can close that ticket, thank you


was (Author: raphaelauv):
thank you very much for that explaination , it's very clear :+1

yes we can close that ticket, thank you

> kafka-connect task.max : active task in consumer group is limited by the 
> bigger topic to consume
> 
>
> Key: KAFKA-13136
> URL: https://issues.apache.org/jira/browse/KAFKA-13136
> Project: Kafka
>  Issue Type: Bug
>Reporter: raphaelauv
>Priority: Major
>
> In kafka-connect 2.7
> *The maximum number of active tasks for a sink connector is equal to the 
> partition count of the topic with the most partitions to consume.*
> An active task is a task with partitions assigned to it in the consumer group of 
> the sink connector.
> Example:
> With 2 topics of 10 partitions each (20 partitions in total),
> the maximum number of active tasks is 10 (if I set task.max to 12, there are 
> 10 members of the consumer group consuming partitions and 2 members in the 
> consumer group that have no partitions to consume).
> If I add a third topic with 15 partitions to the connector config, then all 12 
> members of the consumer group are consuming partitions; and if I now set 
> task.max to 17, only 15 members are active in the consumer group.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
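
A toy calculation of the behavior described above, assuming the sink connector's consumer group uses the default per-topic range assignment (which is what caps active members at the largest topic's partition count):
{code:java}
import java.util.List;

public class ActiveTaskSketch {
    // Member i of the group receives partitions of a topic only when i is smaller
    // than that topic's partition count, so the biggest topic sets the ceiling.
    static int activeTasks(int tasksMax, List<Integer> partitionsPerTopic) {
        int largest = partitionsPerTopic.stream().max(Integer::compare).orElse(0);
        return Math.min(tasksMax, largest);
    }

    public static void main(String[] args) {
        System.out.println(activeTasks(12, List.of(10, 10)));     // 10 active, 2 idle
        System.out.println(activeTasks(17, List.of(10, 10, 15))); // 15 active, 2 idle
    }
}
{code}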


[jira] [Commented] (KAFKA-13136) kafka-connect task.max : active task in consumer group is limited by the bigger topic to consume

2022-03-17 Thread raphaelauv (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13136?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17508032#comment-17508032
 ] 

raphaelauv commented on KAFKA-13136:


thank you very much for that explaination , it's very clear :+1

yes we can close that ticket, thank you

> kafka-connect task.max : active task in consumer group is limited by the 
> bigger topic to consume
> 
>
> Key: KAFKA-13136
> URL: https://issues.apache.org/jira/browse/KAFKA-13136
> Project: Kafka
>  Issue Type: Bug
>Reporter: raphaelauv
>Priority: Major
>
> In kafka-connect 2.7
> *The maximum number of active tasks for a sink connector is equal to the 
> partition count of the topic with the most partitions to consume.*
> An active task is a task with partitions assigned to it in the consumer group of 
> the sink connector.
> Example:
> With 2 topics of 10 partitions each (20 partitions in total),
> the maximum number of active tasks is 10 (if I set task.max to 12, there are 
> 10 members of the consumer group consuming partitions and 2 members in the 
> consumer group that have no partitions to consume).
> If I add a third topic with 15 partitions to the connector config, then all 12 
> members of the consumer group are consuming partitions; and if I now set 
> task.max to 17, only 15 members are active in the consumer group.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)