[jira] [Updated] (KAFKA-15444) KIP-974: Docker Image for GraalVM based Native Kafka Broker

2023-09-07 Thread Krishna Agarwal (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15444?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Krishna Agarwal updated KAFKA-15444:

Description: [KIP-974: Docker Image for GraalVM based Native Kafka 
Broker|https://cwiki.apache.org/confluence/display/KAFKA/KIP-974%3A+Docker+Image+for+GraalVM+based+Native+Kafka+Broker]

> KIP-974: Docker Image for GraalVM based Native Kafka Broker
> ---
>
> Key: KAFKA-15444
> URL: https://issues.apache.org/jira/browse/KAFKA-15444
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Krishna Agarwal
>Assignee: Krishna Agarwal
>Priority: Major
>  Labels: KIP-974
>
> [KIP-974: Docker Image for GraalVM based Native Kafka 
> Broker|https://cwiki.apache.org/confluence/display/KAFKA/KIP-974%3A+Docker+Image+for+GraalVM+based+Native+Kafka+Broker]





[jira] [Updated] (KAFKA-15445) KIP-975: Docker Image for Apache Kafka

2023-09-07 Thread Krishna Agarwal (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Krishna Agarwal updated KAFKA-15445:

Description: [KIP-975: Docker Image for Apache 
Kafka|https://cwiki.apache.org/confluence/display/KAFKA/KIP-975%3A+Docker+Image+for+Apache+Kafka]
  (was: 
[KIP-975|https://cwiki.apache.org/confluence/display/KAFKA/KIP-975%3A+Docker+Image+for+Apache+Kafka])

> KIP-975: Docker Image for Apache Kafka
> --
>
> Key: KAFKA-15445
> URL: https://issues.apache.org/jira/browse/KAFKA-15445
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Krishna Agarwal
>Assignee: Krishna Agarwal
>Priority: Major
>  Labels: KIP-975
>
> [KIP-975: Docker Image for Apache 
> Kafka|https://cwiki.apache.org/confluence/display/KAFKA/KIP-975%3A+Docker+Image+for+Apache+Kafka]





[jira] [Updated] (KAFKA-15445) KIP-975: Docker Image for Apache Kafka

2023-09-07 Thread Krishna Agarwal (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Krishna Agarwal updated KAFKA-15445:

Description: 
[KIP-975|https://cwiki.apache.org/confluence/display/KAFKA/KIP-975%3A+Docker+Image+for+Apache+Kafka]

> KIP-975: Docker Image for Apache Kafka
> --
>
> Key: KAFKA-15445
> URL: https://issues.apache.org/jira/browse/KAFKA-15445
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Krishna Agarwal
>Assignee: Krishna Agarwal
>Priority: Major
>  Labels: KIP-975
>
> [KIP-975|https://cwiki.apache.org/confluence/display/KAFKA/KIP-975%3A+Docker+Image+for+Apache+Kafka]





[jira] [Created] (KAFKA-15445) KIP-975: Docker Image for Apache Kafka

2023-09-07 Thread Krishna Agarwal (Jira)
Krishna Agarwal created KAFKA-15445:
---

 Summary: KIP-975: Docker Image for Apache Kafka
 Key: KAFKA-15445
 URL: https://issues.apache.org/jira/browse/KAFKA-15445
 Project: Kafka
  Issue Type: New Feature
Reporter: Krishna Agarwal
Assignee: Krishna Agarwal








[jira] [Created] (KAFKA-15444) KIP-974: Docker Image for GraalVM based Native Kafka Broker

2023-09-07 Thread Krishna Agarwal (Jira)
Krishna Agarwal created KAFKA-15444:
---

 Summary: KIP-974: Docker Image for GraalVM based Native Kafka 
Broker
 Key: KAFKA-15444
 URL: https://issues.apache.org/jira/browse/KAFKA-15444
 Project: Kafka
  Issue Type: New Feature
Reporter: Krishna Agarwal
Assignee: Krishna Agarwal








[GitHub] [kafka] github-actions[bot] commented on pull request #13836: KAFKA-14218: Replace temp file handler with JUnit 5 Temporary Directory Support

2023-09-07 Thread via GitHub


github-actions[bot] commented on PR #13836:
URL: https://github.com/apache/kafka/pull/13836#issuecomment-1711026816

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.





[jira] [Commented] (KAFKA-14912) Introduce a configuration for remote index cache size, preferably a dynamic config.

2023-09-07 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14912?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17762931#comment-17762931
 ] 

Luke Chen commented on KAFKA-14912:
---

Agreed, we should limit the cache by size instead of by the total entry count.
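
A minimal sketch of what size-based bounding could look like, assuming we keep 
Caffeine (which RemoteIndexCache already uses); the class and names below are 
illustrative, not the actual patch:

{code:java}
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

// Hypothetical sketch, not the actual patch: bound the cache by the
// accumulated size of its entries instead of by entry count, via
// Caffeine's maximumWeight plus a weigher.
public class SizeBoundedIndexCacheSketch {
    static final class IndexEntry {
        final long sizeBytes;
        IndexEntry(long sizeBytes) { this.sizeBytes = sizeBytes; }
    }

    public static void main(String[] args) {
        long maxSizeBytes = 1024L * 1024 * 1024; // e.g. a 1 GiB budget; the dynamic config would set this
        Cache<String, IndexEntry> cache = Caffeine.newBuilder()
                .maximumWeight(maxSizeBytes)
                // the weight of an entry is its index size on disk, capped to int
                .weigher((String key, IndexEntry e) -> (int) Math.min(e.sizeBytes, Integer.MAX_VALUE))
                .build();
        cache.put("topicId-0", new IndexEntry(64 * 1024));
    }
}
{code}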

> Introduce a configuration for remote index cache size, preferably a dynamic 
> config.
> ---
>
> Key: KAFKA-14912
> URL: https://issues.apache.org/jira/browse/KAFKA-14912
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Satish Duggana
>Assignee: hudeqi
>Priority: Major
>
> Context: we need to make the 1024 value here [1] dynamically configurable.
> [1] 
> https://github.com/apache/kafka/blob/8d24716f27b307da79a819487aefb8dec79b4ca8/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java#L119





[GitHub] [kafka] showuon commented on a diff in pull request #14287: [Minor] Check the existence of AppInfo for the given ID before creating a new mbean of the same name

2023-09-07 Thread via GitHub


showuon commented on code in PR #14287:
URL: https://github.com/apache/kafka/pull/14287#discussion_r1319285955


##
clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java:
##
@@ -60,8 +60,13 @@ public static String getCommitId() {
     public static synchronized void registerAppInfo(String prefix, String id, Metrics metrics, long nowMs) {
         try {
             ObjectName name = new ObjectName(prefix + ":type=app-info,id=" + Sanitizer.jmxSanitize(id));
+            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
+            if (server.isRegistered(name)) {
+                log.info("App info {} for {} already exists, so skipping  a new mbean creation", prefix, id);

Review Comment:
   nit: additional space after _skipping_.
   
   Question: what does this log output look like in the consumer's case? I 
think it's:
   
   `App info kafka.consumer for transcript2_REALTIME-transcript-topic-0 already 
exists, so skipping a new mbean creation`
   
   Could we change it to:
   `log.info("The mbean of App info: [{}], id: [{}] already exists, so skipping 
a new mbean creation.", prefix, id);`
   






[GitHub] [kafka] mjsax commented on pull request #14224: MINOR: fix currentLag javadoc

2023-09-07 Thread via GitHub


mjsax commented on PR #14224:
URL: https://github.com/apache/kafka/pull/14224#issuecomment-1710988445

   Thanks for the fix!
   
   Merged to `trunk` and cherry-picked to `3.6` branch.





[GitHub] [kafka] mjsax merged pull request #14224: MINOR: fix currentLag javadoc

2023-09-07 Thread via GitHub


mjsax merged PR #14224:
URL: https://github.com/apache/kafka/pull/14224





[GitHub] [kafka] mjsax commented on pull request #12988: KAFKA-14405: Log a warning when users attempt to set a config controlled by Streams

2023-09-07 Thread via GitHub


mjsax commented on PR #12988:
URL: https://github.com/apache/kafka/pull/12988#issuecomment-1710983828

   Yes, that's expected, and we should also cover this case.
   
   Because there are multiple consumers, we use the `consumer.` prefix to allow 
users to change a config for _all_ consumers. If you want to change a config 
for a specific consumer (or if you want to configure two consumers 
differently), you would use `main.consumer.` et al. If both prefixes (generic 
and specific) are used, the consumer-specific prefix takes precedence over the 
general `consumer.` prefix. Does this make sense? It's basically a "config 
hierarchy" (flexible and powerful, but maybe a little hard to understand on 
first encounter); see the sketch below.
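   
   A minimal sketch of the hierarchy (the prefix helpers are real 
`StreamsConfig` methods; the config values are illustrative):
   
   ```java
   import java.util.Properties;
   
   import org.apache.kafka.clients.consumer.ConsumerConfig;
   import org.apache.kafka.streams.StreamsConfig;
   
   public class ConsumerPrefixExample {
       public static void main(String[] args) {
           Properties props = new Properties();
           props.put(StreamsConfig.APPLICATION_ID_CONFIG, "prefix-demo");
           props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
   
           // applies to all consumers (main, restore, global)
           props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 500);
   
           // applies to the main consumer only, and wins over the generic
           // "consumer." prefix above
           props.put(StreamsConfig.mainConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 100);
       }
   }
   ```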





[GitHub] [kafka] mjsax commented on pull request #14221: KAFKA-15338: The metric group documentation for metrics added in KAFK…

2023-09-07 Thread via GitHub


mjsax commented on PR #14221:
URL: https://github.com/apache/kafka/pull/14221#issuecomment-1710978564

   Thanks for the PR @atu-sharm. Merged to `trunk`, and cherry-picked to `3.6`, 
`3.5`, `3.4`, and `3.3` branches.
   
   Could you also do a PR against https://github.com/apache/kafka-site to port 
this fix, so we get the fix live on the web page right away?





[GitHub] [kafka] mjsax merged pull request #14221: KAFKA-15338: The metric group documentation for metrics added in KAFK…

2023-09-07 Thread via GitHub


mjsax merged PR #14221:
URL: https://github.com/apache/kafka/pull/14221





[GitHub] [kafka] mjsax commented on a diff in pull request #14317: KAFKA-13973: Fix inflated block cache metrics

2023-09-07 Thread via GitHub


mjsax commented on code in PR #14317:
URL: https://github.com/apache/kafka/pull/14317#discussion_r1319266372


##
streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java:
##
@@ -373,14 +373,14 @@ private Gauge<BigInteger> gaugeToComputeBlockCacheMetrics(final String propertyN
                         // values of RocksDB properties are of type unsigned long in C++, i.e., in Java we need to use
                         // BigInteger and construct the object from the byte representation of the value
                         result = new BigInteger(1, longToBytes(
-                            valueProvider.db.getAggregatedLongProperty(ROCKSDB_PROPERTIES_PREFIX + propertyName)
+                            valueProvider.db.getLongProperty(ROCKSDB_PROPERTIES_PREFIX + propertyName)
                         ));
                         break;
                     } else {
                         // values of RocksDB properties are of type unsigned long in C++, i.e., in Java we need to use
                         // BigInteger and construct the object from the byte representation of the value
                         result = result.add(new BigInteger(1, longToBytes(
-                            valueProvider.db.getAggregatedLongProperty(ROCKSDB_PROPERTIES_PREFIX + propertyName)
+                            valueProvider.db.getLongProperty(ROCKSDB_PROPERTIES_PREFIX + propertyName)

Review Comment:
   Btw: Maybe 
https://github.com/speedb-io/speedb/issues/583#issuecomment-1616710762 helps?
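   
   For context, a minimal standalone sketch of the API difference the fix 
relies on (the two methods and the property name are real RocksDB Java API; 
the DB path is illustrative): `getAggregatedLongProperty` sums a property over 
all column families, so a block cache shared across column families gets 
counted once per family, while `getLongProperty` reads it once.
   
   ```java
   import org.rocksdb.Options;
   import org.rocksdb.RocksDB;
   import org.rocksdb.RocksDBException;
   
   public class BlockCacheMetricSketch {
       public static void main(String[] args) throws RocksDBException {
           RocksDB.loadLibrary();
           try (Options options = new Options().setCreateIfMissing(true);
                RocksDB db = RocksDB.open(options, "/tmp/rocksdb-metric-sketch")) {
               // reads the property once (what the fix switches to)
               long single = db.getLongProperty("rocksdb.block-cache-usage");
               // sums the property across all column families (the inflated value)
               long summed = db.getAggregatedLongProperty("rocksdb.block-cache-usage");
               System.out.println("single: " + single + ", aggregated: " + summed);
           }
       }
   }
   ```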






[jira] [Created] (KAFKA-15443) Upgrade RocksDB dependency

2023-09-07 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-15443:
---

 Summary: Upgrade RocksDB dependency
 Key: KAFKA-15443
 URL: https://issues.apache.org/jira/browse/KAFKA-15443
 Project: Kafka
  Issue Type: Task
  Components: streams
Reporter: Matthias J. Sax


Kafka Streams currently depends on RocksDB 7.9.2.

However, the latest version of RocksDB is already 8.5.3. We should check the 
RocksDB release notes to see what we would gain by upgrading to the latest 
version (and file corresponding tickets to exploit improvements of newer 
releases as applicable).





[GitHub] [kafka] cmccabe commented on pull request #14351: KAFKA-15441 Allow broker heartbeats to complete in metadata transaction

2023-09-07 Thread via GitHub


cmccabe commented on PR #14351:
URL: https://github.com/apache/kafka/pull/14351#issuecomment-1710954958

   Thinking about this more: if something runs in premigration, it should 
complete in premigration, but maybe it should not complete inside a 
transaction more generally. For example, I can imagine us later having a 
heartbeat kick off a big transaction containing a lot of records; we wouldn't 
want to complete inside that transaction.
   
   tl;dr: we can probably just get rid of RUNS_IN_TRANSACTION and have it be 
implied by the following (sketched below):
   * you have RUNS_IN_PREMIGRATION, and
   * we're in premigration
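   
   A minimal sketch of that rule, using hypothetical names modeled on the 
controller's operation flags (not the actual QuorumController code):
   
   ```java
   import java.util.EnumSet;
   import java.util.Set;
   
   public class PremigrationFlagSketch {
       enum OperationFlag { RUNS_IN_PREMIGRATION }
   
       // RUNS_IN_TRANSACTION goes away: an event completes inside the
       // migration transaction only if it may run in premigration AND the
       // controller is currently in premigration.
       static boolean completesInTransaction(Set<OperationFlag> flags, boolean inPremigration) {
           return flags.contains(OperationFlag.RUNS_IN_PREMIGRATION) && inPremigration;
       }
   
       public static void main(String[] args) {
           Set<OperationFlag> heartbeatFlags = EnumSet.of(OperationFlag.RUNS_IN_PREMIGRATION);
           System.out.println(completesInTransaction(heartbeatFlags, true));  // true
           System.out.println(completesInTransaction(heartbeatFlags, false)); // false
       }
   }
   ```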





[jira] [Commented] (KAFKA-12473) Make the "cooperative-sticky, range" as the default assignor

2023-09-07 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17762920#comment-17762920
 ] 

Luke Chen commented on KAFKA-12473:
---

Yes, we can wait for KIP-848. Thanks [~dajac].

> Make the "cooperative-sticky, range" as the default assignor
> 
>
> Key: KAFKA-12473
> URL: https://issues.apache.org/jira/browse/KAFKA-12473
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: A. Sophie Blee-Goldman
>Priority: Critical
>  Labels: kip
>
> Now that 3.0 is coming up, we can change the default 
> ConsumerPartitionAssignor to something better than the RangeAssignor. The 
> original plan was to switch over to the StickyAssignor, but now that we have 
> incremental cooperative rebalancing we should  consider using the new 
> CooperativeStickyAssignor instead: this will enable the consumer group to 
> follow the COOPERATIVE protocol, improving the rebalancing experience OOTB.
> KIP: 
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=177048248





[jira] [Commented] (KAFKA-9800) [KIP-580] Client Exponential Backoff Implementation

2023-09-07 Thread Satish Duggana (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17762921#comment-17762921
 ] 

Satish Duggana commented on KAFKA-9800:
---

[~junrao] [~schofielaj] Are you planning to merge these changes to the 3.6 branch?

> [KIP-580] Client Exponential Backoff Implementation
> ---
>
> Key: KAFKA-9800
> URL: https://issues.apache.org/jira/browse/KAFKA-9800
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Cheng Tan
>Assignee: Andrew Schofield
>Priority: Major
>  Labels: KIP-580, client
> Fix For: 3.7.0
>
>
> Design:
> The main idea is to bookkeep the failed attempts. Currently, the retry 
> backoff has two main usage patterns:
>  # Synchronous retries in a blocking loop. The thread sleeps in each 
> iteration for the retry backoff ms.
>  # Async retries. In each poll, retries that have not yet met the backoff 
> are filtered out. The data class often maintains a 1:1 mapping to a set of 
> requests which are logically associated (i.e. a set contains only one 
> initial request and only its retries).
> For type 1, we can utilize a local failure counter of a Java generic data 
> type.
> For type 2, I already wrapped the exponential backoff/timeout util class in 
> my KIP-601 
> [implementation|https://github.com/apache/kafka/pull/8683/files#diff-9ca2b1294653dfa914b9277de62b52e3R28]
>  which takes the number of attempts and returns the backoff/timeout value at 
> the corresponding level. Thus, we can add a new class property to those 
> classes containing retriable data in order to record the number of failed 
> attempts.
>  
> Changes:
> KafkaProducer:
>  # Produce request (ApiKeys.PRODUCE). Currently, the backoff applies to each 
> ProducerBatch in the Accumulator, which already has an attempts attribute 
> recording the number of failed attempts. So we can let the Accumulator 
> calculate the new retry backoff for each batch when it enqueues them, to 
> avoid instantiating the util class multiple times.
>  # Transaction request (ApiKeys..*TXN). TxnRequestHandler will have a new 
> class property of type `Long` to record the number of attempts.
> KafkaConsumer:
>  # Some synchronous retry use cases. Record the failed attempts in the 
> blocking loop.
>  # Partition request (ApiKeys.OFFSET_FOR_LEADER_EPOCH, ApiKeys.LIST_OFFSETS). 
> Though the actual requests are packed for each node, the current 
> implementation applies the backoff to each topic partition, where the 
> backoff value is kept by TopicPartitionState. Thus, TopicPartitionState will 
> get a new property recording the number of attempts.
> Metadata:
>  # Metadata lives as a singleton in many clients. Add a new property 
> recording the number of attempts.
> AdminClient:
>  # AdminClient has its own request abstraction, Call. The failed attempts 
> are already kept by that abstraction, so we probably just need to clean up 
> the Call class logic a bit.
> Existing tests:
>  # If a test is testing the retry backoff, add a delta to the assertion to 
> account for the jitter.
>  # If a test is testing other functionality, we can specify the same value 
> for both `retry.backoff.ms` and `retry.backoff.max.ms` to make the retry 
> backoff static. We can use this trick to keep the existing tests compatible 
> with the changes.
> There are other common usages that look like client.poll(timeout), where the 
> timeout passed in is the retry backoff value. We won't change these usages, 
> since the underlying logic is nioSelector.select(timeout) and 
> nioSelector.selectNow(): if no interested op exists, the client will block 
> for retry backoff milliseconds. This is an optimization for when there is no 
> request to send but the client is waiting for responses. Specifically, if 
> the client fails the in-flight requests before the retry backoff has passed, 
> it still needs to wait until that amount of time has passed, unless a new 
> request needs to be sent.
> 
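
A self-contained sketch of the backoff-from-attempts shape described above: 
exponential growth from `retry.backoff.ms`, capped at `retry.backoff.max.ms`, 
with jitter. Names and the exact formula are illustrative, not the actual 
Kafka util class:

{code:java}
import java.util.concurrent.ThreadLocalRandom;

public class ExponentialBackoffSketch {
    private final long initialMs; // retry.backoff.ms
    private final long maxMs;     // retry.backoff.max.ms
    private final double jitter;  // e.g. 0.2 => +/-20%

    public ExponentialBackoffSketch(long initialMs, long maxMs, double jitter) {
        this.initialMs = initialMs;
        this.maxMs = maxMs;
        this.jitter = jitter;
    }

    // attempts is the bookkept number of failed attempts for the request
    public long backoffMs(long attempts) {
        if (initialMs >= maxMs)
            return initialMs; // static backoff: the test trick mentioned above
        double base = Math.min(initialMs * Math.pow(2, attempts), maxMs);
        double rand = 1 + jitter * (2 * ThreadLocalRandom.current().nextDouble() - 1);
        return (long) (base * rand);
    }

    public static void main(String[] args) {
        ExponentialBackoffSketch backoff = new ExponentialBackoffSketch(100, 1000, 0.2);
        for (long attempt = 0; attempt < 5; attempt++)
            System.out.printf("attempt %d -> ~%d ms%n", attempt, backoff.backoffMs(attempt));
    }
}
{code}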





[jira] [Assigned] (KAFKA-12473) Make the "cooperative-sticky, range" as the default assignor

2023-09-07 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12473?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen reassigned KAFKA-12473:
-

Assignee: (was: Luke Chen)

> Make the "cooperative-sticky, range" as the default assignor
> 
>
> Key: KAFKA-12473
> URL: https://issues.apache.org/jira/browse/KAFKA-12473
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: A. Sophie Blee-Goldman
>Priority: Critical
>  Labels: kip
>





[GitHub] [kafka] satishd commented on pull request #14111: KAFKA-9800: Exponential backoff for Kafka clients - KIP-580

2023-09-07 Thread via GitHub


satishd commented on PR #14111:
URL: https://github.com/apache/kafka/pull/14111#issuecomment-1710948188

   @junrao @AndrewJSchofield Are you planning to merge these changes to the 
3.6 branch?





[GitHub] [kafka] mjsax commented on pull request #14157: KAFKA-15303: Avoid unnecessary re-serialization in FK-join

2023-09-07 Thread via GitHub


mjsax commented on PR #14157:
URL: https://github.com/apache/kafka/pull/14157#issuecomment-1710947305

   \cc @cadonna @ableegoldman @lucasbru -- does anyone have some spare cycles 
to review so we can make progress?





[GitHub] [kafka] philipnee commented on pull request #14313: KAFKA-15416: Fix flaky TopicAdminTest::retryEndOffsetsShouldRetryWhenTopicNotFound test case

2023-09-07 Thread via GitHub


philipnee commented on PR #14313:
URL: https://github.com/apache/kafka/pull/14313#issuecomment-1710936514

   @C0urante - Thank you!





[GitHub] [kafka] kirktrue opened a new pull request, #14357: KAFKA-15276: Implement partition assignment reconciliation

2023-09-07 Thread via GitHub


kirktrue opened a new pull request, #14357:
URL: https://github.com/apache/kafka/pull/14357

   WIP





[GitHub] [kafka] cmccabe opened a new pull request, #14356: MINOR: rename BrokerToControllerChannelManager to NodeToControllerChannelManager

2023-09-07 Thread via GitHub


cmccabe opened a new pull request, #14356:
URL: https://github.com/apache/kafka/pull/14356

   No code changes, just renames.





[jira] [Commented] (KAFKA-14273) Kafka doesn't start with KRaft on Windows

2023-09-07 Thread Satish Duggana (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17762907#comment-17762907
 ] 

Satish Duggana commented on KAFKA-14273:


https://github.com/apache/kafka/pull/14354


> Kafka doesn't start with KRaft on Windows
> -
>
> Key: KAFKA-14273
> URL: https://issues.apache.org/jira/browse/KAFKA-14273
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.3.1
>Reporter: Kedar Joshi
>Assignee: José Armando García Sancio
>Priority: Major
> Fix For: 3.6.0
>
>
> {{Basic setup doesn't work on Windows 10.}}
> *{{Steps}}*
>  * {{Initialize cluster with -}}
> {code:sh}
>     bin\windows\kafka-storage.bat random-uuid
>     bin\windows\kafka-storage.bat format -t %cluster_id% -c 
> .\config\kraft\server.properties{code}
>  
>  * Start Kafka with -
> {code:sh}
>    bin\windows\kafka-server-start.bat .\config\kraft\server.properties{code}
>  
> *Stacktrace*
> Kafka fails to start with following exception -
> {code:java}
> D:\LocationGuru\Servers\Kafka-3.3>bin\windows\kafka-server-start.bat 
> .\config\kraft\server.properties
> [2022-10-03 23:14:20,089] INFO Registered kafka:type=kafka.Log4jController 
> MBean (kafka.utils.Log4jControllerRegistration$)
> [2022-10-03 23:14:20,375] INFO Setting -D 
> jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated 
> TLS renegotiation (org.apache.zookeeper.common.X509Util)
> [2022-10-03 23:14:20,594] INFO [LogLoader partition=__cluster_metadata-0, 
> dir=D:\LocationGuru\Servers\Kafka-3.3\tmp\kraft-combined-logs] Loading 
> producer state till offset 0 with message format version 2 
> (kafka.log.UnifiedLog$)
> [2022-10-03 23:14:20,594] INFO [LogLoader partition=__cluster_metadata-0, 
> dir=D:\LocationGuru\Servers\Kafka-3.3\tmp\kraft-combined-logs] Reloading from 
> producer snapshot and rebuilding producer state from offset 0 
> (kafka.log.UnifiedLog$)
> [2022-10-03 23:14:20,594] INFO [LogLoader partition=__cluster_metadata-0, 
> dir=D:\LocationGuru\Servers\Kafka-3.3\tmp\kraft-combined-logs] Producer state 
> recovery took 0ms for snapshot load and 0ms for segment recovery from offset 
> 0 (kafka.log.UnifiedLog$)
> [2022-10-03 23:14:20,640] INFO Initialized snapshots with IDs SortedSet() 
> from 
> D:\LocationGuru\Servers\Kafka-3.3\tmp\kraft-combined-logs__cluster_metadata-0 
> (kafka.raft.KafkaMetadataLog$)
> [2022-10-03 23:14:20,734] INFO [raft-expiration-reaper]: Starting 
> (kafka.raft.TimingWheelExpirationService$ExpiredOperationReaper)
> [2022-10-03 23:14:20,900] ERROR Exiting Kafka due to fatal exception 
> (kafka.Kafka$)
> java.io.UncheckedIOException: Error while writing the Quorum status from the 
> file 
> D:\LocationGuru\Servers\Kafka-3.3\tmp\kraft-combined-logs__cluster_metadata-0\quorum-state
>         at 
> org.apache.kafka.raft.FileBasedStateStore.writeElectionStateToFile(FileBasedStateStore.java:155)
>         at 
> org.apache.kafka.raft.FileBasedStateStore.writeElectionState(FileBasedStateStore.java:128)
>         at 
> org.apache.kafka.raft.QuorumState.transitionTo(QuorumState.java:477)
>         at org.apache.kafka.raft.QuorumState.initialize(QuorumState.java:212)
>         at 
> org.apache.kafka.raft.KafkaRaftClient.initialize(KafkaRaftClient.java:369)
>         at kafka.raft.KafkaRaftManager.buildRaftClient(RaftManager.scala:200)
>         at kafka.raft.KafkaRaftManager.<init>(RaftManager.scala:127)
>         at kafka.server.KafkaRaftServer.<init>(KafkaRaftServer.scala:83)
>         at kafka.Kafka$.buildServer(Kafka.scala:79)
>         at kafka.Kafka$.main(Kafka.scala:87)
>         at kafka.Kafka.main(Kafka.scala)
> Caused by: java.nio.file.FileSystemException: 
> D:\LocationGuru\Servers\Kafka-3.3\tmp\kraft-combined-logs_cluster_metadata-0\quorum-state.tmp
>  -> 
> D:\LocationGuru\Servers\Kafka-3.3\tmp\kraft-combined-logs_cluster_metadata-0\quorum-state:
>  The process cannot access the file because it is being used by another 
> process
>         at 
> java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:92)
>         at 
> java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103)
>         at java.base/sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:403)
>         at 
> java.base/sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:293)
>         at java.base/java.nio.file.Files.move(Files.java:1430)
>         at 
> org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:935)
>         at 
> org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:918)
>         at 
> org.apache.kafka.raft.FileBasedStateStore.writeElectionStateToFile(FileBasedStateStore.java:152)
>         ... 10 more
>         Suppressed: java.nio.file.FileSystemException: 
> 

[jira] [Commented] (KAFKA-14273) Kafka doesn't start with KRaft on Windows

2023-09-07 Thread Satish Duggana (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17762908#comment-17762908
 ] 

Satish Duggana commented on KAFKA-14273:


Thanks [~jsancio] for the quick fix.

> Kafka doesn't start with KRaft on Windows
> -
>
> Key: KAFKA-14273
> URL: https://issues.apache.org/jira/browse/KAFKA-14273
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.3.1
>Reporter: Kedar Joshi
>Assignee: José Armando García Sancio
>Priority: Major
> Fix For: 3.6.0
>

[jira] [Updated] (KAFKA-15416) Flaky test TopicAdminTest::retryEndOffsetsShouldRetryWhenTopicNotFound

2023-09-07 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15416?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-15416:
--
Fix Version/s: 3.6.0

> Flaky test TopicAdminTest::retryEndOffsetsShouldRetryWhenTopicNotFound
> --
>
> Key: KAFKA-15416
> URL: https://issues.apache.org/jira/browse/KAFKA-15416
> Project: Kafka
>  Issue Type: Test
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Minor
> Fix For: 3.6.0, 3.7.0
>
>
> This test fails frequently when I run unit tests locally, but I've never seen 
> it fail during a CI build.
> Failure message:
> {quote}    org.apache.kafka.connect.errors.ConnectException: Failed to list 
> offsets for topic partitions.
>         at 
> app//org.apache.kafka.connect.util.TopicAdmin.retryEndOffsets(TopicAdmin.java:777)
>         at 
> app//org.apache.kafka.connect.util.TopicAdminTest.retryEndOffsetsShouldRetryWhenTopicNotFound(TopicAdminTest.java:570)
>  
>         Caused by:
>         org.apache.kafka.connect.errors.ConnectException: Fail to list 
> offsets for topic partitions after 1 attempts.  Reason: Timed out while 
> waiting to get end offsets for topic 'myTopic' on brokers at 
> \{retry.backoff.ms=0}
>             at 
> app//org.apache.kafka.connect.util.RetryUtil.retryUntilTimeout(RetryUtil.java:106)
>             at 
> app//org.apache.kafka.connect.util.RetryUtil.retryUntilTimeout(RetryUtil.java:56)
>             at 
> app//org.apache.kafka.connect.util.TopicAdmin.retryEndOffsets(TopicAdmin.java:768)
>             ... 1 more
>  
>             Caused by:
>             org.apache.kafka.common.errors.TimeoutException: Timed out while 
> waiting to get end offsets for topic 'myTopic' on brokers at 
> \{retry.backoff.ms=0}
>  
>                 Caused by:
>                 java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TimeoutException: Timed out waiting to send 
> the call. Call: listOffsets(api=METADATA)
>                     at 
> java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
>                     at 
> java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
>                     at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
>                     at 
> org.apache.kafka.connect.util.TopicAdmin.endOffsets(TopicAdmin.java:716)
>                     at 
> org.apache.kafka.connect.util.TopicAdmin.lambda$retryEndOffsets$7(TopicAdmin.java:769)
>                     at 
> org.apache.kafka.connect.util.RetryUtil.retryUntilTimeout(RetryUtil.java:87)
>                     at 
> org.apache.kafka.connect.util.RetryUtil.retryUntilTimeout(RetryUtil.java:56)
>                     at 
> org.apache.kafka.connect.util.TopicAdmin.retryEndOffsets(TopicAdmin.java:768)
>                     at 
> org.apache.kafka.connect.util.TopicAdminTest.retryEndOffsetsShouldRetryWhenTopicNotFound(TopicAdminTest.java:570)
>  
>                     Caused by:
>                     org.apache.kafka.common.errors.TimeoutException: Timed 
> out waiting to send the call. Call: listOffsets(api=METADATA)
> {quote}





[jira] [Resolved] (KAFKA-14273) Kafka doesn't start with KRaft on Windows

2023-09-07 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-14273?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

José Armando García Sancio resolved KAFKA-14273.

Resolution: Fixed

> Kafka doesn't start with KRaft on Windows
> -
>
> Key: KAFKA-14273
> URL: https://issues.apache.org/jira/browse/KAFKA-14273
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.3.1
>Reporter: Kedar Joshi
>Assignee: José Armando García Sancio
>Priority: Major
> Fix For: 3.6.0
>

[jira] [Resolved] (KAFKA-15416) Flaky test TopicAdminTest::retryEndOffsetsShouldRetryWhenTopicNotFound

2023-09-07 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15416?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-15416.
---
Fix Version/s: 3.7.0
   Resolution: Fixed

> Flaky test TopicAdminTest::retryEndOffsetsShouldRetryWhenTopicNotFound
> --
>
> Key: KAFKA-15416
> URL: https://issues.apache.org/jira/browse/KAFKA-15416
> Project: Kafka
>  Issue Type: Test
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Minor
> Fix For: 3.7.0
>





[GitHub] [kafka] C0urante merged pull request #14313: KAFKA-15416: Fix flaky TopicAdminTest::retryEndOffsetsShouldRetryWhenTopicNotFound test case

2023-09-07 Thread via GitHub


C0urante merged PR #14313:
URL: https://github.com/apache/kafka/pull/14313





[GitHub] [kafka] C0urante commented on pull request #14313: KAFKA-15416: Fix flaky TopicAdminTest::retryEndOffsetsShouldRetryWhenTopicNotFound test case

2023-09-07 Thread via GitHub


C0urante commented on PR #14313:
URL: https://github.com/apache/kafka/pull/14313#issuecomment-1710880812

   Ah, that makes more sense. Yeah, I've seen some of those too, would be nice 
to patch them. Thanks for your help on this one @philipnee!





[GitHub] [kafka] jsancio closed pull request #12763: KAFKA-14273: Kafka doesn't start with KRaft on Windows

2023-09-07 Thread via GitHub


jsancio closed pull request #12763: KAFKA-14273: Kafka doesn't start with KRaft 
on Windows
URL: https://github.com/apache/kafka/pull/12763





[GitHub] [kafka] jsancio merged pull request #14354: KAFKA-14273; Close file before atomic move

2023-09-07 Thread via GitHub


jsancio merged PR #14354:
URL: https://github.com/apache/kafka/pull/14354





[GitHub] [kafka] jsancio commented on a diff in pull request #14354: KAFKA-14273; Close file before atomic move

2023-09-07 Thread via GitHub


jsancio commented on code in PR #14354:
URL: https://github.com/apache/kafka/pull/14354#discussion_r1319056404


##
raft/src/main/java/org/apache/kafka/raft/FileBasedStateStore.java:
##
@@ -144,21 +144,29 @@ private void writeElectionStateToFile(final File stateFile, QuorumStateData stat
 
         log.trace("Writing tmp quorum state {}", temp.getAbsolutePath());
 
-        try (final FileOutputStream fileOutputStream = new FileOutputStream(temp);
-             final BufferedWriter writer = new BufferedWriter(
-                 new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8))) {
-            short version = state.highestSupportedVersion();
-
-            ObjectNode jsonState = (ObjectNode) QuorumStateDataJsonConverter.write(state, version);
-            jsonState.set(DATA_VERSION, new ShortNode(version));
-            writer.write(jsonState.toString());
-            writer.flush();
-            fileOutputStream.getFD().sync();
+        try {
+            try (final FileOutputStream fileOutputStream = new FileOutputStream(temp);
+                 final BufferedWriter writer = new BufferedWriter(
+                     new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8)
+                 )
+            ) {
+                short version = state.highestSupportedVersion();

Review Comment:
   Done.
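   
   For context, a minimal sketch of the pattern this PR adopts (write the temp 
file, close it, then atomically move it into place), with illustrative names; 
Windows refuses to move a file that is still open, which is what KAFKA-14273 
ran into:
   
   ```java
   import java.io.IOException;
   import java.nio.charset.StandardCharsets;
   import java.nio.file.Files;
   import java.nio.file.Path;
   import java.nio.file.Paths;
   import java.nio.file.StandardCopyOption;
   
   public class AtomicWriteSketch {
       static void writeAtomically(Path target, String contents) throws IOException {
           Path temp = target.resolveSibling(target.getFileName() + ".tmp");
           // Files.write opens, writes, and closes the temp file before returning
           Files.write(temp, contents.getBytes(StandardCharsets.UTF_8));
           // only then is the (closed) temp file moved over the target
           Files.move(temp, target, StandardCopyOption.ATOMIC_MOVE);
       }
   
       public static void main(String[] args) throws IOException {
           writeAtomically(Paths.get("quorum-state"), "{\"leaderId\":1}");
       }
   }
   ```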






[GitHub] [kafka] junrao merged pull request #14305: KAFKA-14274: [1/7] basic refactoring

2023-09-07 Thread via GitHub


junrao merged PR #14305:
URL: https://github.com/apache/kafka/pull/14305





[GitHub] [kafka] cmccabe merged pull request #14306: KAFKA-15369: Implement KIP-919: Allow AC to Talk Directly with Controllers

2023-09-07 Thread via GitHub


cmccabe merged PR #14306:
URL: https://github.com/apache/kafka/pull/14306





[GitHub] [kafka] cmccabe commented on a diff in pull request #14306: KAFKA-15369: Implement KIP-919: Allow AC to Talk Directly with Controllers

2023-09-07 Thread via GitHub


cmccabe commented on code in PR #14306:
URL: https://github.com/apache/kafka/pull/14306#discussion_r1319165974


##
clients/src/main/resources/common/message/ControllerRegistrationRequest.json:
##
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 70,

Review Comment:
   edit: now using 70 again since someone claimed 69 :)






[GitHub] [kafka] C0urante commented on pull request #14314: KAFKA-15425: Fail fast in Admin::listOffsets when topic (but not partition) metadata is not found

2023-09-07 Thread via GitHub


C0urante commented on PR #14314:
URL: https://github.com/apache/kafka/pull/14314#issuecomment-1710828993

   Sorry! Yes, that's correct--3.6 is all we need to backport to. Thanks 
@jolshan!





[GitHub] [kafka] kirktrue commented on a diff in pull request #14305: KAFKA-14274: [1/7] basic refactoring

2023-09-07 Thread via GitHub


kirktrue commented on code in PR #14305:
URL: https://github.com/apache/kafka/pull/14305#discussion_r1319162018


##
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##
@@ -639,9 +639,9 @@ private void maybeOverrideClientId(Map<String, Object> configs) {
         }
     }
 
-    protected static Map<String, Object> appendDeserializerToConfig(Map<String, Object> configs,
-                                                                    Deserializer<?> keyDeserializer,
-                                                                    Deserializer<?> valueDeserializer) {
+    public static Map<String, Object> appendDeserializerToConfig(Map<String, Object> configs,

Review Comment:
   Yes.






[GitHub] [kafka] kirktrue commented on pull request #14305: KAFKA-14274: [1/7] basic refactoring

2023-09-07 Thread via GitHub


kirktrue commented on PR #14305:
URL: https://github.com/apache/kafka/pull/14305#issuecomment-1710825657

   Test failures in the latest build are unrelated:
   
   ```
   integration.kafka.server.FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor()
   kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest.testNoGroupAcl()
   kafka.api.TransactionsTest.testBumpTransactionalEpoch()
   kafka.api.TransactionsTest.testCommitTransactionTimeout()
   kafka.controller.ControllerIntegrationTest.testTopicIdPersistsThroughControllerRestart()
   kafka.network.ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit()
   kafka.server.DescribeClusterRequestTest.testDescribeClusterRequestExcludingClusterAuthorizedOperations()
   kafka.server.ProduceRequestTest.testSimpleProduceRequest()
   o.a.k.clients.consumer.internals.AbstractCoordinatorTest.testWakeupAfterSyncGroupSentExternalCompletion()
   o.a.k.connect.integration.ConnectorRestartApiIntegrationTest.testMultiWorkerRestartOnlyConnector()
   o.a.k.controller.QuorumControllerTest.testBalancePartitionLeaders()
   ```





[GitHub] [kafka] jolshan merged pull request #14314: KAFKA-15425: Fail fast in Admin::listOffsets when topic (but not partition) metadata is not found

2023-09-07 Thread via GitHub


jolshan merged PR #14314:
URL: https://github.com/apache/kafka/pull/14314





[GitHub] [kafka] jolshan commented on pull request #14314: KAFKA-15425: Fail fast in Admin::listOffsets when topic (but not partition) metadata is not found

2023-09-07 Thread via GitHub


jolshan commented on PR #14314:
URL: https://github.com/apache/kafka/pull/14314#issuecomment-1710766397

   Tests look unrelated and like flakes I've seen before, so I will go ahead 
and merge + pick to 3.6





[GitHub] [kafka] rreddy-22 commented on pull request #14345: MINOR: Range assignor changes

2023-09-07 Thread via GitHub


rreddy-22 commented on PR #14345:
URL: https://github.com/apache/kafka/pull/14345#issuecomment-1710765019

   Hey @vamossagar12! The other assignors are still under development; this 
is just an open PR in case we find other changes we would like to make, so 
that all three have the same format and naming conventions.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lianetm commented on pull request #14346: KAFKA-15115 - KAFKA-15163; Reset/Validate positions implementation & API integration

2023-09-07 Thread via GitHub


lianetm commented on PR #14346:
URL: https://github.com/apache/kafka/pull/14346#issuecomment-1710762508

   Thanks a lot @philipnee for the review, all comments addressed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lianetm commented on a diff in pull request #14346: KAFKA-15115 - KAFKA-15163; Reset/Validate positions implementation & API integration

2023-09-07 Thread via GitHub


lianetm commented on code in PR #14346:
URL: https://github.com/apache/kafka/pull/14346#discussion_r1319111762


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##
@@ -209,6 +215,34 @@ public ConsumerRecords<K, V> poll(final Duration timeout) {
 return ConsumerRecords.empty();
 }
 
+/**
+ * Set the fetch position to the committed position (if there is one) or 
reset it using the
+ * offset reset policy the user has configured.
+ *
+ * @return true if the operation completed without timing out
+ * @throws org.apache.kafka.common.errors.AuthenticationException if 
authentication fails. See the exception for more details
+ * @throws NoOffsetForPartitionException  If no 
offset is stored for a given partition and no offset reset policy is
+ *defined
+ */
+private boolean updateFetchPositions() {
+// If any partitions have been truncated due to a leader change, we 
need to validate the offsets
+ValidatePositionsApplicationEvent validatePositionsEvent = new 
ValidatePositionsApplicationEvent();
+eventHandler.add(validatePositionsEvent);
+
+// TODO: integrate logic for refreshing committed offsets if available

Review Comment:
   For the record, even though the change for using committed offsets here is 2 
lines of code replacing this TODO, it also requires a minor refactoring of 
existing core components (e.g. `ConsumerCoordinator`). So it will follow this 
PR, in one titled something like "Adding support for using committed offsets 
when updating fetch positions".
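
   (Editor's note: a minimal, self-contained sketch of the event-queue pattern 
the TODO would follow, by analogy with the `ValidatePositionsApplicationEvent` 
line above. The `RefreshCommittedOffsetsApplicationEvent` name and the queue 
wiring here are hypothetical, not the actual follow-up PR.)

   ```java
   import java.util.concurrent.BlockingQueue;
   import java.util.concurrent.LinkedBlockingQueue;

   public class RefreshCommittedOffsetsSketch {

       // Stand-in for the consumer's application-event marker type.
       interface ApplicationEvent { }

       // Hypothetical event asking the background thread to refresh
       // committed offsets before fetch positions are updated.
       static class RefreshCommittedOffsetsApplicationEvent implements ApplicationEvent { }

       public static void main(String[] args) {
           BlockingQueue<ApplicationEvent> eventQueue = new LinkedBlockingQueue<>();
           // Mirrors eventHandler.add(validatePositionsEvent) in the diff above.
           eventQueue.add(new RefreshCommittedOffsetsApplicationEvent());
           System.out.println("Events queued: " + eventQueue.size());
       }
   }
   ```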



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #14353: KAFKA-14499: [7/7] Add integration tests for OffsetCommit API and OffsetFetch API

2023-09-07 Thread via GitHub


jolshan commented on code in PR #14353:
URL: https://github.com/apache/kafka/pull/14353#discussion_r1319107403


##
core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala:
##
@@ -14,233 +14,532 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package kafka.server
 
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, Type}
+import kafka.test.junit.ClusterTestExtensions
+import kafka.test.junit.RaftClusterInvocationContext.RaftClusterInstance
+import kafka.test.junit.ZkClusterInvocationContext.ZkClusterInstance
 import kafka.utils.TestUtils
-import org.apache.kafka.clients.consumer.{ConsumerConfig, OffsetAndMetadata}
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.message.OffsetFetchResponseData
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData
-import org.apache.kafka.common.requests.{AbstractResponse, OffsetFetchRequest, 
OffsetFetchResponse}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
-import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
+import org.apache.kafka.common.requests.{OffsetFetchRequest, 
OffsetFetchResponse}
+import org.junit.jupiter.api.Assertions.{assertEquals, fail}
+import org.junit.jupiter.api.{Tag, Timeout}
+import org.junit.jupiter.api.extension.ExtendWith
 
-import java.util
-import java.util.Collections.singletonList
+import java.util.Comparator
+import java.util.stream.Collectors
 import scala.jdk.CollectionConverters._
-import java.util.{Collections, Optional, Properties}
-
-class OffsetFetchRequestTest extends BaseRequestTest {
-
-  override def brokerCount: Int = 1
-
-  val brokerId: Integer = 0
-  val offset = 15L
-  val leaderEpoch: Optional[Integer] = Optional.of(3)
-  val metadata = "metadata"
-  val topic = "topic"
-  val groupId = "groupId"
-  val groups: Seq[String] = (1 to 5).map(i => s"group$i")
-  val topics: Seq[String] = (1 to 3).map(i => s"topic$i")
-  val topic1List = singletonList(new TopicPartition(topics(0), 0))
-  val topic1And2List = util.Arrays.asList(
-new TopicPartition(topics(0), 0),
-new TopicPartition(topics(1), 0),
-new TopicPartition(topics(1), 1))
-  val allTopicsList = util.Arrays.asList(
-new TopicPartition(topics(0), 0),
-new TopicPartition(topics(1), 0),
-new TopicPartition(topics(1), 1),
-new TopicPartition(topics(2), 0),
-new TopicPartition(topics(2), 1),
-new TopicPartition(topics(2), 2))
-  val groupToPartitionMap: util.Map[String, util.List[TopicPartition]] =
-new util.HashMap[String, util.List[TopicPartition]]()
-  groupToPartitionMap.put(groups(0), topic1List)
-  groupToPartitionMap.put(groups(1), topic1And2List)
-  groupToPartitionMap.put(groups(2), allTopicsList)
-  groupToPartitionMap.put(groups(3), null)
-  groupToPartitionMap.put(groups(4), null)
-
-  override def brokerPropertyOverrides(properties: Properties): Unit = {
-properties.put(KafkaConfig.BrokerIdProp, brokerId.toString)
-properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1")
-properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "1")
-properties.put(KafkaConfig.TransactionsTopicPartitionsProp, "1")
-properties.put(KafkaConfig.TransactionsTopicReplicationFactorProp, "1")
-properties.put(KafkaConfig.TransactionsTopicMinISRProp, "1")
-properties.setProperty(KafkaConfig.UnstableApiVersionsEnableProp, "true")
+
+@Timeout(120)
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
+@Tag("integration")
+class OffsetFetchRequestTest(cluster: ClusterInstance) extends 
GroupCoordinatorBaseRequestTest(cluster) {
+
+  @ClusterTest(serverProperties = Array(
+new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"true"),
+new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
+new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", 
value = "60"),
+new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value 
= "60"),
+new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def 
testSingleGroupOffsetFetchWithNewConsumerGroupProtocolAndNewGroupCoordinator(): 
Unit = {
+testSingleGroupOffsetFetch(useNewProtocol = true, requireStable = true)
   }
 
-  @BeforeEach
-  override def setUp(testInfo: TestInfo): Unit = {
-doSetup(testInfo, createOffsetsTopic = false)
+  @ClusterTest(serverProperties = Array(
+new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"true"),
+new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
+new ClusterConfigProperty(key = "gro

[GitHub] [kafka] lianetm commented on a diff in pull request #14346: KAFKA-15115 - KAFKA-15163; Reset/Validate positions implementation & API integration

2023-09-07 Thread via GitHub


lianetm commented on code in PR #14346:
URL: https://github.com/apache/kafka/pull/14346#discussion_r1319093808


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java:
##
@@ -298,6 +356,154 @@ private CompletableFuture<ListOffsetResult> buildListOffsetRequestToNode(
 return result;
 }
 
+/**
+ * Make asynchronous ListOffsets request to fetch offsets by target times 
for the specified
+ * partitions.
+ * Use the retrieved offsets to reset positions in the subscription state.
+ *
+ * @param timestampsToSearch the mapping between partitions and target time
+ * @return A list of
+ * {@link 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest}
+ * that can be polled to obtain the corresponding timestamps and offsets.
+ */
+    private List<NetworkClientDelegate.UnsentRequest> buildListOffsetsRequestsAndResetPositions(
+            final Map<TopicPartition, Long> timestampsToSearch) {
+        Map<Node, Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition>> timestampsToSearchByNode =
+                groupListOffsetRequests(timestampsToSearch, Optional.empty());
+
+        final List<NetworkClientDelegate.UnsentRequest> unsentRequests = new ArrayList<>();
+
+        timestampsToSearchByNode.forEach((node, resetTimestamps) -> {
+            subscriptionState.setNextAllowedRetry(resetTimestamps.keySet(),
+                    time.milliseconds() + requestTimeoutMs);
+
+            CompletableFuture<ListOffsetResult> partialResult = buildListOffsetRequestToNode(
+                    node,
+                    resetTimestamps,
+                    false,
+                    unsentRequests);
+
+            partialResult.whenComplete((result, error) -> {
+                if (error == null) {
+                    offsetFetcherUtils.onSuccessfulRequestForResettingPositions(resetTimestamps,
+                            result);
+                } else {
+                    RuntimeException e;
+                    if (error instanceof RuntimeException) {
+                        e = (RuntimeException) error;
+                    } else {
+                        e = new RuntimeException("Unexpected failure in ListOffsets request for " +
+                                "resetting positions", error);
+                    }
+                    offsetFetcherUtils.onFailedRequestForResettingPositions(resetTimestamps, e);
+                }
+            });
+        });
+        return unsentRequests;
+    }
+
+    /**
+     * For each partition that needs validation, make an asynchronous request to get the end-offsets
+     * for the partition with the epoch less than or equal to the epoch the partition last saw.
+     * <p>
+     * Requests are grouped by Node for efficiency.
+     */
+    private List<NetworkClientDelegate.UnsentRequest> buildListOffsetsRequestsAndValidatePositions(
+            Map<TopicPartition, SubscriptionState.FetchPosition> partitionsToValidate) {
+
+        final Map<Node, Map<TopicPartition, SubscriptionState.FetchPosition>> regrouped =
+                regroupFetchPositionsByLeader(partitionsToValidate);
+
+        long nextResetTimeMs = time.milliseconds() + requestTimeoutMs;
+        final List<NetworkClientDelegate.UnsentRequest> unsentRequests = new ArrayList<>();
+        regrouped.forEach((node, fetchPositions) -> {
+
+            if (node.isEmpty()) {

Review Comment:
   Yes, it's needed; these are 2 different things. The `if` is checking the 
Node object's `isEmpty` function, but the filter is only removing the entries 
for which the leader (`Optional`) is not present.
   
   So we could have an `Optional` that is present but holds a Node for which 
`isEmpty` is true (used mainly in case of errors).
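
   (Editor's note: a self-contained illustration of the distinction, using the 
public `Node.noNode()` placeholder; this is not code from the PR.)

   ```java
   import org.apache.kafka.common.Node;

   import java.util.Optional;

   public class NodeEmptyVsAbsent {
       public static void main(String[] args) {
           // Case the filter handles: no leader metadata at all.
           Optional<Node> absentLeader = Optional.empty();

           // Case the if-check handles: leader metadata is present but is the
           // "no node" placeholder, for which isEmpty() returns true.
           Optional<Node> placeholderLeader = Optional.of(Node.noNode());

           System.out.println(absentLeader.isPresent());          // false -> dropped by the filter
           System.out.println(placeholderLeader.isPresent());     // true  -> survives the filter
           System.out.println(placeholderLeader.get().isEmpty()); // true  -> caught by the if
       }
   }
   ```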



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14353: KAFKA-14499: [7/7] Add integration tests for OffsetCommit API and OffsetFetch API

2023-09-07 Thread via GitHub


rreddy-22 commented on code in PR #14353:
URL: https://github.com/apache/kafka/pull/14353#discussion_r1319088896


##
core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala:
##
@@ -14,233 +14,532 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package kafka.server
 
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, Type}
+import kafka.test.junit.ClusterTestExtensions
+import kafka.test.junit.RaftClusterInvocationContext.RaftClusterInstance
+import kafka.test.junit.ZkClusterInvocationContext.ZkClusterInstance
 import kafka.utils.TestUtils
-import org.apache.kafka.clients.consumer.{ConsumerConfig, OffsetAndMetadata}
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.message.OffsetFetchResponseData
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData
-import org.apache.kafka.common.requests.{AbstractResponse, OffsetFetchRequest, 
OffsetFetchResponse}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
-import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
+import org.apache.kafka.common.requests.{OffsetFetchRequest, 
OffsetFetchResponse}
+import org.junit.jupiter.api.Assertions.{assertEquals, fail}
+import org.junit.jupiter.api.{Tag, Timeout}
+import org.junit.jupiter.api.extension.ExtendWith
 
-import java.util
-import java.util.Collections.singletonList
+import java.util.Comparator
+import java.util.stream.Collectors
 import scala.jdk.CollectionConverters._
-import java.util.{Collections, Optional, Properties}
-
-class OffsetFetchRequestTest extends BaseRequestTest {
-
-  override def brokerCount: Int = 1
-
-  val brokerId: Integer = 0
-  val offset = 15L
-  val leaderEpoch: Optional[Integer] = Optional.of(3)
-  val metadata = "metadata"
-  val topic = "topic"
-  val groupId = "groupId"
-  val groups: Seq[String] = (1 to 5).map(i => s"group$i")
-  val topics: Seq[String] = (1 to 3).map(i => s"topic$i")
-  val topic1List = singletonList(new TopicPartition(topics(0), 0))
-  val topic1And2List = util.Arrays.asList(
-new TopicPartition(topics(0), 0),
-new TopicPartition(topics(1), 0),
-new TopicPartition(topics(1), 1))
-  val allTopicsList = util.Arrays.asList(
-new TopicPartition(topics(0), 0),
-new TopicPartition(topics(1), 0),
-new TopicPartition(topics(1), 1),
-new TopicPartition(topics(2), 0),
-new TopicPartition(topics(2), 1),
-new TopicPartition(topics(2), 2))
-  val groupToPartitionMap: util.Map[String, util.List[TopicPartition]] =
-new util.HashMap[String, util.List[TopicPartition]]()
-  groupToPartitionMap.put(groups(0), topic1List)
-  groupToPartitionMap.put(groups(1), topic1And2List)
-  groupToPartitionMap.put(groups(2), allTopicsList)
-  groupToPartitionMap.put(groups(3), null)
-  groupToPartitionMap.put(groups(4), null)
-
-  override def brokerPropertyOverrides(properties: Properties): Unit = {
-properties.put(KafkaConfig.BrokerIdProp, brokerId.toString)
-properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1")
-properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "1")
-properties.put(KafkaConfig.TransactionsTopicPartitionsProp, "1")
-properties.put(KafkaConfig.TransactionsTopicReplicationFactorProp, "1")
-properties.put(KafkaConfig.TransactionsTopicMinISRProp, "1")
-properties.setProperty(KafkaConfig.UnstableApiVersionsEnableProp, "true")
+
+@Timeout(120)
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
+@Tag("integration")
+class OffsetFetchRequestTest(cluster: ClusterInstance) extends 
GroupCoordinatorBaseRequestTest(cluster) {
+
+  @ClusterTest(serverProperties = Array(
+new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"true"),
+new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
+new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", 
value = "60"),
+new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value 
= "60"),
+new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def 
testSingleGroupOffsetFetchWithNewConsumerGroupProtocolAndNewGroupCoordinator(): 
Unit = {
+testSingleGroupOffsetFetch(useNewProtocol = true, requireStable = true)
   }
 
-  @BeforeEach
-  override def setUp(testInfo: TestInfo): Unit = {
-doSetup(testInfo, createOffsetsTopic = false)
+  @ClusterTest(serverProperties = Array(
+new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"true"),
+new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
+new ClusterConfigProperty(key = "g

[GitHub] [kafka] jolshan commented on a diff in pull request #14353: KAFKA-14499: [7/7] Add integration tests for OffsetCommit API and OffsetFetch API

2023-09-07 Thread via GitHub


jolshan commented on code in PR #14353:
URL: https://github.com/apache/kafka/pull/14353#discussion_r1319083998


##
core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala:
##
@@ -14,233 +14,532 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package kafka.server
 
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, Type}
+import kafka.test.junit.ClusterTestExtensions
+import kafka.test.junit.RaftClusterInvocationContext.RaftClusterInstance
+import kafka.test.junit.ZkClusterInvocationContext.ZkClusterInstance
 import kafka.utils.TestUtils
-import org.apache.kafka.clients.consumer.{ConsumerConfig, OffsetAndMetadata}
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.message.OffsetFetchResponseData
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData
-import org.apache.kafka.common.requests.{AbstractResponse, OffsetFetchRequest, 
OffsetFetchResponse}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
-import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
+import org.apache.kafka.common.requests.{OffsetFetchRequest, 
OffsetFetchResponse}
+import org.junit.jupiter.api.Assertions.{assertEquals, fail}
+import org.junit.jupiter.api.{Tag, Timeout}
+import org.junit.jupiter.api.extension.ExtendWith
 
-import java.util
-import java.util.Collections.singletonList
+import java.util.Comparator
+import java.util.stream.Collectors
 import scala.jdk.CollectionConverters._
-import java.util.{Collections, Optional, Properties}
-
-class OffsetFetchRequestTest extends BaseRequestTest {
-
-  override def brokerCount: Int = 1
-
-  val brokerId: Integer = 0
-  val offset = 15L
-  val leaderEpoch: Optional[Integer] = Optional.of(3)
-  val metadata = "metadata"
-  val topic = "topic"
-  val groupId = "groupId"
-  val groups: Seq[String] = (1 to 5).map(i => s"group$i")
-  val topics: Seq[String] = (1 to 3).map(i => s"topic$i")
-  val topic1List = singletonList(new TopicPartition(topics(0), 0))
-  val topic1And2List = util.Arrays.asList(
-new TopicPartition(topics(0), 0),
-new TopicPartition(topics(1), 0),
-new TopicPartition(topics(1), 1))
-  val allTopicsList = util.Arrays.asList(
-new TopicPartition(topics(0), 0),
-new TopicPartition(topics(1), 0),
-new TopicPartition(topics(1), 1),
-new TopicPartition(topics(2), 0),
-new TopicPartition(topics(2), 1),
-new TopicPartition(topics(2), 2))
-  val groupToPartitionMap: util.Map[String, util.List[TopicPartition]] =
-new util.HashMap[String, util.List[TopicPartition]]()
-  groupToPartitionMap.put(groups(0), topic1List)
-  groupToPartitionMap.put(groups(1), topic1And2List)
-  groupToPartitionMap.put(groups(2), allTopicsList)
-  groupToPartitionMap.put(groups(3), null)
-  groupToPartitionMap.put(groups(4), null)
-
-  override def brokerPropertyOverrides(properties: Properties): Unit = {
-properties.put(KafkaConfig.BrokerIdProp, brokerId.toString)
-properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1")
-properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "1")
-properties.put(KafkaConfig.TransactionsTopicPartitionsProp, "1")
-properties.put(KafkaConfig.TransactionsTopicReplicationFactorProp, "1")
-properties.put(KafkaConfig.TransactionsTopicMinISRProp, "1")
-properties.setProperty(KafkaConfig.UnstableApiVersionsEnableProp, "true")
+
+@Timeout(120)
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
+@Tag("integration")
+class OffsetFetchRequestTest(cluster: ClusterInstance) extends 
GroupCoordinatorBaseRequestTest(cluster) {
+
+  @ClusterTest(serverProperties = Array(
+new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"true"),
+new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
+new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", 
value = "60"),
+new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value 
= "60"),
+new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def 
testSingleGroupOffsetFetchWithNewConsumerGroupProtocolAndNewGroupCoordinator(): 
Unit = {
+testSingleGroupOffsetFetch(useNewProtocol = true, requireStable = true)
   }
 
-  @BeforeEach
-  override def setUp(testInfo: TestInfo): Unit = {
-doSetup(testInfo, createOffsetsTopic = false)
+  @ClusterTest(serverProperties = Array(
+new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"true"),
+new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
+new ClusterConfigProperty(key = "gro

[GitHub] [kafka] jolshan commented on a diff in pull request #14353: KAFKA-14499: [7/7] Add integration tests for OffsetCommit API and OffsetFetch API

2023-09-07 Thread via GitHub


jolshan commented on code in PR #14353:
URL: https://github.com/apache/kafka/pull/14353#discussion_r1319082640


##
core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala:
##
@@ -14,233 +14,532 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package kafka.server
 
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, Type}
+import kafka.test.junit.ClusterTestExtensions
+import kafka.test.junit.RaftClusterInvocationContext.RaftClusterInstance
+import kafka.test.junit.ZkClusterInvocationContext.ZkClusterInstance
 import kafka.utils.TestUtils
-import org.apache.kafka.clients.consumer.{ConsumerConfig, OffsetAndMetadata}
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.message.OffsetFetchResponseData
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData
-import org.apache.kafka.common.requests.{AbstractResponse, OffsetFetchRequest, 
OffsetFetchResponse}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
-import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
+import org.apache.kafka.common.requests.{OffsetFetchRequest, 
OffsetFetchResponse}
+import org.junit.jupiter.api.Assertions.{assertEquals, fail}
+import org.junit.jupiter.api.{Tag, Timeout}
+import org.junit.jupiter.api.extension.ExtendWith
 
-import java.util
-import java.util.Collections.singletonList
+import java.util.Comparator
+import java.util.stream.Collectors
 import scala.jdk.CollectionConverters._
-import java.util.{Collections, Optional, Properties}
-
-class OffsetFetchRequestTest extends BaseRequestTest {
-
-  override def brokerCount: Int = 1
-
-  val brokerId: Integer = 0
-  val offset = 15L
-  val leaderEpoch: Optional[Integer] = Optional.of(3)
-  val metadata = "metadata"
-  val topic = "topic"
-  val groupId = "groupId"
-  val groups: Seq[String] = (1 to 5).map(i => s"group$i")
-  val topics: Seq[String] = (1 to 3).map(i => s"topic$i")
-  val topic1List = singletonList(new TopicPartition(topics(0), 0))
-  val topic1And2List = util.Arrays.asList(
-new TopicPartition(topics(0), 0),
-new TopicPartition(topics(1), 0),
-new TopicPartition(topics(1), 1))
-  val allTopicsList = util.Arrays.asList(
-new TopicPartition(topics(0), 0),
-new TopicPartition(topics(1), 0),
-new TopicPartition(topics(1), 1),
-new TopicPartition(topics(2), 0),
-new TopicPartition(topics(2), 1),
-new TopicPartition(topics(2), 2))
-  val groupToPartitionMap: util.Map[String, util.List[TopicPartition]] =
-new util.HashMap[String, util.List[TopicPartition]]()
-  groupToPartitionMap.put(groups(0), topic1List)
-  groupToPartitionMap.put(groups(1), topic1And2List)
-  groupToPartitionMap.put(groups(2), allTopicsList)
-  groupToPartitionMap.put(groups(3), null)
-  groupToPartitionMap.put(groups(4), null)
-
-  override def brokerPropertyOverrides(properties: Properties): Unit = {
-properties.put(KafkaConfig.BrokerIdProp, brokerId.toString)
-properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1")
-properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "1")
-properties.put(KafkaConfig.TransactionsTopicPartitionsProp, "1")
-properties.put(KafkaConfig.TransactionsTopicReplicationFactorProp, "1")
-properties.put(KafkaConfig.TransactionsTopicMinISRProp, "1")
-properties.setProperty(KafkaConfig.UnstableApiVersionsEnableProp, "true")
+
+@Timeout(120)
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
+@Tag("integration")
+class OffsetFetchRequestTest(cluster: ClusterInstance) extends 
GroupCoordinatorBaseRequestTest(cluster) {
+
+  @ClusterTest(serverProperties = Array(
+new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"true"),
+new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
+new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", 
value = "60"),
+new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value 
= "60"),
+new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def 
testSingleGroupOffsetFetchWithNewConsumerGroupProtocolAndNewGroupCoordinator(): 
Unit = {
+testSingleGroupOffsetFetch(useNewProtocol = true, requireStable = true)
   }
 
-  @BeforeEach
-  override def setUp(testInfo: TestInfo): Unit = {
-doSetup(testInfo, createOffsetsTopic = false)
+  @ClusterTest(serverProperties = Array(
+new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"true"),
+new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
+new ClusterConfigProperty(key = "gro

[jira] [Updated] (KAFKA-14273) Kafka doesn't start with KRaft on Windows

2023-09-07 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-14273?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

José Armando García Sancio updated KAFKA-14273:
---
Fix Version/s: 3.6.0

> Kafka doesn't start with KRaft on Windows
> -
>
> Key: KAFKA-14273
> URL: https://issues.apache.org/jira/browse/KAFKA-14273
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.3.1
>Reporter: Kedar Joshi
>Assignee: José Armando García Sancio
>Priority: Major
> Fix For: 3.6.0
>
>
> {{Basic setup doesn't work on Windows 10.}}
> *{{Steps}}*
>  * {{Initialize cluster with -}}
> {code:sh}
>     bin\windows\kafka-storage.bat random-uuid
>     bin\windows\kafka-storage.bat format -t %cluster_id% -c 
> .\config\kraft\server.properties{code}
>  
>  * Start Kafka with -
> {code:sh}
>    bin\windows\kafka-server-start.bat .\config\kraft\server.properties{code}
>  
> *Stacktrace*
> Kafka fails to start with the following exception -
> {code:java}
> D:\LocationGuru\Servers\Kafka-3.3>bin\windows\kafka-server-start.bat 
> .\config\kraft\server.properties
> [2022-10-03 23:14:20,089] INFO Registered kafka:type=kafka.Log4jController 
> MBean (kafka.utils.Log4jControllerRegistration$)
> [2022-10-03 23:14:20,375] INFO Setting -D 
> jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated 
> TLS renegotiation (org.apache.zookeeper.common.X509Util)
> [2022-10-03 23:14:20,594] INFO [LogLoader partition=__cluster_metadata-0, 
> dir=D:\LocationGuru\Servers\Kafka-3.3\tmp\kraft-combined-logs] Loading 
> producer state till offset 0 with message format version 2 
> (kafka.log.UnifiedLog$)
> [2022-10-03 23:14:20,594] INFO [LogLoader partition=__cluster_metadata-0, 
> dir=D:\LocationGuru\Servers\Kafka-3.3\tmp\kraft-combined-logs] Reloading from 
> producer snapshot and rebuilding producer state from offset 0 
> (kafka.log.UnifiedLog$)
> [2022-10-03 23:14:20,594] INFO [LogLoader partition=__cluster_metadata-0, 
> dir=D:\LocationGuru\Servers\Kafka-3.3\tmp\kraft-combined-logs] Producer state 
> recovery took 0ms for snapshot load and 0ms for segment recovery from offset 
> 0 (kafka.log.UnifiedLog$)
> [2022-10-03 23:14:20,640] INFO Initialized snapshots with IDs SortedSet() 
> from 
> D:\LocationGuru\Servers\Kafka-3.3\tmp\kraft-combined-logs__cluster_metadata-0 
> (kafka.raft.KafkaMetadataLog$)
> [2022-10-03 23:14:20,734] INFO [raft-expiration-reaper]: Starting 
> (kafka.raft.TimingWheelExpirationService$ExpiredOperationReaper)
> [2022-10-03 23:14:20,900] ERROR Exiting Kafka due to fatal exception 
> (kafka.Kafka$)
> java.io.UncheckedIOException: Error while writing the Quorum status from the 
> file 
> D:\LocationGuru\Servers\Kafka-3.3\tmp\kraft-combined-logs__cluster_metadata-0\quorum-state
>         at 
> org.apache.kafka.raft.FileBasedStateStore.writeElectionStateToFile(FileBasedStateStore.java:155)
>         at 
> org.apache.kafka.raft.FileBasedStateStore.writeElectionState(FileBasedStateStore.java:128)
>         at 
> org.apache.kafka.raft.QuorumState.transitionTo(QuorumState.java:477)
>         at org.apache.kafka.raft.QuorumState.initialize(QuorumState.java:212)
>         at 
> org.apache.kafka.raft.KafkaRaftClient.initialize(KafkaRaftClient.java:369)
>         at kafka.raft.KafkaRaftManager.buildRaftClient(RaftManager.scala:200)
>         at kafka.raft.KafkaRaftManager.(RaftManager.scala:127)
>         at kafka.server.KafkaRaftServer.(KafkaRaftServer.scala:83)
>         at kafka.Kafka$.buildServer(Kafka.scala:79)
>         at kafka.Kafka$.main(Kafka.scala:87)
>         at kafka.Kafka.main(Kafka.scala)
> Caused by: java.nio.file.FileSystemException: 
> D:\LocationGuru\Servers\Kafka-3.3\tmp\kraft-combined-logs_cluster_metadata-0\quorum-state.tmp
>  -> 
> D:\LocationGuru\Servers\Kafka-3.3\tmp\kraft-combined-logs_cluster_metadata-0\quorum-state:
>  The process cannot access the file because it is being used by another 
> process
>         at 
> java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:92)
>         at 
> java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103)
>         at java.base/sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:403)
>         at 
> java.base/sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:293)
>         at java.base/java.nio.file.Files.move(Files.java:1430)
>         at 
> org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:935)
>         at 
> org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:918)
>         at 
> org.apache.kafka.raft.FileBasedStateStore.writeElectionStateToFile(FileBasedStateStore.java:152)
>         ... 10 more
>         Suppressed: java.nio.file.FileSystemException: 
> D:\LocationGuru\Servers\Kafka-3.3\tmp\kraft-combined-
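
(Editor's note: the failing call in the trace is the atomic rename inside 
`Utils.atomicMoveWithFallback`. A minimal sketch of that move-then-fallback 
pattern, not Kafka's implementation:)

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class AtomicMoveSketch {
    public static void main(String[] args) throws IOException {
        Path source = Files.createTempFile("quorum-state", ".tmp");
        Path target = source.resolveSibling("quorum-state-sketch");
        try {
            // Preferred path: atomic rename. On Windows this fails while the
            // target is held open by another handle, as in the trace above.
            Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException outer) {
            // Fallback: a plain replacing move, mirroring the "WithFallback"
            // part of the utility's name.
            Files.move(source, target, StandardCopyOption.REPLACE_EXISTING);
        }
        Files.deleteIfExists(target);
    }
}
```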

[GitHub] [kafka] lianetm commented on a diff in pull request #14346: KAFKA-15115 - KAFKA-15163; Reset/Validate positions implementation & API integration

2023-09-07 Thread via GitHub


lianetm commented on code in PR #14346:
URL: https://github.com/apache/kafka/pull/14346#discussion_r1319081696


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##
@@ -209,6 +215,34 @@ public ConsumerRecords<K, V> poll(final Duration timeout) {
 return ConsumerRecords.empty();
 }
 
+/**
+ * Set the fetch position to the committed position (if there is one) or 
reset it using the
+ * offset reset policy the user has configured.
+ *
+ * @return true if the operation completed without timing out
+ * @throws org.apache.kafka.common.errors.AuthenticationException if 
authentication fails. See the exception for more details
+ * @throws NoOffsetForPartitionException  If no 
offset is stored for a given partition and no offset reset policy is
+ *defined
+ */
+private boolean updateFetchPositions() {
+// If any partitions have been truncated due to a leader change, we 
need to validate the offsets
+ValidatePositionsApplicationEvent validatePositionsEvent = new 
ValidatePositionsApplicationEvent();
+eventHandler.add(validatePositionsEvent);
+
+// TODO: integrate logic for refreshing committed offsets if available

Review Comment:
   That's already implemented (in the integration branch), just included in a 
different PR that should come after this, following the merge plan.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14353: KAFKA-14499: [7/7] Add integration tests for OffsetCommit API and OffsetFetch API

2023-09-07 Thread via GitHub


rreddy-22 commented on code in PR #14353:
URL: https://github.com/apache/kafka/pull/14353#discussion_r1319078806


##
core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala:
##
@@ -14,233 +14,532 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package kafka.server
 
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, Type}
+import kafka.test.junit.ClusterTestExtensions
+import kafka.test.junit.RaftClusterInvocationContext.RaftClusterInstance
+import kafka.test.junit.ZkClusterInvocationContext.ZkClusterInstance
 import kafka.utils.TestUtils
-import org.apache.kafka.clients.consumer.{ConsumerConfig, OffsetAndMetadata}
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.message.OffsetFetchResponseData
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData
-import org.apache.kafka.common.requests.{AbstractResponse, OffsetFetchRequest, 
OffsetFetchResponse}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
-import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
+import org.apache.kafka.common.requests.{OffsetFetchRequest, 
OffsetFetchResponse}
+import org.junit.jupiter.api.Assertions.{assertEquals, fail}
+import org.junit.jupiter.api.{Tag, Timeout}
+import org.junit.jupiter.api.extension.ExtendWith
 
-import java.util
-import java.util.Collections.singletonList
+import java.util.Comparator
+import java.util.stream.Collectors
 import scala.jdk.CollectionConverters._
-import java.util.{Collections, Optional, Properties}
-
-class OffsetFetchRequestTest extends BaseRequestTest {
-
-  override def brokerCount: Int = 1
-
-  val brokerId: Integer = 0
-  val offset = 15L
-  val leaderEpoch: Optional[Integer] = Optional.of(3)
-  val metadata = "metadata"
-  val topic = "topic"
-  val groupId = "groupId"
-  val groups: Seq[String] = (1 to 5).map(i => s"group$i")
-  val topics: Seq[String] = (1 to 3).map(i => s"topic$i")
-  val topic1List = singletonList(new TopicPartition(topics(0), 0))
-  val topic1And2List = util.Arrays.asList(
-new TopicPartition(topics(0), 0),
-new TopicPartition(topics(1), 0),
-new TopicPartition(topics(1), 1))
-  val allTopicsList = util.Arrays.asList(
-new TopicPartition(topics(0), 0),
-new TopicPartition(topics(1), 0),
-new TopicPartition(topics(1), 1),
-new TopicPartition(topics(2), 0),
-new TopicPartition(topics(2), 1),
-new TopicPartition(topics(2), 2))
-  val groupToPartitionMap: util.Map[String, util.List[TopicPartition]] =
-new util.HashMap[String, util.List[TopicPartition]]()
-  groupToPartitionMap.put(groups(0), topic1List)
-  groupToPartitionMap.put(groups(1), topic1And2List)
-  groupToPartitionMap.put(groups(2), allTopicsList)
-  groupToPartitionMap.put(groups(3), null)
-  groupToPartitionMap.put(groups(4), null)
-
-  override def brokerPropertyOverrides(properties: Properties): Unit = {
-properties.put(KafkaConfig.BrokerIdProp, brokerId.toString)
-properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1")
-properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "1")
-properties.put(KafkaConfig.TransactionsTopicPartitionsProp, "1")
-properties.put(KafkaConfig.TransactionsTopicReplicationFactorProp, "1")
-properties.put(KafkaConfig.TransactionsTopicMinISRProp, "1")
-properties.setProperty(KafkaConfig.UnstableApiVersionsEnableProp, "true")
+
+@Timeout(120)
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
+@Tag("integration")
+class OffsetFetchRequestTest(cluster: ClusterInstance) extends 
GroupCoordinatorBaseRequestTest(cluster) {
+
+  @ClusterTest(serverProperties = Array(
+new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"true"),
+new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
+new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", 
value = "60"),
+new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value 
= "60"),
+new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def 
testSingleGroupOffsetFetchWithNewConsumerGroupProtocolAndNewGroupCoordinator(): 
Unit = {
+testSingleGroupOffsetFetch(useNewProtocol = true, requireStable = true)
   }
 
-  @BeforeEach
-  override def setUp(testInfo: TestInfo): Unit = {
-doSetup(testInfo, createOffsetsTopic = false)
+  @ClusterTest(serverProperties = Array(
+new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"true"),
+new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
+new ClusterConfigProperty(key = "g

[GitHub] [kafka] jolshan commented on a diff in pull request #14353: KAFKA-14499: [7/7] Add integration tests for OffsetCommit API and OffsetFetch API

2023-09-07 Thread via GitHub


jolshan commented on code in PR #14353:
URL: https://github.com/apache/kafka/pull/14353#discussion_r1319078377


##
core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala:
##
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, Type}
+import kafka.test.junit.ClusterTestExtensions
+import kafka.test.junit.RaftClusterInvocationContext.RaftClusterInstance
+import kafka.test.junit.ZkClusterInvocationContext.ZkClusterInstance
+import kafka.utils.TestUtils
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.junit.jupiter.api.Assertions.fail
+import org.junit.jupiter.api.{Tag, Timeout}
+import org.junit.jupiter.api.extension.ExtendWith
+
+import java.util.stream.Collectors
+import scala.jdk.CollectionConverters._
+
+@Timeout(120)
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
+@Tag("integration")
+class OffsetCommitRequestTest(cluster: ClusterInstance) extends 
GroupCoordinatorBaseRequestTest(cluster) {
+
+  @ClusterTest(serverProperties = Array(
+new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"true"),
+new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
+new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", 
value = "60"),
+new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value 
= "60"),
+new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def testOffsetCommitWithNewConsumerGroupProtocolAndNewGroupCoordinator(): 
Unit = {
+testOffsetCommit(true)
+  }
+
+  @ClusterTest(serverProperties = Array(
+new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"true"),
+new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
+new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def testOffsetCommitWithOldConsumerGroupProtocolAndNewGroupCoordinator(): 
Unit = {
+testOffsetCommit(false)
+  }
+
+  @ClusterTest(clusterType = Type.ALL, serverProperties = Array(
+new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"false"),
+new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"false"),
+new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def testOffsetCommitWithOldConsumerGroupProtocolAndOldGroupCoordinator(): 
Unit = {
+testOffsetCommit(false)
+  }
+
+  private def testOffsetCommit(useNewProtocol: Boolean): Unit = {
+if (useNewProtocol && !isNewGroupCoordinatorEnabled) {
+  fail("Cannot use the new protocol with the old group coordinator.")
+}
+
+val admin = cluster.createAdminClient()
+
+// Create the __consumer_offsets topic because it won't be created automatically
+// in this test, since it does not use the FindCoordinator API.
+TestUtils.createOffsetsTopicWithAdmin(
+  admin = admin,
+  brokers = if (cluster.isKRaftTest) {
+
cluster.asInstanceOf[RaftClusterInstance].brokers.collect(Collectors.toList[KafkaBroker]).asScala
+  } else {
+
cluster.asInstanceOf[ZkClusterInstance].servers.collect(Collectors.toList[KafkaBroker]).asScala
+  }
+)
+
+// Create the topic.
+TestUtils.createTopicWithAdminRaw(
+  admin = admin,
+  topic = "foo",
+  numPartitions = 3
+)
+
+// Join the consumer group.
+val (memberId, memberEpoch) = if (useNewProtocol) {
+  // Note that we heartbeat only once to join the group and assume
+  // that the test will complete within the session timeout.
+  joinConsumerGroupWithNewProtocol("grp")
+} else {
+  // Note that we don't heartbeat and assume  that the tes

[GitHub] [kafka] lianetm commented on a diff in pull request #14346: KAFKA-15115 - KAFKA-15163; Reset/Validate positions implementation & API integration

2023-09-07 Thread via GitHub


lianetm commented on code in PR #14346:
URL: https://github.com/apache/kafka/pull/14346#discussion_r1319078266


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java:
##
@@ -268,7 +326,7 @@ private CompletableFuture<ListOffsetResult> buildListOffsetRequestToNode(
 .forConsumer(requireTimestamps, isolationLevel, false)
 
.setTargetTimes(ListOffsetsRequest.toListOffsetsTopics(targetTimes));
 
-log.debug("Creating ListOffsetRequest {} for broker {} to reset 
positions", builder,

Review Comment:
   I'm trying to use the proper API key for that request, as defined in the 
Kafka protocol
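
   (Editor's note: for reference, a quick way to print the protocol-defined 
name of this RPC, using the standard `ApiKeys` enum:)

   ```java
   import org.apache.kafka.common.protocol.ApiKeys;

   public class ListOffsetsApiName {
       public static void main(String[] args) {
           // The Kafka protocol names this RPC "ListOffsets", with no
           // trailing "Request".
           System.out.println(ApiKeys.LIST_OFFSETS.name);
       }
   }
   ```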



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lianetm commented on a diff in pull request #14323: KAFKA-15275 - Client state machine basic components, states and initial transitions

2023-09-07 Thread via GitHub


lianetm commented on code in PR #14323:
URL: https://github.com/apache/kafka/pull/14323#discussion_r1319075024


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Membership manager that maintains group membership for a single member 
following the new
+ * consumer group protocol.
+ * <p>
+ * This keeps membership state and assignment updated in-memory, based on the 
heartbeat responses
+ * the member receives. It is also responsible for computing assignment for 
the group based on
+ * the metadata, if the member has been selected by the broker to do so.
+ */
+public class MembershipManagerImpl implements MembershipManager {
+
+private final String groupId;
+private Optional<String> groupInstanceId;
+private String memberId;
+private int memberEpoch;
+private MemberState state;
+private AssignorSelection assignorSelection;
+
+/**
+ * Assignment that the member received from the server and successfully 
processed
+ */
+private ConsumerGroupHeartbeatResponseData.Assignment currentAssignment;
+
+/**
+ * List of assignments that the member received from the server but hasn't 
processed yet
+ */
+private final List<ConsumerGroupHeartbeatResponseData.Assignment> targetAssignments;
+
+public MembershipManagerImpl(String groupId) {
+this.groupId = groupId;
+this.state = MemberState.UNJOINED;
+this.assignorSelection = AssignorSelection.defaultAssignor();
+this.targetAssignments = new ArrayList<>();
+}
+
+public MembershipManagerImpl(String groupId, String groupInstanceId, 
AssignorSelection assignorSelection) {
+this(groupId);
+this.groupInstanceId = Optional.ofNullable(groupInstanceId);
+setAssignorSelection(assignorSelection);
+}
+
+/**
+ * Update assignor selection for the member.
+ *
+ * @param assignorSelection New assignor selection
+ * @throws IllegalArgumentException If the provided assignor selection is 
null
+ */
+public void setAssignorSelection(AssignorSelection assignorSelection) {
+if (assignorSelection == null) {
+throw new IllegalArgumentException("Assignor selection cannot be 
null");
+}
+this.assignorSelection = assignorSelection;
+}
+
+private void transitionTo(MemberState nextState) {
+if (!nextState.getPreviousValidStates().contains(state)) {
+// TODO: handle invalid state transition
+throw new RuntimeException(String.format("Invalid state transition 
from %s to %s",
+state, nextState));
+}
+this.state = nextState;
+}
+
+@Override
+public String groupId() {
+return groupId;
+}
+
+@Override
+public String groupInstanceId() {
+// TODO: review empty vs null instance id
+return groupInstanceId.orElse(null);
+}
+
+@Override
+public String memberId() {
+return memberId;
+}
+
+@Override
+public int memberEpoch() {
+return memberEpoch;
+}
+
+@Override
+public void updateState(ConsumerGroupHeartbeatResponseData response) {
+if (response.errorCode() == Errors.NONE.code()) {
+this.memberId = response.memberId();
+this.memberEpoch = response.memberEpoch();
+targetAssignments.add(response.assignment());
+transitionTo(MemberState.PROCESSING_ASSIGNMENT);
+} else {
+if (response.errorCode() == Errors.FENCED_MEMBER_EPOCH.code() || 
response.errorCode() == Errors.UNKNOWN_MEMBER_ID.code()) {
+resetMemberIdAndEpoch();
+transitionTo(MemberState.UNJOINED);
+} else if (response.errorCode() == 
Errors.UNRELEASED_INSTANCE_ID.code()) {
+transitionTo(MemberState.FAILED);
+}
+}
+}
+
+public void 
onAs
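
(Editor's note: `transitionTo` above relies on 
`MemberState.getPreviousValidStates()`, which this hunk does not show. A 
minimal sketch of that lookup pattern with purely illustrative states, not the 
PR's actual definitions:)

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class MemberStateSketch {

    // Illustrative states only; the PR defines the real MemberState values
    // and their allowed transitions.
    enum State {
        UNJOINED, PROCESSING_ASSIGNMENT, STABLE, FAILED;

        private List<State> previousValidStates = Collections.emptyList();

        static {
            PROCESSING_ASSIGNMENT.previousValidStates = Arrays.asList(UNJOINED, STABLE);
            STABLE.previousValidStates = Collections.singletonList(PROCESSING_ASSIGNMENT);
            FAILED.previousValidStates = Arrays.asList(UNJOINED, PROCESSING_ASSIGNMENT, STABLE);
        }

        List<State> getPreviousValidStates() {
            return previousValidStates;
        }
    }

    public static void main(String[] args) {
        State current = State.UNJOINED;
        State next = State.PROCESSING_ASSIGNMENT;
        // Same guard as transitionTo(): the source state must appear in the
        // target state's allow-list.
        if (!next.getPreviousValidStates().contains(current)) {
            throw new IllegalStateException("Invalid transition " + current + " -> " + next);
        }
        System.out.println("Transitioned " + current + " -> " + next);
    }
}
```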

[GitHub] [kafka] lianetm commented on a diff in pull request #14323: KAFKA-15275 - Client state machine basic components, states and initial transitions

2023-09-07 Thread via GitHub


lianetm commented on code in PR #14323:
URL: https://github.com/apache/kafka/pull/14323#discussion_r1319075024


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Membership manager that maintains group membership for a single member 
following the new
+ * consumer group protocol.
+ * 
+ * This keeps membership state and assignment updated in-memory, based on the 
heartbeat responses
+ * the member receives. It is also responsible for computing assignment for 
the group based on
+ * the metadata, if the member has been selected by the broker to do so.
+ */
+public class MembershipManagerImpl implements MembershipManager {
+
+private final String groupId;
+private Optional groupInstanceId;
+private String memberId;
+private int memberEpoch;
+private MemberState state;
+private AssignorSelection assignorSelection;
+
+/**
+ * Assignment that the member received from the server and successfully processed
+ */
+private ConsumerGroupHeartbeatResponseData.Assignment currentAssignment;
+
+/**
+ * List of assignments that the member received from the server but hasn't processed yet
+ */
+private final List<ConsumerGroupHeartbeatResponseData.Assignment> targetAssignments;
+
+public MembershipManagerImpl(String groupId) {
+this.groupId = groupId;
+this.state = MemberState.UNJOINED;
+this.assignorSelection = AssignorSelection.defaultAssignor();
+this.targetAssignments = new ArrayList<>();
+}
+
+public MembershipManagerImpl(String groupId, String groupInstanceId, AssignorSelection assignorSelection) {
+this(groupId);
+this.groupInstanceId = Optional.ofNullable(groupInstanceId);
+setAssignorSelection(assignorSelection);
+}
+
+/**
+ * Update assignor selection for the member.
+ *
+ * @param assignorSelection New assignor selection
+ * @throws IllegalArgumentException If the provided assignor selection is null
+ */
+public void setAssignorSelection(AssignorSelection assignorSelection) {
+if (assignorSelection == null) {
+throw new IllegalArgumentException("Assignor selection cannot be null");
+}
+this.assignorSelection = assignorSelection;
+}
+
+private void transitionTo(MemberState nextState) {
+if (!nextState.getPreviousValidStates().contains(state)) {
+// TODO: handle invalid state transition
+throw new RuntimeException(String.format("Invalid state transition from %s to %s",
+state, nextState));
+}
+this.state = nextState;
+}
+
+@Override
+public String groupId() {
+return groupId;
+}
+
+@Override
+public String groupInstanceId() {
+// TODO: review empty vs null instance id
+return groupInstanceId.orElse(null);
+}
+
+@Override
+public String memberId() {
+return memberId;
+}
+
+@Override
+public int memberEpoch() {
+return memberEpoch;
+}
+
+@Override
+public void updateState(ConsumerGroupHeartbeatResponseData response) {
+if (response.errorCode() == Errors.NONE.code()) {
+this.memberId = response.memberId();
+this.memberEpoch = response.memberEpoch();
+targetAssignments.add(response.assignment());
+transitionTo(MemberState.PROCESSING_ASSIGNMENT);
+} else {
+if (response.errorCode() == Errors.FENCED_MEMBER_EPOCH.code() || response.errorCode() == Errors.UNKNOWN_MEMBER_ID.code()) {
+resetMemberIdAndEpoch();
+transitionTo(MemberState.UNJOINED);
+} else if (response.errorCode() == Errors.UNRELEASED_INSTANCE_ID.code()) {
+transitionTo(MemberState.FAILED);
+}
+}
+}
+
+public void 
onAs

[jira] [Assigned] (KAFKA-15417) JoinWindow does not seem to work properly with a KStream - KStream - LeftJoin()

2023-09-07 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-15417:
---

Assignee: Victor van den Hoven

> JoinWindow does not  seem to work properly with a KStream - KStream - 
> LeftJoin()
> 
>
> Key: KAFKA-15417
> URL: https://issues.apache.org/jira/browse/KAFKA-15417
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.4.0
>Reporter: Victor van den Hoven
>Assignee: Victor van den Hoven
>Priority: Major
> Attachments: SimpleStreamTopology.java, SimpleStreamTopologyTest.java
>
>
> In Kafka Streams 3.4.0:
> According to the javadoc of JoinWindows:
> _There are three different window configurations supported:_
>  * _before = after = time-difference_
>  * _before = 0 and after = time-difference_
>  * _*before = time-difference and after = 0*_
>  
> However, if I use a JoinWindows with *before = time-difference and after = 0* 
> on a KStream-KStream leftJoin, the *after=0* part does not seem to work.
> When using _stream1.leftJoin(stream2, joinWindow)_ with 
> {_}joinWindow{_}.{_}after=0 and joinWindow.before=30s{_}, any new message on 
> stream1 that cannot be joined with any message on stream2 should be joined 
> with a null-record once _joinWindow.after_ has ended and a new message 
> has arrived on stream1.
> It is not.
> Only once _joinWindow.before_ has elapsed will the previous message be 
> joined with a null-record.
>  
> Attached you can find two files with a TopologyTestDriver unit test to 
> reproduce:
> topology:   stream1.leftJoin(stream2, joiner, joinWindow)
> joinWindow has before=5000ms and after=0
> message1(key1) ->  stream1
> after 4000ms message2(key2) -> stream1  ->  NO null-record join was made, even 
> though the after period had expired.
> after 4900ms message2(key2) -> stream1  ->  NO null-record join was made, even 
> though the after period had expired.
> after 5000ms message2(key2) -> stream1  ->  A null-record join was made; the 
> before period had expired.
> after 6000ms message2(key2) -> stream1  ->  A null-record join was made; the 
> before period had expired.
>  
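
For illustration, a minimal sketch (not taken from the attached test files; topic names and the value joiner are invented) of the asymmetric window configuration the report describes:

{code:java}
import java.time.Duration;

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream1 = builder.stream("input-1");
KStream<String, String> stream2 = builder.stream("input-2");

// Asymmetric window: a stream1 record at time ts may join stream2 records
// with timestamps in [ts - 5s, ts]; the "after" side is shrunk to zero.
JoinWindows joinWindow = JoinWindows
        .ofTimeDifferenceWithNoGrace(Duration.ofSeconds(5))
        .after(Duration.ZERO);

// v2 is null when no stream2 record falls inside the window; the ticket is
// about when exactly this null-record result should be emitted.
stream1.leftJoin(stream2, (v1, v2) -> v1 + "," + v2, joinWindow)
       .to("joined-output");
{code}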



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15417) JoinWindow does not seem to work properly with a KStream - KStream - LeftJoin()

2023-09-07 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17762863#comment-17762863
 ] 

Matthias J. Sax commented on KAFKA-15417:
-

Thanks a lot! Assigned the ticket to you.

> JoinWindow does not  seem to work properly with a KStream - KStream - 
> LeftJoin()
> 
>
> Key: KAFKA-15417
> URL: https://issues.apache.org/jira/browse/KAFKA-15417
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.4.0
>Reporter: Victor van den Hoven
>Assignee: Victor van den Hoven
>Priority: Major
> Attachments: SimpleStreamTopology.java, SimpleStreamTopologyTest.java
>
>
> In Kafka Streams 3.4.0:
> According to the javadoc of JoinWindows:
> _There are three different window configurations supported:_
>  * _before = after = time-difference_
>  * _before = 0 and after = time-difference_
>  * _*before = time-difference and after = 0*_
>  
> However, if I use a JoinWindows with *before = time-difference and after = 0* 
> on a KStream-KStream leftJoin, the *after=0* part does not seem to work.
> When using _stream1.leftJoin(stream2, joinWindow)_ with 
> {_}joinWindow{_}.{_}after=0 and joinWindow.before=30s{_}, any new message on 
> stream1 that cannot be joined with any message on stream2 should be joined 
> with a null-record once _joinWindow.after_ has ended and a new message 
> has arrived on stream1.
> It is not.
> Only once _joinWindow.before_ has elapsed will the previous message be 
> joined with a null-record.
>  
> Attached you can find two files with a TopologyTestDriver unit test to 
> reproduce:
> topology:   stream1.leftJoin(stream2, joiner, joinWindow)
> joinWindow has before=5000ms and after=0
> message1(key1) ->  stream1
> after 4000ms message2(key2) -> stream1  ->  NO null-record join was made, even 
> though the after period had expired.
> after 4900ms message2(key2) -> stream1  ->  NO null-record join was made, even 
> though the after period had expired.
> after 5000ms message2(key2) -> stream1  ->  A null-record join was made; the 
> before period had expired.
> after 6000ms message2(key2) -> stream1  ->  A null-record join was made; the 
> before period had expired.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] mjsax merged pull request #14341: KAFKA-15307: Removes non-existent configs

2023-09-07 Thread via GitHub


mjsax merged PR #14341:
URL: https://github.com/apache/kafka/pull/14341


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] nizhikov opened a new pull request, #14355: KAFKA-14595 ReassignPartitionsUnitTest rewritten in java

2023-09-07 Thread via GitHub


nizhikov opened a new pull request, #14355:
URL: https://github.com/apache/kafka/pull/14355

   This PR is part of #13247.
   It contains changes to rewrite a single test in Java.
   The intention is to reduce the size of the changes in the parent PR.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #14354: KAFKA-14273; Close file before atomic move

2023-09-07 Thread via GitHub


cmccabe commented on code in PR #14354:
URL: https://github.com/apache/kafka/pull/14354#discussion_r1319034514


##
raft/src/main/java/org/apache/kafka/raft/FileBasedStateStore.java:
##
@@ -144,21 +144,29 @@ private void writeElectionStateToFile(final File stateFile, QuorumStateData stat
 
 log.trace("Writing tmp quorum state {}", temp.getAbsolutePath());
 
-try (final FileOutputStream fileOutputStream = new FileOutputStream(temp);
- final BufferedWriter writer = new BufferedWriter(
- new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8))) {
-short version = state.highestSupportedVersion();
-
-ObjectNode jsonState = (ObjectNode) QuorumStateDataJsonConverter.write(state, version);
-jsonState.set(DATA_VERSION, new ShortNode(version));
-writer.write(jsonState.toString());
-writer.flush();
-fileOutputStream.getFD().sync();
+try {
+try (final FileOutputStream fileOutputStream = new FileOutputStream(temp);
+ final BufferedWriter writer = new BufferedWriter(
+ new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8)
+ )
+) {
+short version = state.highestSupportedVersion();

Review Comment:
   While we're fixing this, let's get rid of `highestSupportedVersion` here and be 
explicit about the version we're setting.
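
   A sketch of that suggestion (the constant name is hypothetical, not from the 
PR): pin the written version to an explicit constant instead of deriving it 
from the data:

       // Hypothetical: make the serialized version an explicit choice.
       private static final short QUORUM_STATE_VERSION = 0;

       ObjectNode jsonState = (ObjectNode) QuorumStateDataJsonConverter.write(state, QUORUM_STATE_VERSION);
       jsonState.set(DATA_VERSION, new ShortNode(QUORUM_STATE_VERSION));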



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #14323: KAFKA-15275 - Client state machine basic components, states and initial transitions

2023-09-07 Thread via GitHub


philipnee commented on code in PR #14323:
URL: https://github.com/apache/kafka/pull/14323#discussion_r1319012270


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Membership manager that maintains group membership for a single member following the new
+ * consumer group protocol.
+ * 
+ * This keeps membership state and assignment updated in-memory, based on the heartbeat responses
+ * the member receives. It is also responsible for computing assignment for the group based on
+ * the metadata, if the member has been selected by the broker to do so.
+ */
+public class MembershipManagerImpl implements MembershipManager {
+
+private final String groupId;
+private Optional<String> groupInstanceId;
+private String memberId;
+private int memberEpoch;
+private MemberState state;
+private AssignorSelection assignorSelection;
+
+/**
+ * Assignment that the member received from the server and successfully processed
+ */
+private ConsumerGroupHeartbeatResponseData.Assignment currentAssignment;
+
+/**
+ * List of assignments that the member received from the server but hasn't processed yet
+ */
+private final List<ConsumerGroupHeartbeatResponseData.Assignment> targetAssignments;
+
+public MembershipManagerImpl(String groupId) {
+this.groupId = groupId;
+this.state = MemberState.UNJOINED;
+this.assignorSelection = AssignorSelection.defaultAssignor();
+this.targetAssignments = new ArrayList<>();
+}
+
+public MembershipManagerImpl(String groupId, String groupInstanceId, AssignorSelection assignorSelection) {
+this(groupId);
+this.groupInstanceId = Optional.ofNullable(groupInstanceId);
+setAssignorSelection(assignorSelection);
+}
+
+/**
+ * Update assignor selection for the member.
+ *
+ * @param assignorSelection New assignor selection
+ * @throws IllegalArgumentException If the provided assignor selection is null
+ */
+public void setAssignorSelection(AssignorSelection assignorSelection) {
+if (assignorSelection == null) {
+throw new IllegalArgumentException("Assignor selection cannot be null");
+}
+this.assignorSelection = assignorSelection;
+}
+
+private void transitionTo(MemberState nextState) {
+if (!nextState.getPreviousValidStates().contains(state)) {
+// TODO: handle invalid state transition
+throw new RuntimeException(String.format("Invalid state transition from %s to %s",
+state, nextState));
+}
+this.state = nextState;
+}
+
+@Override
+public String groupId() {
+return groupId;
+}
+
+@Override
+public String groupInstanceId() {
+// TODO: review empty vs null instance id
+return groupInstanceId.orElse(null);
+}
+
+@Override
+public String memberId() {
+return memberId;
+}
+
+@Override
+public int memberEpoch() {
+return memberEpoch;
+}
+
+@Override
+public void updateState(ConsumerGroupHeartbeatResponseData response) {
+if (response.errorCode() == Errors.NONE.code()) {
+this.memberId = response.memberId();
+this.memberEpoch = response.memberEpoch();
+targetAssignments.add(response.assignment());
+transitionTo(MemberState.PROCESSING_ASSIGNMENT);
+} else {
+if (response.errorCode() == Errors.FENCED_MEMBER_EPOCH.code() || response.errorCode() == Errors.UNKNOWN_MEMBER_ID.code()) {
+resetMemberIdAndEpoch();
+transitionTo(MemberState.UNJOINED);
+} else if (response.errorCode() == Errors.UNRELEASED_INSTANCE_ID.code()) {
+transitionTo(MemberState.FAILED);
+}
+}
+}
+
+public void 
on

[GitHub] [kafka] jolshan commented on a diff in pull request #14353: KAFKA-14499: [7/7] Add integration tests for OffsetCommit API and OffsetFetch API

2023-09-07 Thread via GitHub


jolshan commented on code in PR #14353:
URL: https://github.com/apache/kafka/pull/14353#discussion_r1319008280


##
core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala:
##
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type}
+import kafka.test.junit.ClusterTestExtensions
+import kafka.test.junit.RaftClusterInvocationContext.RaftClusterInstance
+import kafka.test.junit.ZkClusterInvocationContext.ZkClusterInstance
+import kafka.utils.TestUtils
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.junit.jupiter.api.Assertions.fail
+import org.junit.jupiter.api.{Tag, Timeout}
+import org.junit.jupiter.api.extension.ExtendWith
+
+import java.util.stream.Collectors
+import scala.jdk.CollectionConverters._
+
+@Timeout(120)
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
+@Tag("integration")
+class OffsetCommitRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
+
+  @ClusterTest(serverProperties = Array(
+new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"true"),
+new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
+new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", 
value = "60"),

Review Comment:
   Is this currently the way we enable the new consumer group protocol, by 
setting these two configs?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #14353: KAFKA-14499: [7/7] Add integration tests for OffsetCommit API and OffsetFetch API

2023-09-07 Thread via GitHub


jolshan commented on code in PR #14353:
URL: https://github.com/apache/kafka/pull/14353#discussion_r1319005551


##
core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala:
##
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.test.ClusterInstance
+import kafka.utils.{NotNothing, TestUtils}
+import org.apache.kafka.common.message.{ConsumerGroupHeartbeatRequestData, JoinGroupRequestData, OffsetCommitRequestData, OffsetCommitResponseData, SyncGroupRequestData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, ConsumerGroupHeartbeatRequest, ConsumerGroupHeartbeatResponse, JoinGroupRequest, JoinGroupResponse, OffsetCommitRequest, OffsetCommitResponse, SyncGroupRequest, SyncGroupResponse}
+import org.junit.jupiter.api.Assertions.assertEquals
+
+import scala.jdk.CollectionConverters._
+import scala.reflect.ClassTag
+
+class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {

Review Comment:
   Is the goal here to create a new base request test class? I noticed we didn't 
extend the existing BaseRequestTest class.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lianetm commented on a diff in pull request #14323: KAFKA-15275 - Client state machine basic components, states and initial transitions

2023-09-07 Thread via GitHub


lianetm commented on code in PR #14323:
URL: https://github.com/apache/kafka/pull/14323#discussion_r1319005335


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Membership manager that maintains group membership for a single member following the new
+ * consumer group protocol.
+ * 
+ * This keeps membership state and assignment updated in-memory, based on the heartbeat responses
+ * the member receives. It is also responsible for computing assignment for the group based on
+ * the metadata, if the member has been selected by the broker to do so.
+ */
+public class MembershipManagerImpl implements MembershipManager {
+
+private final String groupId;
+private Optional<String> groupInstanceId;
+private String memberId;
+private int memberEpoch;
+private MemberState state;
+private AssignorSelection assignorSelection;
+
+/**
+ * Assignment that the member received from the server and successfully processed
+ */
+private ConsumerGroupHeartbeatResponseData.Assignment currentAssignment;
+
+/**
+ * List of assignments that the member received from the server but hasn't processed yet
+ */
+private final List<ConsumerGroupHeartbeatResponseData.Assignment> targetAssignments;
+
+public MembershipManagerImpl(String groupId) {
+this.groupId = groupId;
+this.state = MemberState.UNJOINED;
+this.assignorSelection = AssignorSelection.defaultAssignor();
+this.targetAssignments = new ArrayList<>();
+}
+
+public MembershipManagerImpl(String groupId, String groupInstanceId, AssignorSelection assignorSelection) {
+this(groupId);
+this.groupInstanceId = Optional.ofNullable(groupInstanceId);
+setAssignorSelection(assignorSelection);
+}
+
+/**
+ * Update assignor selection for the member.
+ *
+ * @param assignorSelection New assignor selection
+ * @throws IllegalArgumentException If the provided assignor selection is null
+ */
+public void setAssignorSelection(AssignorSelection assignorSelection) {
+if (assignorSelection == null) {
+throw new IllegalArgumentException("Assignor selection cannot be null");
+}
+this.assignorSelection = assignorSelection;
+}
+
+private void transitionTo(MemberState nextState) {
+if (!nextState.getPreviousValidStates().contains(state)) {
+// TODO: handle invalid state transition
+throw new RuntimeException(String.format("Invalid state transition from %s to %s",
+state, nextState));
+}
+this.state = nextState;
+}
+
+@Override
+public String groupId() {
+return groupId;
+}
+
+@Override
+public String groupInstanceId() {
+// TODO: review empty vs null instance id
+return groupInstanceId.orElse(null);
+}
+
+@Override
+public String memberId() {
+return memberId;
+}
+
+@Override
+public int memberEpoch() {
+return memberEpoch;
+}
+
+@Override
+public void updateState(ConsumerGroupHeartbeatResponseData response) {
+if (response.errorCode() == Errors.NONE.code()) {
+this.memberId = response.memberId();
+this.memberEpoch = response.memberEpoch();
+targetAssignments.add(response.assignment());
+transitionTo(MemberState.PROCESSING_ASSIGNMENT);
+} else {
+if (response.errorCode() == Errors.FENCED_MEMBER_EPOCH.code() || response.errorCode() == Errors.UNKNOWN_MEMBER_ID.code()) {
+resetMemberIdAndEpoch();
+transitionTo(MemberState.UNJOINED);
+} else if (response.errorCode() == Errors.UNRELEASED_INSTANCE_ID.code()) {
+transitionTo(MemberState.FAILED);
+}
+}
+}
+
+public void 
onAs

[GitHub] [kafka] jsancio commented on a diff in pull request #12763: KAFKA-14273: Kafka doesn't start with KRaft on Windows

2023-09-07 Thread via GitHub


jsancio commented on code in PR #12763:
URL: https://github.com/apache/kafka/pull/12763#discussion_r1319003199


##
raft/src/main/java/org/apache/kafka/raft/FileBasedStateStore.java:
##
@@ -149,6 +149,7 @@ private void writeElectionStateToFile(final File stateFile, QuorumStateData stat
 writer.write(jsonState.toString());
 writer.flush();
 fileOutputStream.getFD().sync();
+fileOutputStream.close();

Review Comment:
   I created this PR which includes this suggestion: 
https://github.com/apache/kafka/pull/14354
   
   cc @dengziming @showuon 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jsancio opened a new pull request, #14354: KAFKA-14273; Close file before atomic move

2023-09-07 Thread via GitHub


jsancio opened a new pull request, #14354:
URL: https://github.com/apache/kafka/pull/14354

   On Windows, an atomic move is not allowed while the file still has an open 
handle, e.g.
   
   __cluster_metadata-0\quorum-state: The process cannot access the file because it is being used by another process
   at java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:92)
   at java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103)
   at java.base/sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:403)
   at java.base/sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:293)
   at java.base/java.nio.file.Files.move(Files.java:1430)
   at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:949)
   at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:932)
   at org.apache.kafka.raft.FileBasedStateStore.writeElectionStateToFile(FileBasedStateStore.java:152)
   
   This is fixed by first closing the temporary quorum-state file before 
attempting to move it.
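
   A minimal sketch of that pattern (illustrative only, not the PR's code): 
flush and sync inside try-with-resources so the handle is closed before the 
atomic move.

       import java.io.BufferedWriter;
       import java.io.File;
       import java.io.FileOutputStream;
       import java.io.IOException;
       import java.io.OutputStreamWriter;
       import java.nio.charset.StandardCharsets;
       import java.nio.file.Files;
       import java.nio.file.StandardCopyOption;

       public final class AtomicWriteExample {
           static void writeAtomically(File target, String content) throws IOException {
               File temp = new File(target.getParentFile(), target.getName() + ".tmp");
               try (FileOutputStream out = new FileOutputStream(temp);
                    BufferedWriter writer = new BufferedWriter(
                            new OutputStreamWriter(out, StandardCharsets.UTF_8))) {
                   writer.write(content);
                   writer.flush();
                   out.getFD().sync();
               } // both streams are closed here, releasing the handle Windows holds
               Files.move(temp.toPath(), target.toPath(), StandardCopyOption.ATOMIC_MOVE);
           }
       }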
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lianetm commented on pull request #14323: KAFKA-15275 - Client state machine basic components, states and initial transitions

2023-09-07 Thread via GitHub


lianetm commented on PR #14323:
URL: https://github.com/apache/kafka/pull/14323#issuecomment-1710622597

   Build completed with 6 unrelated test failures:
   
   Build / JDK 20 and Scala 2.13 / 
testBumpTransactionalEpoch(String).quorum=kraft – kafka.api.TransactionsTest
   Build / JDK 8 and Scala 2.12 / 
testNonDefaultConnectionCountLimitAndRateLimit() – 
kafka.network.ConnectionQuotasTest
   Build / JDK 11 and Scala 2.13 / 
testDelegationTokenRequests(String).quorum=kraft – 
kafka.server.DelegationTokenRequestsTest
   Build / JDK 11 and Scala 2.13 / 
executeTieredStorageTest(String).quorum=kraft – 
org.apache.kafka.tiered.storage.integration.ReassignReplicaExpandTest
   Build / JDK 17 and Scala 2.13 / testRackAwareRangeAssignor() – 
integration.kafka.server.FetchFromFollowerIntegrationTest
   Build / JDK 17 and Scala 2.13 / 
testBumpTransactionalEpoch(String).quorum=kraft – kafka.api.TransactionsTest


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #14346: KAFKA-15115 - KAFKA-15163; Reset/Validate positions implementation & API integration

2023-09-07 Thread via GitHub


philipnee commented on code in PR #14346:
URL: https://github.com/apache/kafka/pull/14346#discussion_r1318977376


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java:
##
@@ -268,7 +326,7 @@ private CompletableFuture buildListOffsetRequestToNode(
 .forConsumer(requireTimestamps, isolationLevel, false)
 
.setTargetTimes(ListOffsetsRequest.toListOffsetsTopics(targetTimes));
 
-log.debug("Creating ListOffsetRequest {} for broker {} to reset 
positions", builder,

Review Comment:
   I really think ListOffsetRequest is fine.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java:
##
@@ -315,6 +319,57 @@ void onFailedRequestForResettingPositions(
 log.error("Discarding error in ListOffsetResponse because another 
error is pending", error);
 }
 
+
+void onSuccessfulRequestForValidatingPositions(

Review Comment:
   It should be onSuccessfulResponse? As well as onFailedResponse. Maybe 
`onSuccessfulValidatePositionResponse`.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java:
##
@@ -298,6 +356,154 @@ private CompletableFuture buildListOffsetRequestToNode(
 return result;
 }
 
+/**
+ * Make asynchronous ListOffsets request to fetch offsets by target times for the specified
+ * partitions.
+ * Use the retrieved offsets to reset positions in the subscription state.
+ *
+ * @param timestampsToSearch the mapping between partitions and target time
+ * @return A list of
+ * {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest}
+ * that can be polled to obtain the corresponding timestamps and offsets.
+ */
+private List<NetworkClientDelegate.UnsentRequest> buildListOffsetsRequestsAndResetPositions(
+final Map<TopicPartition, Long> timestampsToSearch) {
+Map<Node, Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition>> timestampsToSearchByNode =
+groupListOffsetRequests(timestampsToSearch, Optional.empty());
+
+final List<NetworkClientDelegate.UnsentRequest> unsentRequests = new ArrayList<>();
+
+timestampsToSearchByNode.forEach((node, resetTimestamps) -> {
+subscriptionState.setNextAllowedRetry(resetTimestamps.keySet(),
+time.milliseconds() + requestTimeoutMs);
+
+CompletableFuture partialResult = buildListOffsetRequestToNode(
+node,
+resetTimestamps,
+false,
+unsentRequests);
+
+partialResult.whenComplete((result, error) -> {
+if (error == null) {
+offsetFetcherUtils.onSuccessfulRequestForResettingPositions(resetTimestamps,

Review Comment:
   ditto - should be onSuccessfulResponse right?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java:
##
@@ -298,6 +356,154 @@ private CompletableFuture buildListOffsetRequestToNode(
 return result;
 }
 
+/**
+ * Make asynchronous ListOffsets request to fetch offsets by target times for the specified
+ * partitions.
+ * Use the retrieved offsets to reset positions in the subscription state.
+ *
+ * @param timestampsToSearch the mapping between partitions and target time
+ * @return A list of
+ * {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest}
+ * that can be polled to obtain the corresponding timestamps and offsets.
+ */
+private List<NetworkClientDelegate.UnsentRequest> buildListOffsetsRequestsAndResetPositions(
+final Map<TopicPartition, Long> timestampsToSearch) {
+Map<Node, Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition>> timestampsToSearchByNode =
+groupListOffsetRequests(timestampsToSearch, Optional.empty());
+
+final List<NetworkClientDelegate.UnsentRequest> unsentRequests = new ArrayList<>();
+
+timestampsToSearchByNode.forEach((node, resetTimestamps) -> {
+subscriptionState.setNextAllowedRetry(resetTimestamps.keySet(),
+time.milliseconds() + requestTimeoutMs);
+
+CompletableFuture partialResult = buildListOffsetRequestToNode(
+node,
+resetTimestamps,
+false,
+unsentRequests);
+
+partialResult.whenComplete((result, error) -> {
+if (error == null) {
+offsetFetcherUtils.onSuccessfulRequestForResettingPositions(resetTimestamps,
+result);
+} else {
+RuntimeException e;
+if (error instanceof RuntimeException) {
+e = (RuntimeException) error;
+} else {
+e = new RuntimeException("Unexpected failure in 
ListOffsets request for " +
+"resetting positions", error);
+}
+offsetFetcherUtils.onFailedRequestForResettingPositions(resetT

[GitHub] [kafka] lianetm commented on pull request #14346: KAFKA-15115 - KAFKA-15163; Reset/Validate positions implementation & API integration

2023-09-07 Thread via GitHub


lianetm commented on PR #14346:
URL: https://github.com/apache/kafka/pull/14346#issuecomment-1710621218

   Build completed with 6 unrelated test failures:
   
   Build / JDK 20 and Scala 2.13 / testOffsetTranslationBehindReplicationFlow() 
– 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest
   Build / JDK 20 and Scala 2.13 / 
shouldRemoveOneNamedTopologyWhileAnotherContinuesProcessing() – 
org.apache.kafka.streams.integration.NamedTopologyIntegrationTest
   Build / JDK 20 and Scala 2.13 / testTaskRequestWithOldStartMsGetsUpdated() – 
org.apache.kafka.trogdor.coordinator.CoordinatorTest
   Build / JDK 8 and Scala 2.12 / 
shouldRemoveOneNamedTopologyWhileAnotherContinuesProcessing() – 
org.apache.kafka.streams.integration.NamedTopologyIntegrationTest
   Build / JDK 11 and Scala 2.13 / testRackAwareRangeAssignor() – 
integration.kafka.server.FetchFromFollowerIntegrationTest
   Build / JDK 11 and Scala 2.13 / 
testDescribeClusterRequestExcludingClusterAuthorizedOperations(String).quorum=kraft
 – kafka.server.DescribeClusterRequestTest


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-12473) Make the "cooperative-sticky, range" as the default assignor

2023-09-07 Thread David Jacot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17762846#comment-17762846
 ] 

David Jacot commented on KAFKA-12473:
-

[~kirktrue] [~showuon] In my opinion, we should not do this given that KIP-848 
will replace this soon. It is not worth it.

> Make the "cooperative-sticky, range" as the default assignor
> 
>
> Key: KAFKA-12473
> URL: https://issues.apache.org/jira/browse/KAFKA-12473
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: A. Sophie Blee-Goldman
>Assignee: Luke Chen
>Priority: Critical
>  Labels: kip
>
> Now that 3.0 is coming up, we can change the default 
> ConsumerPartitionAssignor to something better than the RangeAssignor. The 
> original plan was to switch over to the StickyAssignor, but now that we have 
> incremental cooperative rebalancing we should  consider using the new 
> CooperativeStickyAssignor instead: this will enable the consumer group to 
> follow the COOPERATIVE protocol, improving the rebalancing experience OOTB.
> KIP: 
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=177048248
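
For reference, a hedged sketch (not from the ticket) of what opting into this pairing looks like today on the client, via {{partition.assignment.strategy}}:

{code:java}
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.clients.consumer.RangeAssignor;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
// "cooperative-sticky, range": members advertise both assignors and prefer
// the first; keeping "range" in the list is what lets a group roll over from
// the current default without downtime (it is dropped in a second bounce).
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
        Arrays.asList(CooperativeStickyAssignor.class.getName(),
                RangeAssignor.class.getName()));
{code}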



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14273) Kafka doesn't start with KRaft on Windows

2023-09-07 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-14273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17762843#comment-17762843
 ] 

José Armando García Sancio commented on KAFKA-14273:


[~satish.duggana] I think it is a blocker. I sent you an email in the release 
thread. I'll submit a PR shortly.

> Kafka doesn't start with KRaft on Windows
> -
>
> Key: KAFKA-14273
> URL: https://issues.apache.org/jira/browse/KAFKA-14273
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.3.1
>Reporter: Kedar Joshi
>Assignee: José Armando García Sancio
>Priority: Major
>
> {{Basic setup doesn't work on Windows 10.}}
> *{{Steps}}*
>  * {{Initialize cluster with -}}
> {code:sh}
>     bin\windows\kafka-storage.bat random-uuid
>     bin\windows\kafka-storage.bat format -t %cluster_id% -c 
> .\config\kraft\server.properties{code}
>  
>  * Start Kafka with -
> {code:sh}
>    bin\windows\kafka-server-start.bat .\config\kraft\server.properties{code}
>  
> *Stacktrace*
> Kafka fails to start with following exception -
> {code:java}
> D:\LocationGuru\Servers\Kafka-3.3>bin\windows\kafka-server-start.bat 
> .\config\kraft\server.properties
> [2022-10-03 23:14:20,089] INFO Registered kafka:type=kafka.Log4jController 
> MBean (kafka.utils.Log4jControllerRegistration$)
> [2022-10-03 23:14:20,375] INFO Setting -D 
> jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated 
> TLS renegotiation (org.apache.zookeeper.common.X509Util)
> [2022-10-03 23:14:20,594] INFO [LogLoader partition=__cluster_metadata-0, 
> dir=D:\LocationGuru\Servers\Kafka-3.3\tmp\kraft-combined-logs] Loading 
> producer state till offset 0 with message format version 2 
> (kafka.log.UnifiedLog$)
> [2022-10-03 23:14:20,594] INFO [LogLoader partition=__cluster_metadata-0, 
> dir=D:\LocationGuru\Servers\Kafka-3.3\tmp\kraft-combined-logs] Reloading from 
> producer snapshot and rebuilding producer state from offset 0 
> (kafka.log.UnifiedLog$)
> [2022-10-03 23:14:20,594] INFO [LogLoader partition=__cluster_metadata-0, 
> dir=D:\LocationGuru\Servers\Kafka-3.3\tmp\kraft-combined-logs] Producer state 
> recovery took 0ms for snapshot load and 0ms for segment recovery from offset 
> 0 (kafka.log.UnifiedLog$)
> [2022-10-03 23:14:20,640] INFO Initialized snapshots with IDs SortedSet() 
> from 
> D:\LocationGuru\Servers\Kafka-3.3\tmp\kraft-combined-logs__cluster_metadata-0 
> (kafka.raft.KafkaMetadataLog$)
> [2022-10-03 23:14:20,734] INFO [raft-expiration-reaper]: Starting 
> (kafka.raft.TimingWheelExpirationService$ExpiredOperationReaper)
> [2022-10-03 23:14:20,900] ERROR Exiting Kafka due to fatal exception 
> (kafka.Kafka$)
> java.io.UncheckedIOException: Error while writing the Quorum status from the 
> file 
> D:\LocationGuru\Servers\Kafka-3.3\tmp\kraft-combined-logs__cluster_metadata-0\quorum-state
>         at 
> org.apache.kafka.raft.FileBasedStateStore.writeElectionStateToFile(FileBasedStateStore.java:155)
>         at 
> org.apache.kafka.raft.FileBasedStateStore.writeElectionState(FileBasedStateStore.java:128)
>         at 
> org.apache.kafka.raft.QuorumState.transitionTo(QuorumState.java:477)
>         at org.apache.kafka.raft.QuorumState.initialize(QuorumState.java:212)
>         at 
> org.apache.kafka.raft.KafkaRaftClient.initialize(KafkaRaftClient.java:369)
>         at kafka.raft.KafkaRaftManager.buildRaftClient(RaftManager.scala:200)
>         at kafka.raft.KafkaRaftManager.(RaftManager.scala:127)
>         at kafka.server.KafkaRaftServer.(KafkaRaftServer.scala:83)
>         at kafka.Kafka$.buildServer(Kafka.scala:79)
>         at kafka.Kafka$.main(Kafka.scala:87)
>         at kafka.Kafka.main(Kafka.scala)
> Caused by: java.nio.file.FileSystemException: 
> D:\LocationGuru\Servers\Kafka-3.3\tmp\kraft-combined-logs_cluster_metadata-0\quorum-state.tmp
>  -> 
> D:\LocationGuru\Servers\Kafka-3.3\tmp\kraft-combined-logs_cluster_metadata-0\quorum-state:
>  The process cannot access the file because it is being used by another 
> process
>         at 
> java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:92)
>         at 
> java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103)
>         at java.base/sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:403)
>         at 
> java.base/sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:293)
>         at java.base/java.nio.file.Files.move(Files.java:1430)
>         at 
> org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:935)
>         at 
> org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:918)
>         at 
> org.apache.kafka.raft.FileBasedStateStore.writeElectionStateToFile(FileBasedStateStore.java:152)
>         ... 10 mor

[jira] [Commented] (KAFKA-12473) Make the "cooperative-sticky, range" as the default assignor

2023-09-07 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17762842#comment-17762842
 ] 

Kirk True commented on KAFKA-12473:
---

[~showuon] Are you actively working on this change? I'm wondering if it's 
something I could pick up. Thanks!

> Make the "cooperative-sticky, range" as the default assignor
> 
>
> Key: KAFKA-12473
> URL: https://issues.apache.org/jira/browse/KAFKA-12473
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: A. Sophie Blee-Goldman
>Assignee: Luke Chen
>Priority: Critical
>  Labels: kip
>
> Now that 3.0 is coming up, we can change the default 
> ConsumerPartitionAssignor to something better than the RangeAssignor. The 
> original plan was to switch over to the StickyAssignor, but now that we have 
> incremental cooperative rebalancing we should  consider using the new 
> CooperativeStickyAssignor instead: this will enable the consumer group to 
> follow the COOPERATIVE protocol, improving the rebalancing experience OOTB.
> KIP: 
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=177048248



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14273) Kafka doesn't start with KRaft on Windows

2023-09-07 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-14273?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

José Armando García Sancio reassigned KAFKA-14273:
--

Assignee: José Armando García Sancio

> Kafka doesn't start with KRaft on Windows
> -
>
> Key: KAFKA-14273
> URL: https://issues.apache.org/jira/browse/KAFKA-14273
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.3.1
>Reporter: Kedar Joshi
>Assignee: José Armando García Sancio
>Priority: Major
>
> {{Basic setup doesn't work on Windows 10.}}
> *{{Steps}}*
>  * {{Initialize cluster with -}}
> {code:sh}
>     bin\windows\kafka-storage.bat random-uuid
>     bin\windows\kafka-storage.bat format -t %cluster_id% -c 
> .\config\kraft\server.properties{code}
>  
>  * Start Kafka with -
> {code:sh}
>    bin\windows\kafka-server-start.bat .\config\kraft\server.properties{code}
>  
> *Stacktrace*
> Kafka fails to start with following exception -
> {code:java}
> D:\LocationGuru\Servers\Kafka-3.3>bin\windows\kafka-server-start.bat 
> .\config\kraft\server.properties
> [2022-10-03 23:14:20,089] INFO Registered kafka:type=kafka.Log4jController 
> MBean (kafka.utils.Log4jControllerRegistration$)
> [2022-10-03 23:14:20,375] INFO Setting -D 
> jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated 
> TLS renegotiation (org.apache.zookeeper.common.X509Util)
> [2022-10-03 23:14:20,594] INFO [LogLoader partition=__cluster_metadata-0, 
> dir=D:\LocationGuru\Servers\Kafka-3.3\tmp\kraft-combined-logs] Loading 
> producer state till offset 0 with message format version 2 
> (kafka.log.UnifiedLog$)
> [2022-10-03 23:14:20,594] INFO [LogLoader partition=__cluster_metadata-0, 
> dir=D:\LocationGuru\Servers\Kafka-3.3\tmp\kraft-combined-logs] Reloading from 
> producer snapshot and rebuilding producer state from offset 0 
> (kafka.log.UnifiedLog$)
> [2022-10-03 23:14:20,594] INFO [LogLoader partition=__cluster_metadata-0, 
> dir=D:\LocationGuru\Servers\Kafka-3.3\tmp\kraft-combined-logs] Producer state 
> recovery took 0ms for snapshot load and 0ms for segment recovery from offset 
> 0 (kafka.log.UnifiedLog$)
> [2022-10-03 23:14:20,640] INFO Initialized snapshots with IDs SortedSet() 
> from 
> D:\LocationGuru\Servers\Kafka-3.3\tmp\kraft-combined-logs__cluster_metadata-0 
> (kafka.raft.KafkaMetadataLog$)
> [2022-10-03 23:14:20,734] INFO [raft-expiration-reaper]: Starting 
> (kafka.raft.TimingWheelExpirationService$ExpiredOperationReaper)
> [2022-10-03 23:14:20,900] ERROR Exiting Kafka due to fatal exception 
> (kafka.Kafka$)
> java.io.UncheckedIOException: Error while writing the Quorum status from the 
> file 
> D:\LocationGuru\Servers\Kafka-3.3\tmp\kraft-combined-logs__cluster_metadata-0\quorum-state
>         at 
> org.apache.kafka.raft.FileBasedStateStore.writeElectionStateToFile(FileBasedStateStore.java:155)
>         at 
> org.apache.kafka.raft.FileBasedStateStore.writeElectionState(FileBasedStateStore.java:128)
>         at 
> org.apache.kafka.raft.QuorumState.transitionTo(QuorumState.java:477)
>         at org.apache.kafka.raft.QuorumState.initialize(QuorumState.java:212)
>         at 
> org.apache.kafka.raft.KafkaRaftClient.initialize(KafkaRaftClient.java:369)
>         at kafka.raft.KafkaRaftManager.buildRaftClient(RaftManager.scala:200)
>         at kafka.raft.KafkaRaftManager.(RaftManager.scala:127)
>         at kafka.server.KafkaRaftServer.(KafkaRaftServer.scala:83)
>         at kafka.Kafka$.buildServer(Kafka.scala:79)
>         at kafka.Kafka$.main(Kafka.scala:87)
>         at kafka.Kafka.main(Kafka.scala)
> Caused by: java.nio.file.FileSystemException: 
> D:\LocationGuru\Servers\Kafka-3.3\tmp\kraft-combined-logs_cluster_metadata-0\quorum-state.tmp
>  -> 
> D:\LocationGuru\Servers\Kafka-3.3\tmp\kraft-combined-logs_cluster_metadata-0\quorum-state:
>  The process cannot access the file because it is being used by another 
> process
>         at 
> java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:92)
>         at 
> java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103)
>         at java.base/sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:403)
>         at 
> java.base/sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:293)
>         at java.base/java.nio.file.Files.move(Files.java:1430)
>         at 
> org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:935)
>         at 
> org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:918)
>         at 
> org.apache.kafka.raft.FileBasedStateStore.writeElectionStateToFile(FileBasedStateStore.java:152)
>         ... 10 more
>         Suppressed: java.nio.file.FileSystemException: 
> D:\LocationGuru\Servers\Kafka-3.3\tmp\kraft-combined-logs_clu

[GitHub] [kafka] philipnee commented on pull request #14313: KAFKA-15416: Fix flaky TopicAdminTest::retryEndOffsetsShouldRetryWhenTopicNotFound test case

2023-09-07 Thread via GitHub


philipnee commented on PR #14313:
URL: https://github.com/apache/kafka/pull/14313#issuecomment-1710569868

   @C0urante - Sorry, what I meant was that I've seen other locally flaky tests 
that almost never show up during the Jenkins build. Thanks! I'll submit a patch 
after you merge it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] blacktooth commented on a diff in pull request #13726: KAFKA-13109: SourceTask#poll not enforcing retries in case of RetriableException

2023-09-07 Thread via GitHub


blacktooth commented on code in PR #13726:
URL: https://github.com/apache/kafka/pull/13726#discussion_r1318930687


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java:
##
@@ -458,13 +458,8 @@ boolean sendRecords() {
 }
 
 protected List poll() throws InterruptedException {
-try {
-return task.poll();
-} catch (RetriableException | 
org.apache.kafka.common.errors.RetriableException e) {
-log.warn("{} failed to poll records from SourceTask. Will retry 
operation.", this, e);
-// Do nothing. Let the framework poll whenever it's ready.
-return null;
-}
+retryWithToleranceOperator.reset();
+return retryWithToleranceOperator.execute(task::poll, Stage.TASK_POLL, 
this.getClass());

Review Comment:
   @vamossagar12  @yashmayya Wondering if 
[RetryUtil](https://github.com/apache/kafka/blob/0029bc4897e603614a49e0b0f1e623abbe650c61/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java#L55)
 is a better alternative to use to avoid publishing these metrics. What do you 
think?
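
   For illustration, a hedged sketch of that alternative (assuming the 
`retryUntilTimeout(Callable, Supplier<String>, Duration, long)` signature at 
the link above; the timeout and backoff values are invented):

       protected List<SourceRecord> poll() throws InterruptedException {
           try {
               // Retries task.poll() on RetriableException without going
               // through RetryWithToleranceOperator, so no error-handling
               // metrics are recorded.
               return RetryUtil.retryUntilTimeout(
                       task::poll,
                       () -> "source task poll",
                       Duration.ofSeconds(60),   // illustrative overall timeout
                       100);                     // illustrative backoff in ms
           } catch (InterruptedException e) {
               throw e;
           } catch (Exception e) {
               throw new ConnectException("Failed to poll records from SourceTask", e);
           }
       }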



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] nizhikov commented on pull request #14217: KAFKA-14595 ReassignPartitionsCommandArgsTest rewritten in java

2023-09-07 Thread via GitHub


nizhikov commented on PR #14217:
URL: https://github.com/apache/kafka/pull/14217#issuecomment-1710540249

   @gharris1727 Thank you very much!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on pull request #14314: KAFKA-15425: Fail fast in Admin::listOffsets when topic (but not partition) metadata is not found

2023-09-07 Thread via GitHub


jolshan commented on PR #14314:
URL: https://github.com/apache/kafka/pull/14314#issuecomment-1710539538

   Also -- I assume we want to pick this to 3.6 as well. Is that the only 
branch @C0urante?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on pull request #14217: KAFKA-14595 ReassignPartitionsCommandArgsTest rewritten in java

2023-09-07 Thread via GitHub


gharris1727 commented on PR #14217:
URL: https://github.com/apache/kafka/pull/14217#issuecomment-1710524166

   Thanks @nizhikov for your patience, and thanks for keeping people notified 
about this PR. Too many PRs like this one are left stale due to limited 
committer bandwidth.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14595) Move ReassignPartitionsCommand to tools

2023-09-07 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17762822#comment-17762822
 ] 

Greg Harris commented on KAFKA-14595:
-

I merged [https://github.com/apache/kafka/pull/14217] but I am leaving this 
open as the other PRs have not landed yet.

> Move ReassignPartitionsCommand to tools
> ---
>
> Key: KAFKA-14595
> URL: https://issues.apache.org/jira/browse/KAFKA-14595
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Assignee: Nikolay Izhikov
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] cmccabe commented on pull request #14306: KAFKA-15369: Implement KIP-919: Allow AC to Talk Directly with Controllers

2023-09-07 Thread via GitHub


cmccabe commented on PR #14306:
URL: https://github.com/apache/kafka/pull/14306#issuecomment-1710512791

   Fixed conflicts


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 merged pull request #14217: KAFKA-14595 ReassignPartitionsCommandArgsTest rewritten in java

2023-09-07 Thread via GitHub


gharris1727 merged PR #14217:
URL: https://github.com/apache/kafka/pull/14217


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on pull request #14217: KAFKA-14595 ReassignPartitionsCommandArgsTest rewritten in java

2023-09-07 Thread via GitHub


gharris1727 commented on PR #14217:
URL: https://github.com/apache/kafka/pull/14217#issuecomment-1710503247

   The test failures in CI appear unrelated, and `tools:test` passes for me 
locally.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] Cerchie commented on pull request #14341: KAFKA-15307: Removes non-existent configs

2023-09-07 Thread via GitHub


Cerchie commented on PR #14341:
URL: https://github.com/apache/kafka/pull/14341#issuecomment-1710482923

   > Thanks for the PR. I know that we did remove `partition.grouper`, but the 
   > others seem to be valid configs. For example 
   > https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java#L665
   > 
   > Can you double-check again?
   
   Huh. I wonder if I was working from an un-updated fork; double-checking.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] Cerchie commented on a diff in pull request #14322: KAFKA-15418: update statement on decompression

2023-09-07 Thread via GitHub


Cerchie commented on code in PR #14322:
URL: https://github.com/apache/kafka/pull/14322#discussion_r1318874061


##
docs/design.html:
##
@@ -136,8 +136,10 @@ 
-Kafka supports this with an efficient batching format. A batch of messages 
can be clumped together compressed and sent to the server in this form. This 
batch of messages will be written in compressed form and will
-remain compressed in the log and will only be decompressed by the consumer.
+Kafka supports this with an efficient batching format. A batch of messages 
can be grouped together, compressed, and sent to the server in this form. The 
broker decompresses the batch in order to validate it. For
+example, it validates that the number of records in the batch is the same as 
what the batch header states. The broker may also potentially modify the batch 
(e.g., if the topic is compacted, the broker will filter out 

Review Comment:
   Thanks for checking in on this -- in light of the latest comment, I removed 
the sentence "The broker may also potentially modify the batch (e.g., if the 
topic is compacted, the broker will filter out records eligible for compaction 
prior to writing to disk)."
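
   For readers skimming the hunk above, a minimal illustrative sketch of the 
validation step the new wording describes (hypothetical helper, not Kafka's 
actual broker code): after decompressing, the broker checks that the record 
count declared in the batch header matches the records actually present.
   
   ```java
   import java.util.List;
   
   final class BatchValidationSketch {
       // Illustration only: fail if the header's declared record count does not
       // match the number of records recovered by decompression.
       static void validateRecordCount(int countFromHeader, List<byte[]> decompressedRecords) {
           if (decompressedRecords.size() != countFromHeader) {
               throw new IllegalStateException("Corrupt batch: header declares "
                       + countFromHeader + " records, found " + decompressedRecords.size());
           }
       }
   }
   ```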



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-15402) Performance regression on close consumer after upgrading to 3.5.0

2023-09-07 Thread Benoit Delbosc (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17762808#comment-17762808
 ] 

Benoit Delbosc edited comment on KAFKA-15402 at 9/7/23 4:38 PM:


Thanks, I have tested {{max.incremental.fetch.session.cache.slots=0}} without 
success; the latency is the same as with the default on 3.5.1.

The option was taken into account, as shown in the Kafka log:
{code}
INFO KafkaConfig values: 
...
max.incremental.fetch.session.cache.slots = 0
...
{code}


was (Author: bdelbosc):
Thanks, I have tested {{max.incremental.fetch.session.cache.slots=0}} without 
success; the latency is the same as with the default.

> Performance regression on close consumer after upgrading to 3.5.0
> -
>
> Key: KAFKA-15402
> URL: https://issues.apache.org/jira/browse/KAFKA-15402
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 3.5.0, 3.5.1
>Reporter: Benoit Delbosc
>Priority: Major
> Attachments: image-2023-08-24-18-51-21-720.png, 
> image-2023-08-24-18-51-57-435.png, image-2023-08-25-10-50-28-079.png
>
>
> Hi,
> After upgrading to Kafka client version 3.5.0, we have observed a significant 
> increase in the duration of our Java unit tests. These unit tests heavily 
> rely on the Kafka Admin, Producer, and Consumer API.
> When using Kafka server version 3.4.1, the duration of the unit tests 
> increased from 8 seconds (with Kafka client 3.4.1) to 18 seconds (with Kafka 
> client 3.5.0).
> Upgrading the Kafka server to 3.5.1 shows similar results.
> I have come across the issue KAFKA-15178, which could be the culprit. I will 
> attempt to test the proposed patch.
> In the meantime, if you have any ideas that could help identify and address 
> the regression, please let me know.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] C0urante commented on a diff in pull request #14314: KAFKA-15425: Fail fast in Admin::listOffsets when topic (but not partition) metadata is not found

2023-09-07 Thread via GitHub


C0urante commented on code in PR #14314:
URL: https://github.com/apache/kafka/pull/14314#discussion_r1318868378


##
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##
@@ -5633,20 +5643,39 @@ public void testListOffsetsMetadataNonRetriableErrors() 
throws Exception {
 node0);
 
 final TopicPartition tp1 = new TopicPartition("foo", 0);
+final MetadataResponse preparedResponse = prepareMetadataResponse(
+cluster, topicMetadataError, partitionMetadataError
+);
 
 try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) 
{
 env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
 
-env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, 
Errors.TOPIC_AUTHORIZATION_FAILED));
+env.kafkaClient().prepareResponse(preparedResponse);
 
 Map<TopicPartition, OffsetSpec> partitions = new HashMap<>();
 partitions.put(tp1, OffsetSpec.latest());
 ListOffsetsResult result = 
env.adminClient().listOffsets(partitions);
 
-TestUtils.assertFutureError(result.all(), 
TopicAuthorizationException.class);
+TestUtils.assertFutureError(result.all(), expectedFailure);
 }
 }
 
+private static Stream<Arguments> listOffsetsMetadataNonRetriableErrors() {
+return Stream.of(
+Arguments.of(
+Errors.TOPIC_AUTHORIZATION_FAILED,
+Errors.TOPIC_AUTHORIZATION_FAILED,
+TopicAuthorizationException.class
+),
+Arguments.of(
+// We fail fast when the entire topic is unknown
+Errors.UNKNOWN_TOPIC_OR_PARTITION,
+Errors.NONE,

Review Comment:
   Sure, done. I've tried to clarify in the comments that these are unusual 
cases we're testing for lest anyone think that this is normal broker behavior.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] yashmayya commented on pull request #14279: KAFKA-15179: Add integration tests for the file sink and source connectors

2023-09-07 Thread via GitHub


yashmayya commented on PR #14279:
URL: https://github.com/apache/kafka/pull/14279#issuecomment-1710462603

   Thanks Chris!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15402) Performance regression on close consumer after upgrading to 3.5.0

2023-09-07 Thread Benoit Delbosc (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17762808#comment-17762808
 ] 

Benoit Delbosc commented on KAFKA-15402:


Thanks, I have tested {{max.incremental.fetch.session.cache.slots=0}} without 
success; the latency is the same as with the default.
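
For reference, a minimal sketch of how the override described above can be 
supplied (an assumption on my part: it is applied as a broker-side property when 
building the broker configuration in a test harness):

{code:java}
// Sketch: broker-side override via java.util.Properties; 0 slots disables
// the incremental fetch session cache entirely.
Properties brokerProps = new Properties();
brokerProps.setProperty("max.incremental.fetch.session.cache.slots", "0");
{code}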

> Performance regression on close consumer after upgrading to 3.5.0
> -
>
> Key: KAFKA-15402
> URL: https://issues.apache.org/jira/browse/KAFKA-15402
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 3.5.0, 3.5.1
>Reporter: Benoit Delbosc
>Priority: Major
> Attachments: image-2023-08-24-18-51-21-720.png, 
> image-2023-08-24-18-51-57-435.png, image-2023-08-25-10-50-28-079.png
>
>
> Hi,
> After upgrading to Kafka client version 3.5.0, we have observed a significant 
> increase in the duration of our Java unit tests. These unit tests heavily 
> rely on the Kafka Admin, Producer, and Consumer API.
> When using Kafka server version 3.4.1, the duration of the unit tests 
> increased from 8 seconds (with Kafka client 3.4.1) to 18 seconds (with Kafka 
> client 3.5.0).
> Upgrading the Kafka server to 3.5.1 shows similar results.
> I have come across the issue KAFKA-15178, which could be the culprit. I will 
> attempt to test the proposed patch.
> In the meantime, if you have any ideas that could help identify and address 
> the regression, please let me know.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] kamalcph commented on pull request #14349: KAFKA-15352: Update log-start-offset before initiating deletion of remote segments

2023-09-07 Thread via GitHub


kamalcph commented on PR #14349:
URL: https://github.com/apache/kafka/pull/14349#issuecomment-1710450437

   > My point is, should we check for leadership even before updating the log 
start offset?
   
   Yes, this is the expectation, and it is done inside the 
`handleLogStartOffsetUpdate` method:
   
   ```java
   public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) {
       // Only the current leader for the partition applies the update;
       // non-leaders simply ignore it.
       if (isLeader()) {
           logger.debug("Updating {} with remoteLogStartOffset: {}", topicPartition, remoteLogStartOffset);
           updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset);
       }
   }
   ```
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante merged pull request #14279: KAFKA-15179: Add integration tests for the file sink and source connectors

2023-09-07 Thread via GitHub


C0urante merged PR #14279:
URL: https://github.com/apache/kafka/pull/14279


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #14279: KAFKA-15179: Add integration tests for the file sink and source connectors

2023-09-07 Thread via GitHub


C0urante commented on code in PR #14279:
URL: https://github.com/apache/kafka/pull/14279#discussion_r1318853979


##
connect/file/src/test/java/org/apache/kafka/connect/file/integration/FileStreamSinkConnectorIntegrationTest.java:
##
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.file.integration;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.file.FileStreamSinkConnector;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.InputStreamReader;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static 
org.apache.kafka.connect.file.FileStreamSinkConnector.FILE_CONFIG;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
+import static org.apache.kafka.connect.sink.SinkConnector.TOPICS_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@Tag("integration")
+public class FileStreamSinkConnectorIntegrationTest {
+
+private static final String CONNECTOR_NAME = "test-connector";
+private static final String TOPIC = "test-topic";
+private static final String MESSAGE_PREFIX = "Message ";
+private static final int NUM_MESSAGES = 5;
+private static final String FILE_NAME = "test-file";
+private final EmbeddedConnectCluster connect = new 
EmbeddedConnectCluster.Builder().build();
+
+@BeforeEach
+public void setup() {
+connect.start();
+connect.kafka().createTopic(TOPIC);
+produceMessagesToTopic(TOPIC, NUM_MESSAGES);
+}
+
+@AfterEach
+public void tearDown() {
+connect.stop();
+}
+
+@Test
+public void testSimpleSink() throws Exception {
+File tempDir = TestUtils.tempDirectory();
+Path tempFilePath = tempDir.toPath().resolve(FILE_NAME);
+Map<String, String> connectorConfigs = baseConnectorConfigs(TOPIC, tempFilePath.toString());
+connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+
connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME,
 1,
+"Connector and task did not start in time");
+
+verifyLinesInFile(tempFilePath, NUM_MESSAGES, true);
+}
+
+@Test
+public void testAlterOffsets() throws Exception {
+File tempDir = TestUtils.tempDirectory();
+Path tempFilePath = tempDir.toPath().resolve(FILE_NAME);
+Map<String, String> connectorConfigs = baseConnectorConfigs(TOPIC, tempFilePath.toString());
+connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+
connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME,
 1,
+"Connector and task did not start in time");
+
+verifyLinesInFile(tempFilePath, NUM_MESSAGES, true);
+
+connect.stopConnector(CONNECTOR_NAME);
+connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, 
"Connector did not stop in time");
+
+// Alter the offsets to cause the last message in the topic to be 
re-processed
+connect.alterSinkConnectorOffset(CONNECTOR_NAME, new 
TopicPartition(TOPIC, 0), (long) (NUM_MESSAGES - 1));
+
+connect.resumeConnector(CONNECTOR_NAME);
+
connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME,
 1,
+"Connector and task did not resume in time");
+
+// The last message should be re-processed when the connector is 
resumed after the offsets are altered
+verifyLinesInFile(tempFilePath, NUM_MESSAGES + 1, false);
+}
+
+@Test
+public void testResetOffsets() throws Exception {
+File tempDir = TestUtils.tempDirectory();
+Path tempFilePath = tempDir.toPath().res

[GitHub] [kafka] C0urante commented on a diff in pull request #14279: KAFKA-15179: Add integration tests for the file sink and source connectors

2023-09-07 Thread via GitHub


C0urante commented on code in PR #14279:
URL: https://github.com/apache/kafka/pull/14279#discussion_r1318849972


##
connect/file/src/test/java/org/apache/kafka/connect/file/integration/FileStreamSinkConnectorIntegrationTest.java:
##
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.file.integration;
+
+import org.apache.kafka.connect.file.FileStreamSinkConnector;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
+import org.apache.kafka.connect.util.SinkUtils;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.InputStreamReader;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.kafka.connect.file.FileStreamSinkConnector.FILE_CONFIG;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
+import static org.apache.kafka.connect.sink.SinkConnector.TOPICS_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@Tag("integration")
+public class FileStreamSinkConnectorIntegrationTest {
+
+private static final String CONNECTOR_NAME = "test-connector";
+private static final String TOPIC = "test-topic";
+private static final String MESSAGE_FORMAT = "Message %d";
+private static final int NUM_MESSAGES = 5;
+private static final String FILE_NAME = "test-file";
+private final EmbeddedConnectCluster connect = new 
EmbeddedConnectCluster.Builder().build();
+
+@BeforeEach
+public void setup() {
+connect.start();
+connect.kafka().createTopic(TOPIC);
+produceMessagesToTopic(TOPIC, NUM_MESSAGES);
+}
+
+@AfterEach
+public void tearDown() {
+connect.stop();
+}
+
+@Test
+public void testSimpleSink() throws Exception {
+File tempDir = TestUtils.tempDirectory();
+Path tempFilePath = tempDir.toPath().resolve(FILE_NAME);
+Map<String, String> connectorConfigs = baseConnectorConfigs(TOPIC, tempFilePath.toString());
+connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME,
 1,
+"Connector and task did not start in time");
+
+verifyLinesInFile(tempFilePath, NUM_MESSAGES, true);
+}
+
+@Test
+public void testAlterOffsets() throws Exception {
+File tempDir = TestUtils.tempDirectory();
+Path tempFilePath = tempDir.toPath().resolve(FILE_NAME);
+Map<String, String> connectorConfigs = baseConnectorConfigs(TOPIC, tempFilePath.toString());
+connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME,
 1,
+"Connector and task did not start in time");
+
+verifyLinesInFile(tempFilePath, NUM_MESSAGES, true);
+
+connect.stopConnector(CONNECTOR_NAME);
+connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, 
"Connector did not stop in time");
+
+// Alter the offsets to cause the last message in the topic to be 
re-processed
+Map<String, Object> partition = new HashMap<>();
+partition.put(SinkUtils.KAFKA_TOPIC_KEY, TOPIC);
+partition.put(SinkUtils.KAFKA_PARTITION_KEY, 0);
+Map<String, Object> offset = Collections.singletonMap(SinkUtils.KAFKA_OFFSET_KEY, 4);
+List<ConnectorOffset> offsetsToAlter = Collections.singletonList(new ConnectorOffset(partition, offset));
+
+connect.alterConnectorOffsets(CONNECTOR_NAME, new 
ConnectorOffsets(offsetsToAlter));

Review Comment:
   Looks good, thanks 👍



##
connect/file/src/test/java/org/apache/kafka/connect/file/integration/FileStreamSinkConnectorIntegra

[GitHub] [kafka] C0urante commented on a diff in pull request #14279: KAFKA-15179: Add integration tests for the file sink and source connectors

2023-09-07 Thread via GitHub


C0urante commented on code in PR #14279:
URL: https://github.com/apache/kafka/pull/14279#discussion_r1318849515


##
connect/file/src/test/java/org/apache/kafka/connect/file/integration/FileStreamSinkConnectorIntegrationTest.java:
##
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.file.integration;
+
+import org.apache.kafka.connect.file.FileStreamSinkConnector;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
+import org.apache.kafka.connect.util.SinkUtils;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.InputStreamReader;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.kafka.connect.file.FileStreamSinkConnector.FILE_CONFIG;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
+import static org.apache.kafka.connect.sink.SinkConnector.TOPICS_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@Tag("integration")
+public class FileStreamSinkConnectorIntegrationTest {
+
+private static final String CONNECTOR_NAME = "test-connector";
+private static final String TOPIC = "test-topic";
+private static final String MESSAGE_FORMAT = "Message %d";
+private static final int NUM_MESSAGES = 5;
+private static final String FILE_NAME = "test-file";
+private final EmbeddedConnectCluster connect = new 
EmbeddedConnectCluster.Builder().build();
+
+@BeforeEach
+public void setup() {
+connect.start();
+connect.kafka().createTopic(TOPIC);
+produceMessagesToTopic(TOPIC, NUM_MESSAGES);
+}
+
+@AfterEach
+public void tearDown() {
+connect.stop();
+}
+
+@Test
+public void testSimpleSink() throws Exception {
+File tempDir = TestUtils.tempDirectory();
+Path tempFilePath = tempDir.toPath().resolve(FILE_NAME);
+Map<String, String> connectorConfigs = baseConnectorConfigs(TOPIC, tempFilePath.toString());
+connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME,
 1,
+"Connector and task did not start in time");
+
+verifyLinesInFile(tempFilePath, NUM_MESSAGES, true);
+}
+
+@Test
+public void testAlterOffsets() throws Exception {
+File tempDir = TestUtils.tempDirectory();
+Path tempFilePath = tempDir.toPath().resolve(FILE_NAME);
+Map<String, String> connectorConfigs = baseConnectorConfigs(TOPIC, tempFilePath.toString());
+connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME,
 1,
+"Connector and task did not start in time");
+
+verifyLinesInFile(tempFilePath, NUM_MESSAGES, true);
+
+connect.stopConnector(CONNECTOR_NAME);
+connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, 
"Connector did not stop in time");
+
+// Alter the offsets to cause the last message in the topic to be 
re-processed
+Map<String, Object> partition = new HashMap<>();
+partition.put(SinkUtils.KAFKA_TOPIC_KEY, TOPIC);
+partition.put(SinkUtils.KAFKA_PARTITION_KEY, 0);
+Map<String, Object> offset = Collections.singletonMap(SinkUtils.KAFKA_OFFSET_KEY, 4);
+List<ConnectorOffset> offsetsToAlter = Collections.singletonList(new ConnectorOffset(partition, offset));
+
+connect.alterConnectorOffsets(CONNECTOR_NAME, new 
ConnectorOffsets(offsetsToAlter));
+
+connect.resumeConnector(CONNECTOR_NAME);
+
connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME,
 1,
+  

[jira] [Comment Edited] (KAFKA-15417) JoinWindow does not seem to work properly with a KStream - KStream - LeftJoin()

2023-09-07 Thread Victor van den Hoven (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17762676#comment-17762676
 ] 

Victor van den Hoven edited comment on KAFKA-15417 at 9/7/23 4:13 PM:
--

Yes, I think it was incorrect and flipped.

Sure, I can give it a try to work on a PR.

 

 


was (Author: victorvandenhoven):
Yes, I think it was incorrectly and flipped.

Sure, I can give it a try to work on a PR.

 

 

> JoinWindow does not seem to work properly with a KStream - KStream - 
> LeftJoin()
> 
>
> Key: KAFKA-15417
> URL: https://issues.apache.org/jira/browse/KAFKA-15417
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.4.0
>Reporter: Victor van den Hoven
>Priority: Major
> Attachments: SimpleStreamTopology.java, SimpleStreamTopologyTest.java
>
>
> In Kafka-streams 3.4.0 :
> According to the javadoc of the Joinwindow:
> _There are three different window configurations supported:_
>  * _before = after = time-difference_
>  * _before = 0 and after = time-difference_
>  * _*before = time-difference and after = 0*_
>  
> However, if I use a joinWindow with *before = time-difference and after = 0* 
> on a kstream-kstream-leftjoin, the *after=0* part does not seem to work.
> When using _stream1.leftjoin(stream2, joinWindow)_ with 
> {_}joinWindow{_}.{_}after=0 and joinWindow.before=30s{_}, any new message on 
> stream 1 that cannot be joined with any messages on stream2 should be joined 
> with a null-record after the _joinWindow.after_ has ended and a new message 
> has arrived on stream1.
> It does not.
> Only if the new message arrives after the value of _joinWindow.before_ the 
> previous message will be joined with a null-record.
>  
> Attached you can find two files with a TopologyTestDriver Unit test to 
> reproduce.
> topology:   stream1.leftjoin( stream2, joiner, joinwindow)
> joinWindow has before=5000ms and after=0
> message1(key1) ->  stream1
> after 4000ms message2(key2) -> stream1  ->  NO null-record join was made, even 
> though the after period had expired.
> after 4900ms message2(key2) -> stream1  ->  NO null-record join was made, even 
> though the after period had expired.
> after 5000ms message2(key2) -> stream1  ->  A null-record join was made; the 
> before period had expired.
> after 6000ms message2(key2) -> stream1  ->  A null-record join was made; the 
> before period had expired.
>  
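
A minimal sketch of the window configuration the report describes (the serdes, 
topic wiring, joiner, and KStream<String, String> types are assumed for 
illustration; this is not the attached test):

{code:java}
// The third javadoc variant: before = time-difference (5s here), after = 0.
JoinWindows window = JoinWindows
    .ofTimeDifferenceWithNoGrace(Duration.ofSeconds(5)) // before = after = 5s
    .after(Duration.ZERO);                              // then force after = 0

KStream<String, String> joined = stream1.leftJoin(
    stream2,
    (left, right) -> left + "/" + right, // right is null for the unmatched case
    window);
{code}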



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jolshan commented on a diff in pull request #14314: KAFKA-15425: Fail fast in Admin::listOffsets when topic (but not partition) metadata is not found

2023-09-07 Thread via GitHub


jolshan commented on code in PR #14314:
URL: https://github.com/apache/kafka/pull/14314#discussion_r1318833581


##
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##
@@ -5633,20 +5643,39 @@ public void testListOffsetsMetadataNonRetriableErrors() 
throws Exception {
 node0);
 
 final TopicPartition tp1 = new TopicPartition("foo", 0);
+final MetadataResponse preparedResponse = prepareMetadataResponse(
+cluster, topicMetadataError, partitionMetadataError
+);
 
 try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) 
{
 env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
 
-env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, 
Errors.TOPIC_AUTHORIZATION_FAILED));
+env.kafkaClient().prepareResponse(preparedResponse);
 
 Map<TopicPartition, OffsetSpec> partitions = new HashMap<>();
 partitions.put(tp1, OffsetSpec.latest());
 ListOffsetsResult result = 
env.adminClient().listOffsets(partitions);
 
-TestUtils.assertFutureError(result.all(), 
TopicAuthorizationException.class);
+TestUtils.assertFutureError(result.all(), expectedFailure);
 }
 }
 
+private static Stream<Arguments> listOffsetsMetadataNonRetriableErrors() {
+return Stream.of(
+Arguments.of(
+Errors.TOPIC_AUTHORIZATION_FAILED,
+Errors.TOPIC_AUTHORIZATION_FAILED,
+TopicAuthorizationException.class
+),
+Arguments.of(
+// We fail fast when the entire topic is unknown
+Errors.UNKNOWN_TOPIC_OR_PARTITION,
+Errors.NONE,

Review Comment:
   For my understanding: if the partition-level error were also an unknown topic 
or partition, we would fail fast there too.
   
   >  With this change, the operation always fails if an error is reported for 
the entire topic, even if an error is also reported for one or more individual 
topic partitions.
   
   Would we want to add this test case too just so the behavior is also shown 
in tests?
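
   A sketch of what that extra case could look like, following the 
`Arguments.of(topicError, partitionError, expectedException)` shape used in the 
diff above (the expected exception class here is my assumption, since the second 
case's class is truncated in the quoted hunk):
   
   ```java
   Arguments.of(
       // Topic-level and partition-level errors both set: the topic-level
       // error should still win and fail the whole operation fast.
       Errors.UNKNOWN_TOPIC_OR_PARTITION,
       Errors.UNKNOWN_TOPIC_OR_PARTITION,
       UnknownTopicOrPartitionException.class
   )
   ```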



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #14323: KAFKA-15275 - Client state machine basic components, states and initial transitions

2023-09-07 Thread via GitHub


philipnee commented on code in PR #14323:
URL: https://github.com/apache/kafka/pull/14323#discussion_r1318822595


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AssignorSelection.java:
##
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Selection of a type of assignor used by a member to get partitions assigned 
as part of a
+ * consumer group. Selection could be one of:
+ * CLIENT assignors
+ * SERVER assignors
+ * 
+ * Client assignors include a list of
+ * {@link 
org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData.Assignor}
+ * Server assignors include the name of the selected server assignor, e.g. uniform or range.
+ */
+public class AssignorSelection {
+public enum Type { CLIENT, SERVER }

Review Comment:
   Why couldn't we do `hasClientAssignor()`?
   
   Since we use Optional quite liberally, can we make the assignor Optional as 
well?
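
   For illustration, one possible shape of that suggestion (the field names 
`type` and `clientAssignors` are assumed here, not taken from the PR):
   
   ```java
   // Expose the client-side assignors through an Optional so callers don't
   // have to consult the Type enum before dereferencing a nullable field.
   public Optional<List<ConsumerGroupHeartbeatRequestData.Assignor>> clientAssignors() {
       return type == Type.CLIENT ? Optional.ofNullable(clientAssignors) : Optional.empty();
   }
   
   public boolean hasClientAssignor() {
       return clientAssignors().isPresent();
   }
   ```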



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


