[jira] [Updated] (KAFKA-15444) KIP-974: Docker Image for GraalVM based Native Kafka Broker
[ https://issues.apache.org/jira/browse/KAFKA-15444?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Krishna Agarwal updated KAFKA-15444: Description: [KIP-974: Docker Image for GraalVM based Native Kafka Broker|https://cwiki.apache.org/confluence/display/KAFKA/KIP-974%3A+Docker+Image+for+GraalVM+based+Native+Kafka+Broker] > KIP-974: Docker Image for GraalVM based Native Kafka Broker > --- > > Key: KAFKA-15444 > URL: https://issues.apache.org/jira/browse/KAFKA-15444 > Project: Kafka > Issue Type: New Feature >Reporter: Krishna Agarwal >Assignee: Krishna Agarwal >Priority: Major > Labels: KIP-974 > > [KIP-974: Docker Image for GraalVM based Native Kafka > Broker|https://cwiki.apache.org/confluence/display/KAFKA/KIP-974%3A+Docker+Image+for+GraalVM+based+Native+Kafka+Broker] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15445) KIP-975: Docker Image for Apache Kafka
[ https://issues.apache.org/jira/browse/KAFKA-15445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Krishna Agarwal updated KAFKA-15445: Description: [KIP-975: Docker Image for Apache Kafka|https://cwiki.apache.org/confluence/display/KAFKA/KIP-975%3A+Docker+Image+for+Apache+Kafka] (was: [KIP-975|https://cwiki.apache.org/confluence/display/KAFKA/KIP-975%3A+Docker+Image+for+Apache+Kafka]) > KIP-975: Docker Image for Apache Kafka > -- > > Key: KAFKA-15445 > URL: https://issues.apache.org/jira/browse/KAFKA-15445 > Project: Kafka > Issue Type: New Feature >Reporter: Krishna Agarwal >Assignee: Krishna Agarwal >Priority: Major > Labels: KIP-975 > > [KIP-975: Docker Image for Apache > Kafka|https://cwiki.apache.org/confluence/display/KAFKA/KIP-975%3A+Docker+Image+for+Apache+Kafka] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15445) KIP-975: Docker Image for Apache Kafka
[ https://issues.apache.org/jira/browse/KAFKA-15445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Krishna Agarwal updated KAFKA-15445: Description: [KIP-975|https://cwiki.apache.org/confluence/display/KAFKA/KIP-975%3A+Docker+Image+for+Apache+Kafka] > KIP-975: Docker Image for Apache Kafka > -- > > Key: KAFKA-15445 > URL: https://issues.apache.org/jira/browse/KAFKA-15445 > Project: Kafka > Issue Type: New Feature >Reporter: Krishna Agarwal >Assignee: Krishna Agarwal >Priority: Major > Labels: KIP-975 > > [KIP-975|https://cwiki.apache.org/confluence/display/KAFKA/KIP-975%3A+Docker+Image+for+Apache+Kafka] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15445) KIP-975: Docker Image for Apache Kafka
Krishna Agarwal created KAFKA-15445: --- Summary: KIP-975: Docker Image for Apache Kafka Key: KAFKA-15445 URL: https://issues.apache.org/jira/browse/KAFKA-15445 Project: Kafka Issue Type: New Feature Reporter: Krishna Agarwal Assignee: Krishna Agarwal
[jira] [Created] (KAFKA-15444) KIP-974: Docker Image for GraalVM based Native Kafka Broker
Krishna Agarwal created KAFKA-15444: --- Summary: KIP-974: Docker Image for GraalVM based Native Kafka Broker Key: KAFKA-15444 URL: https://issues.apache.org/jira/browse/KAFKA-15444 Project: Kafka Issue Type: New Feature Reporter: Krishna Agarwal Assignee: Krishna Agarwal
[GitHub] [kafka] github-actions[bot] commented on pull request #13836: KAFKA-14218: Replace temp file handler with JUnit 5 Temporary Directory Support
github-actions[bot] commented on PR #13836: URL: https://github.com/apache/kafka/pull/13836#issuecomment-1711026816 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch) If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-14912) Introduce a configuration for remote index cache size, preferably a dynamic config.
[ https://issues.apache.org/jira/browse/KAFKA-14912?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17762931#comment-17762931 ] Luke Chen commented on KAFKA-14912: --- Agree to limit the cache by size, instead of the total entry number. > Introduce a configuration for remote index cache size, preferably a dynamic > config. > --- > > Key: KAFKA-14912 > URL: https://issues.apache.org/jira/browse/KAFKA-14912 > Project: Kafka > Issue Type: Improvement > Components: core >Reporter: Satish Duggana >Assignee: hudeqi >Priority: Major > > Context: We need to make the 1024 value here [1] as dynamically configurable > [1] > https://github.com/apache/kafka/blob/8d24716f27b307da79a819487aefb8dec79b4ca8/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java#L119 -- This message was sent by Atlassian Jira (v8.20.10#820010)
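The comment above prefers bounding the cache by size rather than by entry count. As a rough illustration only, the sketch below shows what a byte-budgeted LRU could look like. The class name `ByteBoundedLruSketch` and its methods are hypothetical and are not taken from `RemoteIndexCache`; the real cache may be built quite differently.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: an LRU structure bounded by total bytes instead of entry count.
// It only tracks entry sizes; a real cache would also hold the cached values.
final class ByteBoundedLruSketch<K> {
    private final long maxBytes;
    private long usedBytes = 0L;
    // accessOrder=true keeps the least recently used key first in iteration order.
    private final LinkedHashMap<K, Long> sizes = new LinkedHashMap<>(16, 0.75f, true);

    ByteBoundedLruSketch(long maxBytes) {
        this.maxBytes = maxBytes;
    }

    // Records an entry of the given size, then evicts LRU entries until the budget holds.
    void put(K key, long sizeBytes) {
        Long previous = sizes.put(key, sizeBytes);
        if (previous != null) {
            usedBytes -= previous;
        }
        usedBytes += sizeBytes;
        Iterator<Map.Entry<K, Long>> it = sizes.entrySet().iterator();
        while (usedBytes > maxBytes && it.hasNext()) {
            usedBytes -= it.next().getValue();
            it.remove();
        }
    }

    // Marks the key as recently used and reports whether it is cached.
    boolean touch(K key) {
        return sizes.get(key) != null;
    }

    // containsKey does not change the access order.
    boolean contains(K key) {
        return sizes.containsKey(key);
    }

    long usedBytes() {
        return usedBytes;
    }

    int entryCount() {
        return sizes.size();
    }
}
```

With a byte budget, a few large index entries and many small ones are treated proportionally, which a fixed entry limit such as 1024 cannot do. Note that in this sketch an entry larger than the whole budget is evicted as soon as it is added.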
[GitHub] [kafka] showuon commented on a diff in pull request #14287: [Minor] Check the existence of AppInfo for the given ID before creating a new mbean of the same name
showuon commented on code in PR #14287:
URL: https://github.com/apache/kafka/pull/14287#discussion_r1319285955

##
clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java:
##
@@ -60,8 +60,13 @@ public static String getCommitId() {
     public static synchronized void registerAppInfo(String prefix, String id, Metrics metrics, long nowMs) {
         try {
             ObjectName name = new ObjectName(prefix + ":type=app-info,id=" + Sanitizer.jmxSanitize(id));
+            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
+            if (server.isRegistered(name)) {
+                log.info("App info {} for {} already exists, so skipping a new mbean creation", prefix, id);

Review Comment:
nit: additional space after _skipping_.

Question: What is this log output in the consumer's case? I think it's:
`App info kafka.consumer for transcript2_REALTIME-transcript-topic-0 already exists, so skipping a new mbean creation`

Could we change it to:
`log.info("The mbean of App info: [{}], id: [{}] already exists, so skipping a new mbean creation.", prefix, id);`
[GitHub] [kafka] mjsax commented on pull request #14224: MINOR: fix currentLag javadoc
mjsax commented on PR #14224: URL: https://github.com/apache/kafka/pull/14224#issuecomment-1710988445 Thanks for the fix! Merged to `trunk` and cherry-picked to `3.6` branch.
[GitHub] [kafka] mjsax merged pull request #14224: MINOR: fix currentLag javadoc
mjsax merged PR #14224: URL: https://github.com/apache/kafka/pull/14224
[GitHub] [kafka] mjsax commented on pull request #12988: KAFKA-14405: Log a warning when users attempt to set a config controlled by Streams
mjsax commented on PR #12988: URL: https://github.com/apache/kafka/pull/12988#issuecomment-1710983828 Yes, that's expected, and we should also cover this case. Because there are multiple consumers, we use `consumer.` to allow users to change a config for _all_ consumers. If you want to change a config for a specific consumer (or if you want to configure two consumers differently), you would use `main.consumer.` et al. If both prefixes (generic and specific) are used, the consumer-specific prefix takes precedence over the general `consumer.` prefix. Does this make sense? It's basically a "config hierarchy" (flexible and powerful, but maybe a little hard to understand on first encounter...)
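The "config hierarchy" described above can be pictured with a small stand-alone sketch. This is not the Kafka Streams implementation: the class `ConsumerPrefixSketch` and its `resolve` method are hypothetical and only model the stated rule, namely that `consumer.` applies to all consumers and a consumer-specific prefix such as `main.consumer.` overrides it.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the prefix precedence rule; not Kafka Streams code.
final class ConsumerPrefixSketch {
    // Generic prefix that applies to every consumer.
    static final String GENERIC_PREFIX = "consumer.";

    // Returns the effective configs for one consumer, with prefixes stripped.
    static Map<String, String> resolve(Map<String, String> userProps, String specificPrefix) {
        Map<String, String> resolved = new HashMap<>();
        // Apply the generic settings first ...
        copyStripped(userProps, GENERIC_PREFIX, resolved);
        // ... then let the consumer-specific settings overwrite them.
        copyStripped(userProps, specificPrefix, resolved);
        return resolved;
    }

    private static void copyStripped(Map<String, String> from, String prefix, Map<String, String> into) {
        for (Map.Entry<String, String> entry : from.entrySet()) {
            if (entry.getKey().startsWith(prefix)) {
                into.put(entry.getKey().substring(prefix.length()), entry.getValue());
            }
        }
    }
}
```

For example, with `consumer.max.poll.records=500` and `main.consumer.max.poll.records=100`, resolving for `main.consumer.` yields 100, while a consumer with a different specific prefix still sees 500.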
[GitHub] [kafka] mjsax commented on pull request #14221: KAFKA-15338: The metric group documentation for metrics added in KAFK…
mjsax commented on PR #14221: URL: https://github.com/apache/kafka/pull/14221#issuecomment-1710978564 Thanks for the PR @atu-sharm. Merged to `trunk`, and cherry-picked to `3.6`, `3.5`, `3.4`, and `3.3` branches. Could you also do a PR against https://github.com/apache/kafka-site and port this fix, so we get the fix live on the web page right away?
[GitHub] [kafka] mjsax merged pull request #14221: KAFKA-15338: The metric group documentation for metrics added in KAFK…
mjsax merged PR #14221: URL: https://github.com/apache/kafka/pull/14221
[GitHub] [kafka] mjsax commented on a diff in pull request #14317: KAFKA-13973: Fix inflated block cache metrics
mjsax commented on code in PR #14317:
URL: https://github.com/apache/kafka/pull/14317#discussion_r1319266372

##
streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java:
##
@@ -373,14 +373,14 @@ private Gauge gaugeToComputeBlockCacheMetrics(final String propertyN
     // values of RocksDB properties are of type unsigned long in C++, i.e., in Java we need to use
     // BigInteger and construct the object from the byte representation of the value
     result = new BigInteger(1, longToBytes(
-        valueProvider.db.getAggregatedLongProperty(ROCKSDB_PROPERTIES_PREFIX + propertyName)
+        valueProvider.db.getLongProperty(ROCKSDB_PROPERTIES_PREFIX + propertyName)
     ));
     break;
 } else {
     // values of RocksDB properties are of type unsigned long in C++, i.e., in Java we need to use
     // BigInteger and construct the object from the byte representation of the value
     result = result.add(new BigInteger(1, longToBytes(
-        valueProvider.db.getAggregatedLongProperty(ROCKSDB_PROPERTIES_PREFIX + propertyName)
+        valueProvider.db.getLongProperty(ROCKSDB_PROPERTIES_PREFIX + propertyName)

Review Comment:
Btw: Maybe https://github.com/speedb-io/speedb/issues/583#issuecomment-1616710762 helps?
[jira] [Created] (KAFKA-15443) Upgrade RocksDB dependency
Matthias J. Sax created KAFKA-15443: --- Summary: Upgrade RocksDB dependency Key: KAFKA-15443 URL: https://issues.apache.org/jira/browse/KAFKA-15443 Project: Kafka Issue Type: Task Components: streams Reporter: Matthias J. Sax Kafka Streams currently depends on RocksDB 7.9.2. However, the latest version of RocksDB is already 8.5.3. We should check the RocksDB release notes to see what benefits we would get from upgrading to the latest version (and file corresponding tickets to exploit improvements in newer releases as applicable).
[GitHub] [kafka] cmccabe commented on pull request #14351: KAFKA-15441 Allow broker heartbeats to complete in metadata transaction
cmccabe commented on PR #14351: URL: https://github.com/apache/kafka/pull/14351#issuecomment-1710954958

Thinking about this more: if something runs in premigration, it should complete in premigration, but maybe not complete inside a transaction more generally. For example, I can imagine us doing something later where a heartbeat kicks off a big transaction containing a lot of records. We wouldn't want to complete inside that transaction.

tl;dr: We can probably just get rid of RUNS_IN_TRANSACTION and have it be implied by:
* you have RUNS_IN_PREMIGRATION, and
* we're in premigration
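The rule proposed in the tl;dr can be written down as a single predicate. The sketch below is only a reading of the comment: the enum `Flag`, the class `CompletionRuleSketch`, and the method `mayCompleteInTransaction` are hypothetical names and do not come from the controller code in the PR.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical sketch of the proposed rule; names are illustrative only.
final class CompletionRuleSketch {
    // Only the flag named in the comment; RUNS_IN_TRANSACTION is intentionally absent.
    enum Flag { RUNS_IN_PREMIGRATION }

    // An operation may complete inside a transaction only when it is allowed to run
    // in premigration AND the controller currently is in premigration.
    static boolean mayCompleteInTransaction(Set<Flag> flags, boolean inPreMigration) {
        return inPreMigration && flags.contains(Flag.RUNS_IN_PREMIGRATION);
    }

    public static void main(String[] args) {
        Set<Flag> heartbeatFlags = EnumSet.of(Flag.RUNS_IN_PREMIGRATION);
        System.out.println(mayCompleteInTransaction(heartbeatFlags, true));
        System.out.println(mayCompleteInTransaction(heartbeatFlags, false));
    }
}
```

Deriving the behaviour from the existing flag plus the current state avoids a second flag that could drift out of sync with the first.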
[jira] [Commented] (KAFKA-12473) Make the "cooperative-sticky, range" as the default assignor
[ https://issues.apache.org/jira/browse/KAFKA-12473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17762920#comment-17762920 ] Luke Chen commented on KAFKA-12473: --- Yes, we can wait for KIP-848. Thanks [~dajac]. > Make the "cooperative-sticky, range" as the default assignor > > > Key: KAFKA-12473 > URL: https://issues.apache.org/jira/browse/KAFKA-12473 > Project: Kafka > Issue Type: Improvement > Components: consumer >Reporter: A. Sophie Blee-Goldman >Priority: Critical > Labels: kip > > Now that 3.0 is coming up, we can change the default > ConsumerPartitionAssignor to something better than the RangeAssignor. The > original plan was to switch over to the StickyAssignor, but now that we have > incremental cooperative rebalancing we should consider using the new > CooperativeStickyAssignor instead: this will enable the consumer group to > follow the COOPERATIVE protocol, improving the rebalancing experience OOTB. > KIP: > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=177048248 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-9800) [KIP-580] Client Exponential Backoff Implementation
[ https://issues.apache.org/jira/browse/KAFKA-9800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17762921#comment-17762921 ]

Satish Duggana commented on KAFKA-9800:
---
[~junrao] [~schofielaj] Are you planning to merge these changes to the 3.6 branch?

> [KIP-580] Client Exponential Backoff Implementation
> ---
>
> Key: KAFKA-9800
> URL: https://issues.apache.org/jira/browse/KAFKA-9800
> Project: Kafka
> Issue Type: New Feature
> Reporter: Cheng Tan
> Assignee: Andrew Schofield
> Priority: Major
> Labels: KIP-580, client
> Fix For: 3.7.0
>
> Design:
> The main idea is to bookkeep the failed attempts. Currently, the retry backoff has two main usage patterns:
> # Synchronous retries and blocking loop. The thread will sleep in each iteration for retry backoff ms.
> # Async retries. In each polling, the retries that do not meet the backoff will be filtered. The data class often maintains a 1:1 mapping to a set of requests which are logically associated (i.e. a set contains only one initial request and only its retries).
> For type 1, we can utilize a local failure counter of a Java generic data type.
> For type 2, I already wrapped the exponential backoff/timeout util class in my KIP-601 [implementation|https://github.com/apache/kafka/pull/8683/files#diff-9ca2b1294653dfa914b9277de62b52e3R28], which takes the number of attempts and returns the backoff/timeout value at the corresponding level. Thus, we can add a new class property to those classes containing retriable data in order to record the number of failed attempts.
>
> Changes:
> KafkaProducer:
> # Produce request (ApiKeys.PRODUCE). Currently, the backoff applies to each ProducerBatch in Accumulator, which already has an attribute attempts recording the number of failed attempts. So we can let the Accumulator calculate the new retry backoff for each batch when it enqueues them, to avoid instantiating the util class multiple times.
> # Transaction request (ApiKeys..*TXN). TxnRequestHandler will have a new class property of type `Long` to record the number of attempts.
> KafkaConsumer:
> # Some synchronous retry use cases. Record the failed attempts in the blocking loop.
> # Partition request (ApiKeys.OFFSET_FOR_LEADER_EPOCH, ApiKeys.LIST_OFFSETS). Though the actual requests are packed for each node, the current implementation applies backoff to each topic partition, where the backoff value is kept by TopicPartitionState. Thus, TopicPartitionState will have the new property recording the number of attempts.
> Metadata:
> # Metadata lives as a singleton in many clients. Add a new property recording the number of attempts.
> AdminClient:
> # AdminClient has its own request abstraction, Call. The failed attempts are already kept by the abstraction. So probably clean up the Call class logic a bit.
> Existing tests:
> # If the tests are testing the retry backoff, add a delta to the assertion, considering the existence of the jitter.
> # If the tests are testing other functionality, we can specify the same value for both `retry.backoff.ms` and `retry.backoff.max.ms` in order to make the retry backoff static. We can use this trick to make the existing tests compatible with the changes.
> There are other common usages that look like client.poll(timeout), where the timeout passed in is the retry backoff value. We won't change these usages, since the underlying logic is nioSelector.select(timeout) and nioSelector.selectNow(), which means that if no interested op exists, the client will block for retry backoff milliseconds. This is an optimization for when there's no request that needs to be sent but the client is waiting for responses. Specifically, if the client fails the inflight requests before the retry backoff milliseconds have passed, it still needs to wait until that amount of time has passed, unless there's a new request that needs to be sent.
> -- This message was sent by Atlassian Jira (v8.20.10#820010)
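The design quoted above relies on a utility that takes the number of failed attempts and returns a backoff value. A minimal sketch of that idea follows, assuming an exponential curve with a multiplier, a cap, and symmetric jitter. The class `RetryBackoffSketch`, its parameters, and the early return for equal initial and maximum values are illustrative assumptions, not the KIP-580 or KIP-601 code.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical sketch of attempt-based exponential backoff with jitter.
final class RetryBackoffSketch {
    // initialMs plays the role of retry.backoff.ms and maxMs of retry.backoff.max.ms.
    static long backoffMs(long initialMs, int multiplier, long maxMs, double jitter, int failedAttempts) {
        // Assumption: when the cap does not exceed the initial value, the backoff is static,
        // which models the trick the description suggests for existing tests.
        if (maxMs <= initialMs) {
            return initialMs;
        }
        // Grow the backoff exponentially with the number of failed attempts, then cap it.
        double exponential = initialMs * Math.pow(multiplier, failedAttempts);
        double capped = Math.min(exponential, (double) maxMs);
        // Spread retries by a random factor in [1 - jitter, 1 + jitter).
        double factor = jitter <= 0.0
            ? 1.0
            : ThreadLocalRandom.current().nextDouble(1.0 - jitter, 1.0 + jitter);
        return Math.min(maxMs, (long) (capped * factor));
    }

    public static void main(String[] args) {
        for (int attempts = 0; attempts <= 5; attempts++) {
            System.out.println(attempts + " failed attempt(s) -> "
                + backoffMs(100L, 2, 1000L, 0.2, attempts) + " ms");
        }
    }
}
```

Because the result depends only on the attempt count, a caller needs to keep just one counter per retriable unit (a batch, a topic partition, a call), which matches the bookkeeping approach the description proposes. Tests that assert on the value need a tolerance whenever the jitter is non-zero.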
[jira] [Assigned] (KAFKA-12473) Make the "cooperative-sticky, range" as the default assignor
[ https://issues.apache.org/jira/browse/KAFKA-12473?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen reassigned KAFKA-12473: - Assignee: (was: Luke Chen)
[GitHub] [kafka] satishd commented on pull request #14111: KAFKA-9800: Exponential backoff for Kafka clients - KIP-580
satishd commented on PR #14111: URL: https://github.com/apache/kafka/pull/14111#issuecomment-1710948188 @junrao @AndrewJSchofield Are you planning to merge these changes to the 3.6 branch?
[GitHub] [kafka] mjsax commented on pull request #14157: KAFKA-15303: Avoid unnecessary re-serialization in FK-join
mjsax commented on PR #14157: URL: https://github.com/apache/kafka/pull/14157#issuecomment-1710947305 \cc @cadonna @ableegoldman @lucasbru -- does anyone have spare cycles to review so we can make progress?
[GitHub] [kafka] philipnee commented on pull request #14313: KAFKA-15416: Fix flaky TopicAdminTest::retryEndOffsetsShouldRetryWhenTopicNotFound test case
philipnee commented on PR #14313: URL: https://github.com/apache/kafka/pull/14313#issuecomment-1710936514 @C0urante - Thank you!
[GitHub] [kafka] kirktrue opened a new pull request, #14357: KAFKA-15276: Implement partition assignment reconciliation
kirktrue opened a new pull request, #14357: URL: https://github.com/apache/kafka/pull/14357 WIP
[GitHub] [kafka] cmccabe opened a new pull request, #14356: MINOR: rename BrokerToControllerChannelManager to NodeToControllerChannelManager
cmccabe opened a new pull request, #14356: URL: https://github.com/apache/kafka/pull/14356 No code changes, just renames.
[jira] [Commented] (KAFKA-14273) Kafka doesn't start with KRaft on Windows
[ https://issues.apache.org/jira/browse/KAFKA-14273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17762907#comment-17762907 ]

Satish Duggana commented on KAFKA-14273:

https://github.com/apache/kafka/pull/14354

> Kafka doesn't start with KRaft on Windows
> -
>
> Key: KAFKA-14273
> URL: https://issues.apache.org/jira/browse/KAFKA-14273
> Project: Kafka
> Issue Type: Bug
> Components: kraft
> Affects Versions: 3.3.1
> Reporter: Kedar Joshi
> Assignee: José Armando García Sancio
> Priority: Major
> Fix For: 3.6.0
>
> {{Basic setup doesn't work on Windows 10.}}
> *{{Steps}}*
> * {{Initialize cluster with -}}
> {code:sh}
> bin\windows\kafka-storage.bat random-uuid
> bin\windows\kafka-storage.bat format -t %cluster_id% -c .\config\kraft\server.properties{code}
>
> * Start Kafka with -
> {code:sh}
> bin\windows\kafka-server-start.bat .\config\kraft\server.properties{code}
>
> *Stacktrace*
> Kafka fails to start with following exception -
> {code:java}
> D:\LocationGuru\Servers\Kafka-3.3>bin\windows\kafka-server-start.bat .\config\kraft\server.properties
> [2022-10-03 23:14:20,089] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
> [2022-10-03 23:14:20,375] INFO Setting -D jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated TLS renegotiation (org.apache.zookeeper.common.X509Util)
> [2022-10-03 23:14:20,594] INFO [LogLoader partition=__cluster_metadata-0, dir=D:\LocationGuru\Servers\Kafka-3.3\tmp\kraft-combined-logs] Loading producer state till offset 0 with message format version 2 (kafka.log.UnifiedLog$)
> [2022-10-03 23:14:20,594] INFO [LogLoader partition=__cluster_metadata-0, dir=D:\LocationGuru\Servers\Kafka-3.3\tmp\kraft-combined-logs] Reloading from producer snapshot and rebuilding producer state from offset 0 (kafka.log.UnifiedLog$)
> [2022-10-03 23:14:20,594] INFO [LogLoader partition=__cluster_metadata-0, dir=D:\LocationGuru\Servers\Kafka-3.3\tmp\kraft-combined-logs] Producer state recovery took 0ms for snapshot load and 0ms for segment recovery from offset 0 (kafka.log.UnifiedLog$)
> [2022-10-03 23:14:20,640] INFO Initialized snapshots with IDs SortedSet() from D:\LocationGuru\Servers\Kafka-3.3\tmp\kraft-combined-logs__cluster_metadata-0 (kafka.raft.KafkaMetadataLog$)
> [2022-10-03 23:14:20,734] INFO [raft-expiration-reaper]: Starting (kafka.raft.TimingWheelExpirationService$ExpiredOperationReaper)
> [2022-10-03 23:14:20,900] ERROR Exiting Kafka due to fatal exception (kafka.Kafka$)
> java.io.UncheckedIOException: Error while writing the Quorum status from the file D:\LocationGuru\Servers\Kafka-3.3\tmp\kraft-combined-logs__cluster_metadata-0\quorum-state
>     at org.apache.kafka.raft.FileBasedStateStore.writeElectionStateToFile(FileBasedStateStore.java:155)
>     at org.apache.kafka.raft.FileBasedStateStore.writeElectionState(FileBasedStateStore.java:128)
>     at org.apache.kafka.raft.QuorumState.transitionTo(QuorumState.java:477)
>     at org.apache.kafka.raft.QuorumState.initialize(QuorumState.java:212)
>     at org.apache.kafka.raft.KafkaRaftClient.initialize(KafkaRaftClient.java:369)
>     at kafka.raft.KafkaRaftManager.buildRaftClient(RaftManager.scala:200)
>     at kafka.raft.KafkaRaftManager.<init>(RaftManager.scala:127)
>     at kafka.server.KafkaRaftServer.<init>(KafkaRaftServer.scala:83)
>     at kafka.Kafka$.buildServer(Kafka.scala:79)
>     at kafka.Kafka$.main(Kafka.scala:87)
>     at kafka.Kafka.main(Kafka.scala)
> Caused by: java.nio.file.FileSystemException: D:\LocationGuru\Servers\Kafka-3.3\tmp\kraft-combined-logs_cluster_metadata-0\quorum-state.tmp -> D:\LocationGuru\Servers\Kafka-3.3\tmp\kraft-combined-logs_cluster_metadata-0\quorum-state: The process cannot access the file because it is being used by another process
>     at java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:92)
>     at java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103)
>     at java.base/sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:403)
>     at java.base/sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:293)
>     at java.base/java.nio.file.Files.move(Files.java:1430)
>     at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:935)
>     at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:918)
>     at org.apache.kafka.raft.FileBasedStateStore.writeElectionStateToFile(FileBasedStateStore.java:152)
>     ... 10 more
>     Suppressed: java.nio.file.FileSystemException:
[jira] [Commented] (KAFKA-14273) Kafka doesn't start with KRaft on Windows
[ https://issues.apache.org/jira/browse/KAFKA-14273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17762908#comment-17762908 ] Satish Duggana commented on KAFKA-14273: Thanks [~jsancio] for the quick fix.
[jira] [Updated] (KAFKA-15416) Flaky test TopicAdminTest::retryEndOffsetsShouldRetryWhenTopicNotFound
[ https://issues.apache.org/jira/browse/KAFKA-15416?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chris Egerton updated KAFKA-15416:
--
Fix Version/s: 3.6.0

> Flaky test TopicAdminTest::retryEndOffsetsShouldRetryWhenTopicNotFound
> --
>
> Key: KAFKA-15416
> URL: https://issues.apache.org/jira/browse/KAFKA-15416
> Project: Kafka
> Issue Type: Test
> Components: KafkaConnect
> Reporter: Chris Egerton
> Assignee: Chris Egerton
> Priority: Minor
> Fix For: 3.6.0, 3.7.0
>
> This test fails frequently when I run unit tests locally, but I've never seen it fail during a CI build.
> Failure message:
> {quote}
> org.apache.kafka.connect.errors.ConnectException: Failed to list offsets for topic partitions.
>     at app//org.apache.kafka.connect.util.TopicAdmin.retryEndOffsets(TopicAdmin.java:777)
>     at app//org.apache.kafka.connect.util.TopicAdminTest.retryEndOffsetsShouldRetryWhenTopicNotFound(TopicAdminTest.java:570)
>
> Caused by:
> org.apache.kafka.connect.errors.ConnectException: Fail to list offsets for topic partitions after 1 attempts. Reason: Timed out while waiting to get end offsets for topic 'myTopic' on brokers at \{retry.backoff.ms=0}
>     at app//org.apache.kafka.connect.util.RetryUtil.retryUntilTimeout(RetryUtil.java:106)
>     at app//org.apache.kafka.connect.util.RetryUtil.retryUntilTimeout(RetryUtil.java:56)
>     at app//org.apache.kafka.connect.util.TopicAdmin.retryEndOffsets(TopicAdmin.java:768)
>     ... 1 more
>
> Caused by:
> org.apache.kafka.common.errors.TimeoutException: Timed out while waiting to get end offsets for topic 'myTopic' on brokers at \{retry.backoff.ms=0}
>
> Caused by:
> java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting to send the call. Call: listOffsets(api=METADATA)
>     at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
>     at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
>     at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
>     at org.apache.kafka.connect.util.TopicAdmin.endOffsets(TopicAdmin.java:716)
>     at org.apache.kafka.connect.util.TopicAdmin.lambda$retryEndOffsets$7(TopicAdmin.java:769)
>     at org.apache.kafka.connect.util.RetryUtil.retryUntilTimeout(RetryUtil.java:87)
>     at org.apache.kafka.connect.util.RetryUtil.retryUntilTimeout(RetryUtil.java:56)
>     at org.apache.kafka.connect.util.TopicAdmin.retryEndOffsets(TopicAdmin.java:768)
>     at org.apache.kafka.connect.util.TopicAdminTest.retryEndOffsetsShouldRetryWhenTopicNotFound(TopicAdminTest.java:570)
>
> Caused by:
> org.apache.kafka.common.errors.TimeoutException: Timed out waiting to send the call. Call: listOffsets(api=METADATA)
> {quote}
[jira] [Resolved] (KAFKA-14273) Kafka doesn't start with KRaft on Windows
[ https://issues.apache.org/jira/browse/KAFKA-14273?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] José Armando García Sancio resolved KAFKA-14273. Resolution: Fixed > Kafka doesn't start with KRaft on Windows > - > > Key: KAFKA-14273 > URL: https://issues.apache.org/jira/browse/KAFKA-14273 > Project: Kafka > Issue Type: Bug > Components: kraft >Affects Versions: 3.3.1 >Reporter: Kedar Joshi >Assignee: José Armando García Sancio >Priority: Major > Fix For: 3.6.0 > > > {{Basic setup doesn't work on Windows 10.}} > *{{Steps}}* > * {{Initialize cluster with -}} > {code:sh} > bin\windows\kafka-storage.bat random-uuid > bin\windows\kafka-storage.bat format -t %cluster_id% -c > .\config\kraft\server.properties{code} > > * Start Kafka with - > {code:sh} > bin\windows\kafka-server-start.bat .\config\kraft\server.properties{code} > > *Stacktrace* > Kafka fails to start with following exception - > {code:java} > D:\LocationGuru\Servers\Kafka-3.3>bin\windows\kafka-server-start.bat > .\config\kraft\server.properties > [2022-10-03 23:14:20,089] INFO Registered kafka:type=kafka.Log4jController > MBean (kafka.utils.Log4jControllerRegistration$) > [2022-10-03 23:14:20,375] INFO Setting -D > jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated > TLS renegotiation (org.apache.zookeeper.common.X509Util) > [2022-10-03 23:14:20,594] INFO [LogLoader partition=__cluster_metadata-0, > dir=D:\LocationGuru\Servers\Kafka-3.3\tmp\kraft-combined-logs] Loading > producer state till offset 0 with message format version 2 > (kafka.log.UnifiedLog$) > [2022-10-03 23:14:20,594] INFO [LogLoader partition=__cluster_metadata-0, > dir=D:\LocationGuru\Servers\Kafka-3.3\tmp\kraft-combined-logs] Reloading from > producer snapshot and rebuilding producer state from offset 0 > (kafka.log.UnifiedLog$) > [2022-10-03 23:14:20,594] INFO [LogLoader partition=__cluster_metadata-0, > dir=D:\LocationGuru\Servers\Kafka-3.3\tmp\kraft-combined-logs] Producer state > recovery 
took 0ms for snapshot load and 0ms for segment recovery from offset > 0 (kafka.log.UnifiedLog$) > [2022-10-03 23:14:20,640] INFO Initialized snapshots with IDs SortedSet() > from > D:\LocationGuru\Servers\Kafka-3.3\tmp\kraft-combined-logs__cluster_metadata-0 > (kafka.raft.KafkaMetadataLog$) > [2022-10-03 23:14:20,734] INFO [raft-expiration-reaper]: Starting > (kafka.raft.TimingWheelExpirationService$ExpiredOperationReaper) > [2022-10-03 23:14:20,900] ERROR Exiting Kafka due to fatal exception > (kafka.Kafka$) > java.io.UncheckedIOException: Error while writing the Quorum status from the > file > D:\LocationGuru\Servers\Kafka-3.3\tmp\kraft-combined-logs__cluster_metadata-0\quorum-state > at > org.apache.kafka.raft.FileBasedStateStore.writeElectionStateToFile(FileBasedStateStore.java:155) > at > org.apache.kafka.raft.FileBasedStateStore.writeElectionState(FileBasedStateStore.java:128) > at > org.apache.kafka.raft.QuorumState.transitionTo(QuorumState.java:477) > at org.apache.kafka.raft.QuorumState.initialize(QuorumState.java:212) > at > org.apache.kafka.raft.KafkaRaftClient.initialize(KafkaRaftClient.java:369) > at kafka.raft.KafkaRaftManager.buildRaftClient(RaftManager.scala:200) > at kafka.raft.KafkaRaftManager.(RaftManager.scala:127) > at kafka.server.KafkaRaftServer.(KafkaRaftServer.scala:83) > at kafka.Kafka$.buildServer(Kafka.scala:79) > at kafka.Kafka$.main(Kafka.scala:87) > at kafka.Kafka.main(Kafka.scala) > Caused by: java.nio.file.FileSystemException: > D:\LocationGuru\Servers\Kafka-3.3\tmp\kraft-combined-logs_cluster_metadata-0\quorum-state.tmp > -> > D:\LocationGuru\Servers\Kafka-3.3\tmp\kraft-combined-logs_cluster_metadata-0\quorum-state: > The process cannot access the file because it is being used by another > process > at > java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:92) > at > java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103) > at 
java.base/sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:403) > at > java.base/sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:293) > at java.base/java.nio.file.Files.move(Files.java:1430) > at > org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:935) > at > org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:918) > at > org.apache.kafka.raft.FileBasedStateStore.writeElectionStateToFile(FileBasedStateStore.java:152) > ... 10 more > Suppressed: java.nio.file.FileSystemException: > D:\LocationGuru\Servers\Kafka-3.3\tmp\kraft-combined-l
[jira] [Resolved] (KAFKA-15416) Flaky test TopicAdminTest::retryEndOffsetsShouldRetryWhenTopicNotFound
[ https://issues.apache.org/jira/browse/KAFKA-15416?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chris Egerton resolved KAFKA-15416.
---
Fix Version/s: 3.7.0
Resolution: Fixed

> Flaky test TopicAdminTest::retryEndOffsetsShouldRetryWhenTopicNotFound
> --
>
> Key: KAFKA-15416
> URL: https://issues.apache.org/jira/browse/KAFKA-15416
> Project: Kafka
> Issue Type: Test
> Components: KafkaConnect
> Reporter: Chris Egerton
> Assignee: Chris Egerton
> Priority: Minor
> Fix For: 3.7.0
>
> This test fails frequently when I run unit tests locally, but I've never seen it fail during a CI build.
> Failure message:
> {quote}
> org.apache.kafka.connect.errors.ConnectException: Failed to list offsets for topic partitions.
>     at app//org.apache.kafka.connect.util.TopicAdmin.retryEndOffsets(TopicAdmin.java:777)
>     at app//org.apache.kafka.connect.util.TopicAdminTest.retryEndOffsetsShouldRetryWhenTopicNotFound(TopicAdminTest.java:570)
>
> Caused by:
> org.apache.kafka.connect.errors.ConnectException: Fail to list offsets for topic partitions after 1 attempts. Reason: Timed out while waiting to get end offsets for topic 'myTopic' on brokers at \{retry.backoff.ms=0}
>     at app//org.apache.kafka.connect.util.RetryUtil.retryUntilTimeout(RetryUtil.java:106)
>     at app//org.apache.kafka.connect.util.RetryUtil.retryUntilTimeout(RetryUtil.java:56)
>     at app//org.apache.kafka.connect.util.TopicAdmin.retryEndOffsets(TopicAdmin.java:768)
>     ... 1 more
>
> Caused by:
> org.apache.kafka.common.errors.TimeoutException: Timed out while waiting to get end offsets for topic 'myTopic' on brokers at \{retry.backoff.ms=0}
>
> Caused by:
> java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting to send the call. Call: listOffsets(api=METADATA)
>     at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
>     at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
>     at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
>     at org.apache.kafka.connect.util.TopicAdmin.endOffsets(TopicAdmin.java:716)
>     at org.apache.kafka.connect.util.TopicAdmin.lambda$retryEndOffsets$7(TopicAdmin.java:769)
>     at org.apache.kafka.connect.util.RetryUtil.retryUntilTimeout(RetryUtil.java:87)
>     at org.apache.kafka.connect.util.RetryUtil.retryUntilTimeout(RetryUtil.java:56)
>     at org.apache.kafka.connect.util.TopicAdmin.retryEndOffsets(TopicAdmin.java:768)
>     at org.apache.kafka.connect.util.TopicAdminTest.retryEndOffsetsShouldRetryWhenTopicNotFound(TopicAdminTest.java:570)
>
> Caused by:
> org.apache.kafka.common.errors.TimeoutException: Timed out waiting to send the call. Call: listOffsets(api=METADATA)
> {quote}
[GitHub] [kafka] C0urante merged pull request #14313: KAFKA-15416: Fix flaky TopicAdminTest::retryEndOffsetsShouldRetryWhenTopicNotFound test case
C0urante merged PR #14313: URL: https://github.com/apache/kafka/pull/14313 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on pull request #14313: KAFKA-15416: Fix flaky TopicAdminTest::retryEndOffsetsShouldRetryWhenTopicNotFound test case
C0urante commented on PR #14313: URL: https://github.com/apache/kafka/pull/14313#issuecomment-1710880812 Ah, that makes more sense. Yeah, I've seen some of those too, would be nice to patch them. Thanks for your help on this one @philipnee!
[GitHub] [kafka] jsancio closed pull request #12763: KAFKA-14273: Kafka doesn't start with KRaft on Windows
jsancio closed pull request #12763: KAFKA-14273: Kafka doesn't start with KRaft on Windows URL: https://github.com/apache/kafka/pull/12763
[GitHub] [kafka] jsancio merged pull request #14354: KAFKA-14273; Close file before atomic move
jsancio merged PR #14354: URL: https://github.com/apache/kafka/pull/14354
[GitHub] [kafka] jsancio commented on a diff in pull request #14354: KAFKA-14273; Close file before atomic move
jsancio commented on code in PR #14354:
URL: https://github.com/apache/kafka/pull/14354#discussion_r1319056404

## raft/src/main/java/org/apache/kafka/raft/FileBasedStateStore.java: ##
@@ -144,21 +144,29 @@ private void writeElectionStateToFile(final File stateFile, QuorumStateData stat
         log.trace("Writing tmp quorum state {}", temp.getAbsolutePath());
 
-        try (final FileOutputStream fileOutputStream = new FileOutputStream(temp);
-             final BufferedWriter writer = new BufferedWriter(
-                 new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8))) {
-            short version = state.highestSupportedVersion();
-
-            ObjectNode jsonState = (ObjectNode) QuorumStateDataJsonConverter.write(state, version);
-            jsonState.set(DATA_VERSION, new ShortNode(version));
-            writer.write(jsonState.toString());
-            writer.flush();
-            fileOutputStream.getFD().sync();
+        try {
+            try (final FileOutputStream fileOutputStream = new FileOutputStream(temp);
+                 final BufferedWriter writer = new BufferedWriter(
+                     new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8)
+                 )
+            ) {
+                short version = state.highestSupportedVersion();

Review Comment:
Done.
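The PR title ("Close file before atomic move") and the diff above suggest the pattern: finish writing the temp file inside its own try-with-resources scope so that every handle is closed, and only then rename it over the target. A minimal, self-contained sketch of that pattern follows. The class `CloseBeforeMoveSketch` and the method `writeAtomically` are hypothetical names used only for illustration; this is not the Kafka `FileBasedStateStore` code, and the fallback to a non-atomic move is an assumption about what a caller might want rather than a description of the merged patch.

```java
import java.io.BufferedWriter;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical helper for illustration only; not part of Kafka.
final class CloseBeforeMoveSketch {

    static void writeAtomically(Path target, String contents) throws IOException {
        Path temp = target.resolveSibling(target.getFileName() + ".tmp");

        // Inner scope: the stream and the writer are closed when this block exits,
        // so no handle to the temp file is still open when the rename happens.
        try (FileOutputStream out = new FileOutputStream(temp.toFile());
             BufferedWriter writer = new BufferedWriter(
                 new OutputStreamWriter(out, StandardCharsets.UTF_8))) {
            writer.write(contents);
            writer.flush();
            out.getFD().sync(); // push the bytes to disk before renaming
        }

        // The move runs only after the file has been closed.
        try {
            Files.move(temp, target, StandardCopyOption.ATOMIC_MOVE);
        } catch (AtomicMoveNotSupportedException e) {
            // Assumed fallback for file systems without atomic rename.
            Files.move(temp, target, StandardCopyOption.REPLACE_EXISTING);
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("quorum-state-sketch");
        Path target = dir.resolve("quorum-state");
        writeAtomically(target, "{\"data_version\":0}");
        System.out.println(new String(Files.readAllBytes(target), StandardCharsets.UTF_8));
    }
}
```

If the move were issued inside the try-with-resources block instead, the temp file would still be open at that point, which matches the "being used by another process" failure reported on Windows in KAFKA-14273.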
[GitHub] [kafka] junrao merged pull request #14305: KAFKA-14274: [1/7] basic refactoring
junrao merged PR #14305: URL: https://github.com/apache/kafka/pull/14305
[GitHub] [kafka] cmccabe merged pull request #14306: KAFKA-15369: Implement KIP-919: Allow AC to Talk Directly with Controllers
cmccabe merged PR #14306: URL: https://github.com/apache/kafka/pull/14306
[GitHub] [kafka] cmccabe commented on a diff in pull request #14306: KAFKA-15369: Implement KIP-919: Allow AC to Talk Directly with Controllers
cmccabe commented on code in PR #14306:
URL: https://github.com/apache/kafka/pull/14306#discussion_r1319165974

## clients/src/main/resources/common/message/ControllerRegistrationRequest.json: ##
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 70,

Review Comment:
edit: now using 70 again since someone claimed 69 :)
[GitHub] [kafka] C0urante commented on pull request #14314: KAFKA-15425: Fail fast in Admin::listOffsets when topic (but not partition) metadata is not found
C0urante commented on PR #14314: URL: https://github.com/apache/kafka/pull/14314#issuecomment-1710828993 Sorry! Yes, that's correct--3.6 is all we need to backport to. Thanks @jolshan!
[GitHub] [kafka] kirktrue commented on a diff in pull request #14305: KAFKA-14274: [1/7] basic refactoring
kirktrue commented on code in PR #14305:
URL: https://github.com/apache/kafka/pull/14305#discussion_r1319162018

## clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java: ##
@@ -639,9 +639,9 @@ private void maybeOverrideClientId(Map configs) {
         }
     }
 
-    protected static Map appendDeserializerToConfig(Map configs,
-                                                    Deserializer keyDeserializer,
-                                                    Deserializer valueDeserializer) {
+    public static Map appendDeserializerToConfig(Map configs,

Review Comment:
Yes.
[GitHub] [kafka] kirktrue commented on pull request #14305: KAFKA-14274: [1/7] basic refactoring
kirktrue commented on PR #14305:
URL: https://github.com/apache/kafka/pull/14305#issuecomment-1710825657

Test failures in latest build are unrelated:

```
integration.kafka.server.FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor()
kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest.testNoGroupAcl()
kafka.api.TransactionsTest.testBumpTransactionalEpoch()
kafka.api.TransactionsTest.testCommitTransactionTimeout()
kafka.controller.ControllerIntegrationTest.testTopicIdPersistsThroughControllerRestart()
kafka.network.ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit()
kafka.server.DescribeClusterRequestTest.testDescribeClusterRequestExcludingClusterAuthorizedOperations()
kafka.server.ProduceRequestTest.testSimpleProduceRequest()
o.a.k.clients.consumer.internals.AbstractCoordinatorTest.testWakeupAfterSyncGroupSentExternalCompletion()
o.a.k.connect.integration.ConnectorRestartApiIntegrationTest.testMultiWorkerRestartOnlyConnector()
o.a.k.controller.QuorumControllerTest.testBalancePartitionLeaders()
```
[GitHub] [kafka] jolshan merged pull request #14314: KAFKA-15425: Fail fast in Admin::listOffsets when topic (but not partition) metadata is not found
jolshan merged PR #14314: URL: https://github.com/apache/kafka/pull/14314
[GitHub] [kafka] jolshan commented on pull request #14314: KAFKA-15425: Fail fast in Admin::listOffsets when topic (but not partition) metadata is not found
jolshan commented on PR #14314: URL: https://github.com/apache/kafka/pull/14314#issuecomment-1710766397 Tests look unrelated and like flakes I've seen before, so I will go ahead and merge + pick to 3.6
[GitHub] [kafka] rreddy-22 commented on pull request #14345: MINOR: Range assignor changes
rreddy-22 commented on PR #14345: URL: https://github.com/apache/kafka/pull/14345#issuecomment-1710765019 > hey! @vamossagar12 , the other assignors are still under development, this is just an open PR in case we find other changes we would like to make so that all three have the same format and naming conventions.
[GitHub] [kafka] lianetm commented on pull request #14346: KAFKA-15115 - KAFKA-15163; Reset/Validate positions implementation & API integration
lianetm commented on PR #14346: URL: https://github.com/apache/kafka/pull/14346#issuecomment-1710762508 Thanks a lot @philipnee for the review, all comments addressed.
[GitHub] [kafka] lianetm commented on a diff in pull request #14346: KAFKA-15115 - KAFKA-15163; Reset/Validate positions implementation & API integration
lianetm commented on code in PR #14346:
URL: https://github.com/apache/kafka/pull/14346#discussion_r1319111762

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ##
@@ -209,6 +215,34 @@ public ConsumerRecords poll(final Duration timeout) {
         return ConsumerRecords.empty();
     }
 
+    /**
+     * Set the fetch position to the committed position (if there is one) or reset it using the
+     * offset reset policy the user has configured.
+     *
+     * @return true if the operation completed without timing out
+     * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
+     * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is
+     *             defined
+     */
+    private boolean updateFetchPositions() {
+        // If any partitions have been truncated due to a leader change, we need to validate the offsets
+        ValidatePositionsApplicationEvent validatePositionsEvent = new ValidatePositionsApplicationEvent();
+        eventHandler.add(validatePositionsEvent);
+
+        // TODO: integrate logic for refreshing committed offsets if available

Review Comment:
For the record, even though the change for using committed offsets here is 2 lines of code replacing this TODO, it requires also a minor refactoring on existing core components (ex. `ConsumerCoordinator`). So it will all follow this, on a PR like "Adding support for using committed offsets when updating fetch positions"
[GitHub] [kafka] jolshan commented on a diff in pull request #14353: KAFKA-14499: [7/7] Add integration tests for OffsetCommit API and OffsetFetch API
jolshan commented on code in PR #14353: URL: https://github.com/apache/kafka/pull/14353#discussion_r1319107403 ## core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala: ## @@ -14,233 +14,532 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package kafka.server +import kafka.test.ClusterInstance +import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type} +import kafka.test.junit.ClusterTestExtensions +import kafka.test.junit.RaftClusterInvocationContext.RaftClusterInstance +import kafka.test.junit.ZkClusterInvocationContext.ZkClusterInstance import kafka.utils.TestUtils -import org.apache.kafka.clients.consumer.{ConsumerConfig, OffsetAndMetadata} import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.message.OffsetFetchResponseData import org.apache.kafka.common.protocol.{ApiKeys, Errors} -import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData -import org.apache.kafka.common.requests.{AbstractResponse, OffsetFetchRequest, OffsetFetchResponse} -import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} -import org.junit.jupiter.api.{BeforeEach, Test, TestInfo} +import org.apache.kafka.common.requests.{OffsetFetchRequest, OffsetFetchResponse} +import org.junit.jupiter.api.Assertions.{assertEquals, fail} +import org.junit.jupiter.api.{Tag, Timeout} +import org.junit.jupiter.api.extension.ExtendWith -import java.util -import java.util.Collections.singletonList +import java.util.Comparator +import java.util.stream.Collectors import scala.jdk.CollectionConverters._ -import java.util.{Collections, Optional, Properties} - -class OffsetFetchRequestTest extends BaseRequestTest { - - override def brokerCount: Int = 1 - - val brokerId: Integer = 0 - val offset = 15L - val leaderEpoch: Optional[Integer] = Optional.of(3) - val metadata = "metadata" - val topic = "topic" - val groupId = "groupId" - val groups: Seq[String] = 
(1 to 5).map(i => s"group$i") - val topics: Seq[String] = (1 to 3).map(i => s"topic$i") - val topic1List = singletonList(new TopicPartition(topics(0), 0)) - val topic1And2List = util.Arrays.asList( -new TopicPartition(topics(0), 0), -new TopicPartition(topics(1), 0), -new TopicPartition(topics(1), 1)) - val allTopicsList = util.Arrays.asList( -new TopicPartition(topics(0), 0), -new TopicPartition(topics(1), 0), -new TopicPartition(topics(1), 1), -new TopicPartition(topics(2), 0), -new TopicPartition(topics(2), 1), -new TopicPartition(topics(2), 2)) - val groupToPartitionMap: util.Map[String, util.List[TopicPartition]] = -new util.HashMap[String, util.List[TopicPartition]]() - groupToPartitionMap.put(groups(0), topic1List) - groupToPartitionMap.put(groups(1), topic1And2List) - groupToPartitionMap.put(groups(2), allTopicsList) - groupToPartitionMap.put(groups(3), null) - groupToPartitionMap.put(groups(4), null) - - override def brokerPropertyOverrides(properties: Properties): Unit = { -properties.put(KafkaConfig.BrokerIdProp, brokerId.toString) -properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1") -properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "1") -properties.put(KafkaConfig.TransactionsTopicPartitionsProp, "1") -properties.put(KafkaConfig.TransactionsTopicReplicationFactorProp, "1") -properties.put(KafkaConfig.TransactionsTopicMinISRProp, "1") -properties.setProperty(KafkaConfig.UnstableApiVersionsEnableProp, "true") + +@Timeout(120) +@ExtendWith(value = Array(classOf[ClusterTestExtensions])) +@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1) +@Tag("integration") +class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { + + @ClusterTest(serverProperties = Array( +new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), +new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"), +new ClusterConfigProperty(key = 
"group.consumer.max.session.timeout.ms", value = "60"), +new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "60"), +new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), +new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testSingleGroupOffsetFetchWithNewConsumerGroupProtocolAndNewGroupCoordinator(): Unit = { +testSingleGroupOffsetFetch(useNewProtocol = true, requireStable = true) } - @BeforeEach - override def setUp(testInfo: TestInfo): Unit = { -doSetup(testInfo, createOffsetsTopic = false) + @ClusterTest(serverProperties = Array( +new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), +new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"), +new ClusterConfigProperty(key = "gro
[GitHub] [kafka] lianetm commented on a diff in pull request #14346: KAFKA-15115 - KAFKA-15163; Reset/Validate positions implementation & API integration
lianetm commented on code in PR #14346:
URL: https://github.com/apache/kafka/pull/14346#discussion_r1319093808

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java: ##
@@ -298,6 +356,154 @@ private CompletableFuture buildListOffsetRequestToNode(
         return result;
     }
 
+    /**
+     * Make asynchronous ListOffsets request to fetch offsets by target times for the specified
+     * partitions.
+     * Use the retrieved offsets to reset positions in the subscription state.
+     *
+     * @param timestampsToSearch the mapping between partitions and target time
+     * @return A list of
+     * {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest}
+     * that can be polled to obtain the corresponding timestamps and offsets.
+     */
+    private List buildListOffsetsRequestsAndResetPositions(
+            final Map timestampsToSearch) {
+        Map> timestampsToSearchByNode =
+                groupListOffsetRequests(timestampsToSearch, Optional.empty());
+
+        final List unsentRequests = new ArrayList<>();
+
+        timestampsToSearchByNode.forEach((node, resetTimestamps) -> {
+            subscriptionState.setNextAllowedRetry(resetTimestamps.keySet(),
+                    time.milliseconds() + requestTimeoutMs);
+
+            CompletableFuture partialResult = buildListOffsetRequestToNode(
+                    node,
+                    resetTimestamps,
+                    false,
+                    unsentRequests);
+
+            partialResult.whenComplete((result, error) -> {
+                if (error == null) {
+                    offsetFetcherUtils.onSuccessfulRequestForResettingPositions(resetTimestamps,
+                            result);
+                } else {
+                    RuntimeException e;
+                    if (error instanceof RuntimeException) {
+                        e = (RuntimeException) error;
+                    } else {
+                        e = new RuntimeException("Unexpected failure in ListOffsets request for " +
+                                "resetting positions", error);
+                    }
+                    offsetFetcherUtils.onFailedRequestForResettingPositions(resetTimestamps, e);
+                }
+            });
+        });
+        return unsentRequests;
+    }
+
+    /**
+     * For each partition that needs validation, make an asynchronous request to get the end-offsets
+     * for the partition with the epoch less than or equal to the epoch the partition last saw.
+     *
+     * Requests are grouped by Node for efficiency.
+     */
+    private List buildListOffsetsRequestsAndValidatePositions(
+            Map partitionsToValidate) {
+
+        final Map> regrouped =
+                regroupFetchPositionsByLeader(partitionsToValidate);
+
+        long nextResetTimeMs = time.milliseconds() + requestTimeoutMs;
+        final List unsentRequests = new ArrayList<>();
+        regrouped.forEach((node, fetchPositions) -> {
+
+            if (node.isEmpty()) {

Review Comment:
Yes, it's needed, these are 2 different things. The `if` is checking the Node object `isEmpty` function, but the filter is only removing the entries for which the leader (`Optional`) is not present. So we could have an Optional present, but returning a Node for which `isEmpty` is true (used mainly in case of errors)
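The distinction drawn in the review comment above (an `Optional` leader that is present but wraps an "empty" node) can be sketched in isolation. The example below is a minimal illustration under stated assumptions: `LeaderFilterSketch`, its nested `Node` stand-in, and `partitionsWithUsableLeader` are hypothetical names, and the stand-in only mimics the idea of a placeholder node. It is not `org.apache.kafka.common.Node` and not the `OffsetsRequestManager` code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch; names and the Node stand-in are assumptions for illustration.
final class LeaderFilterSketch {

    // Minimal stand-in for a broker node. A placeholder node has no host or a negative port.
    static final class Node {
        final int id;
        final String host;
        final int port;

        Node(int id, String host, int port) {
            this.id = id;
            this.host = host;
            this.port = port;
        }

        static Node noNode() {
            return new Node(-1, "", -1);
        }

        boolean isEmpty() {
            return host == null || host.isEmpty() || port < 0;
        }
    }

    // Keeps only partitions whose leader is both known and usable.
    static List<String> partitionsWithUsableLeader(Map<String, Optional<Node>> leaders) {
        List<String> usable = new ArrayList<>();
        leaders.forEach((partition, leader) -> {
            if (!leader.isPresent()) {
                return; // check 1: no leader is known at all
            }
            if (leader.get().isEmpty()) {
                return; // check 2: a leader object exists, but it is a placeholder
            }
            usable.add(partition);
        });
        return usable;
    }

    public static void main(String[] args) {
        Map<String, Optional<Node>> leaders = new LinkedHashMap<>();
        leaders.put("topic-0", Optional.of(new Node(1, "broker-1", 9092)));
        leaders.put("topic-1", Optional.empty());
        leaders.put("topic-2", Optional.of(Node.noNode()));
        System.out.println(partitionsWithUsableLeader(leaders)); // prints [topic-0]
    }
}
```

Dropping either check changes the result: without the first, `leader.get()` throws on `topic-1`; without the second, `topic-2` would be treated as having a reachable leader.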
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14353: KAFKA-14499: [7/7] Add integration tests for OffsetCommit API and OffsetFetch API
rreddy-22 commented on code in PR #14353: URL: https://github.com/apache/kafka/pull/14353#discussion_r1319088896 ## core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala: ## @@ -14,233 +14,532 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package kafka.server +import kafka.test.ClusterInstance +import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type} +import kafka.test.junit.ClusterTestExtensions +import kafka.test.junit.RaftClusterInvocationContext.RaftClusterInstance +import kafka.test.junit.ZkClusterInvocationContext.ZkClusterInstance import kafka.utils.TestUtils -import org.apache.kafka.clients.consumer.{ConsumerConfig, OffsetAndMetadata} import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.message.OffsetFetchResponseData import org.apache.kafka.common.protocol.{ApiKeys, Errors} -import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData -import org.apache.kafka.common.requests.{AbstractResponse, OffsetFetchRequest, OffsetFetchResponse} -import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} -import org.junit.jupiter.api.{BeforeEach, Test, TestInfo} +import org.apache.kafka.common.requests.{OffsetFetchRequest, OffsetFetchResponse} +import org.junit.jupiter.api.Assertions.{assertEquals, fail} +import org.junit.jupiter.api.{Tag, Timeout} +import org.junit.jupiter.api.extension.ExtendWith -import java.util -import java.util.Collections.singletonList +import java.util.Comparator +import java.util.stream.Collectors import scala.jdk.CollectionConverters._ -import java.util.{Collections, Optional, Properties} - -class OffsetFetchRequestTest extends BaseRequestTest { - - override def brokerCount: Int = 1 - - val brokerId: Integer = 0 - val offset = 15L - val leaderEpoch: Optional[Integer] = Optional.of(3) - val metadata = "metadata" - val topic = "topic" - val groupId = "groupId" - val groups: Seq[String] 
= (1 to 5).map(i => s"group$i") - val topics: Seq[String] = (1 to 3).map(i => s"topic$i") - val topic1List = singletonList(new TopicPartition(topics(0), 0)) - val topic1And2List = util.Arrays.asList( -new TopicPartition(topics(0), 0), -new TopicPartition(topics(1), 0), -new TopicPartition(topics(1), 1)) - val allTopicsList = util.Arrays.asList( -new TopicPartition(topics(0), 0), -new TopicPartition(topics(1), 0), -new TopicPartition(topics(1), 1), -new TopicPartition(topics(2), 0), -new TopicPartition(topics(2), 1), -new TopicPartition(topics(2), 2)) - val groupToPartitionMap: util.Map[String, util.List[TopicPartition]] = -new util.HashMap[String, util.List[TopicPartition]]() - groupToPartitionMap.put(groups(0), topic1List) - groupToPartitionMap.put(groups(1), topic1And2List) - groupToPartitionMap.put(groups(2), allTopicsList) - groupToPartitionMap.put(groups(3), null) - groupToPartitionMap.put(groups(4), null) - - override def brokerPropertyOverrides(properties: Properties): Unit = { -properties.put(KafkaConfig.BrokerIdProp, brokerId.toString) -properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1") -properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "1") -properties.put(KafkaConfig.TransactionsTopicPartitionsProp, "1") -properties.put(KafkaConfig.TransactionsTopicReplicationFactorProp, "1") -properties.put(KafkaConfig.TransactionsTopicMinISRProp, "1") -properties.setProperty(KafkaConfig.UnstableApiVersionsEnableProp, "true") + +@Timeout(120) +@ExtendWith(value = Array(classOf[ClusterTestExtensions])) +@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1) +@Tag("integration") +class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { + + @ClusterTest(serverProperties = Array( +new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), +new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"), +new ClusterConfigProperty(key = 
"group.consumer.max.session.timeout.ms", value = "60"), +new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "60"), +new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), +new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testSingleGroupOffsetFetchWithNewConsumerGroupProtocolAndNewGroupCoordinator(): Unit = { +testSingleGroupOffsetFetch(useNewProtocol = true, requireStable = true) } - @BeforeEach - override def setUp(testInfo: TestInfo): Unit = { -doSetup(testInfo, createOffsetsTopic = false) + @ClusterTest(serverProperties = Array( +new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), +new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"), +new ClusterConfigProperty(key = "g
[GitHub] [kafka] jolshan commented on a diff in pull request #14353: KAFKA-14499: [7/7] Add integration tests for OffsetCommit API and OffsetFetch API
jolshan commented on code in PR #14353: URL: https://github.com/apache/kafka/pull/14353#discussion_r1319083998 ## core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala: ## @@ -14,233 +14,532 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package kafka.server +import kafka.test.ClusterInstance +import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type} +import kafka.test.junit.ClusterTestExtensions +import kafka.test.junit.RaftClusterInvocationContext.RaftClusterInstance +import kafka.test.junit.ZkClusterInvocationContext.ZkClusterInstance import kafka.utils.TestUtils -import org.apache.kafka.clients.consumer.{ConsumerConfig, OffsetAndMetadata} import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.message.OffsetFetchResponseData import org.apache.kafka.common.protocol.{ApiKeys, Errors} -import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData -import org.apache.kafka.common.requests.{AbstractResponse, OffsetFetchRequest, OffsetFetchResponse} -import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} -import org.junit.jupiter.api.{BeforeEach, Test, TestInfo} +import org.apache.kafka.common.requests.{OffsetFetchRequest, OffsetFetchResponse} +import org.junit.jupiter.api.Assertions.{assertEquals, fail} +import org.junit.jupiter.api.{Tag, Timeout} +import org.junit.jupiter.api.extension.ExtendWith -import java.util -import java.util.Collections.singletonList +import java.util.Comparator +import java.util.stream.Collectors import scala.jdk.CollectionConverters._ -import java.util.{Collections, Optional, Properties} - -class OffsetFetchRequestTest extends BaseRequestTest { - - override def brokerCount: Int = 1 - - val brokerId: Integer = 0 - val offset = 15L - val leaderEpoch: Optional[Integer] = Optional.of(3) - val metadata = "metadata" - val topic = "topic" - val groupId = "groupId" - val groups: Seq[String] = 
(1 to 5).map(i => s"group$i") - val topics: Seq[String] = (1 to 3).map(i => s"topic$i") - val topic1List = singletonList(new TopicPartition(topics(0), 0)) - val topic1And2List = util.Arrays.asList( -new TopicPartition(topics(0), 0), -new TopicPartition(topics(1), 0), -new TopicPartition(topics(1), 1)) - val allTopicsList = util.Arrays.asList( -new TopicPartition(topics(0), 0), -new TopicPartition(topics(1), 0), -new TopicPartition(topics(1), 1), -new TopicPartition(topics(2), 0), -new TopicPartition(topics(2), 1), -new TopicPartition(topics(2), 2)) - val groupToPartitionMap: util.Map[String, util.List[TopicPartition]] = -new util.HashMap[String, util.List[TopicPartition]]() - groupToPartitionMap.put(groups(0), topic1List) - groupToPartitionMap.put(groups(1), topic1And2List) - groupToPartitionMap.put(groups(2), allTopicsList) - groupToPartitionMap.put(groups(3), null) - groupToPartitionMap.put(groups(4), null) - - override def brokerPropertyOverrides(properties: Properties): Unit = { -properties.put(KafkaConfig.BrokerIdProp, brokerId.toString) -properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1") -properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "1") -properties.put(KafkaConfig.TransactionsTopicPartitionsProp, "1") -properties.put(KafkaConfig.TransactionsTopicReplicationFactorProp, "1") -properties.put(KafkaConfig.TransactionsTopicMinISRProp, "1") -properties.setProperty(KafkaConfig.UnstableApiVersionsEnableProp, "true") + +@Timeout(120) +@ExtendWith(value = Array(classOf[ClusterTestExtensions])) +@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1) +@Tag("integration") +class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { + + @ClusterTest(serverProperties = Array( +new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), +new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"), +new ClusterConfigProperty(key = 
"group.consumer.max.session.timeout.ms", value = "60"), +new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "60"), +new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), +new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testSingleGroupOffsetFetchWithNewConsumerGroupProtocolAndNewGroupCoordinator(): Unit = { +testSingleGroupOffsetFetch(useNewProtocol = true, requireStable = true) } - @BeforeEach - override def setUp(testInfo: TestInfo): Unit = { -doSetup(testInfo, createOffsetsTopic = false) + @ClusterTest(serverProperties = Array( +new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), +new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"), +new ClusterConfigProperty(key = "gro
[GitHub] [kafka] jolshan commented on a diff in pull request #14353: KAFKA-14499: [7/7] Add integration tests for OffsetCommit API and OffsetFetch API
jolshan commented on code in PR #14353: URL: https://github.com/apache/kafka/pull/14353#discussion_r1319082640 ## core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala: ## @@ -14,233 +14,532 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package kafka.server +import kafka.test.ClusterInstance +import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type} +import kafka.test.junit.ClusterTestExtensions +import kafka.test.junit.RaftClusterInvocationContext.RaftClusterInstance +import kafka.test.junit.ZkClusterInvocationContext.ZkClusterInstance import kafka.utils.TestUtils -import org.apache.kafka.clients.consumer.{ConsumerConfig, OffsetAndMetadata} import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.message.OffsetFetchResponseData import org.apache.kafka.common.protocol.{ApiKeys, Errors} -import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData -import org.apache.kafka.common.requests.{AbstractResponse, OffsetFetchRequest, OffsetFetchResponse} -import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} -import org.junit.jupiter.api.{BeforeEach, Test, TestInfo} +import org.apache.kafka.common.requests.{OffsetFetchRequest, OffsetFetchResponse} +import org.junit.jupiter.api.Assertions.{assertEquals, fail} +import org.junit.jupiter.api.{Tag, Timeout} +import org.junit.jupiter.api.extension.ExtendWith -import java.util -import java.util.Collections.singletonList +import java.util.Comparator +import java.util.stream.Collectors import scala.jdk.CollectionConverters._ -import java.util.{Collections, Optional, Properties} - -class OffsetFetchRequestTest extends BaseRequestTest { - - override def brokerCount: Int = 1 - - val brokerId: Integer = 0 - val offset = 15L - val leaderEpoch: Optional[Integer] = Optional.of(3) - val metadata = "metadata" - val topic = "topic" - val groupId = "groupId" - val groups: Seq[String] = 
(1 to 5).map(i => s"group$i") - val topics: Seq[String] = (1 to 3).map(i => s"topic$i") - val topic1List = singletonList(new TopicPartition(topics(0), 0)) - val topic1And2List = util.Arrays.asList( -new TopicPartition(topics(0), 0), -new TopicPartition(topics(1), 0), -new TopicPartition(topics(1), 1)) - val allTopicsList = util.Arrays.asList( -new TopicPartition(topics(0), 0), -new TopicPartition(topics(1), 0), -new TopicPartition(topics(1), 1), -new TopicPartition(topics(2), 0), -new TopicPartition(topics(2), 1), -new TopicPartition(topics(2), 2)) - val groupToPartitionMap: util.Map[String, util.List[TopicPartition]] = -new util.HashMap[String, util.List[TopicPartition]]() - groupToPartitionMap.put(groups(0), topic1List) - groupToPartitionMap.put(groups(1), topic1And2List) - groupToPartitionMap.put(groups(2), allTopicsList) - groupToPartitionMap.put(groups(3), null) - groupToPartitionMap.put(groups(4), null) - - override def brokerPropertyOverrides(properties: Properties): Unit = { -properties.put(KafkaConfig.BrokerIdProp, brokerId.toString) -properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1") -properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "1") -properties.put(KafkaConfig.TransactionsTopicPartitionsProp, "1") -properties.put(KafkaConfig.TransactionsTopicReplicationFactorProp, "1") -properties.put(KafkaConfig.TransactionsTopicMinISRProp, "1") -properties.setProperty(KafkaConfig.UnstableApiVersionsEnableProp, "true") + +@Timeout(120) +@ExtendWith(value = Array(classOf[ClusterTestExtensions])) +@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1) +@Tag("integration") +class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { + + @ClusterTest(serverProperties = Array( +new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), +new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"), +new ClusterConfigProperty(key = 
"group.consumer.max.session.timeout.ms", value = "60"), +new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "60"), +new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), +new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testSingleGroupOffsetFetchWithNewConsumerGroupProtocolAndNewGroupCoordinator(): Unit = { +testSingleGroupOffsetFetch(useNewProtocol = true, requireStable = true) } - @BeforeEach - override def setUp(testInfo: TestInfo): Unit = { -doSetup(testInfo, createOffsetsTopic = false) + @ClusterTest(serverProperties = Array( +new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), +new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"), +new ClusterConfigProperty(key = "gro
[jira] [Updated] (KAFKA-14273) Kafka doesn't start with KRaft on Windows
[ https://issues.apache.org/jira/browse/KAFKA-14273?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] José Armando García Sancio updated KAFKA-14273: --- Fix Version/s: 3.6.0 > Kafka doesn't start with KRaft on Windows > - > > Key: KAFKA-14273 > URL: https://issues.apache.org/jira/browse/KAFKA-14273 > Project: Kafka > Issue Type: Bug > Components: kraft >Affects Versions: 3.3.1 >Reporter: Kedar Joshi >Assignee: José Armando García Sancio >Priority: Major > Fix For: 3.6.0 > > > {{Basic setup doesn't work on Windows 10.}} > *{{Steps}}* > * {{Initialize cluster with -}} > {code:sh} > bin\windows\kafka-storage.bat random-uuid > bin\windows\kafka-storage.bat format -t %cluster_id% -c > .\config\kraft\server.properties{code} > > * Start Kafka with - > {code:sh} > bin\windows\kafka-server-start.bat .\config\kraft\server.properties{code} > > *Stacktrace* > Kafka fails to start with following exception - > {code:java} > D:\LocationGuru\Servers\Kafka-3.3>bin\windows\kafka-server-start.bat > .\config\kraft\server.properties > [2022-10-03 23:14:20,089] INFO Registered kafka:type=kafka.Log4jController > MBean (kafka.utils.Log4jControllerRegistration$) > [2022-10-03 23:14:20,375] INFO Setting -D > jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated > TLS renegotiation (org.apache.zookeeper.common.X509Util) > [2022-10-03 23:14:20,594] INFO [LogLoader partition=__cluster_metadata-0, > dir=D:\LocationGuru\Servers\Kafka-3.3\tmp\kraft-combined-logs] Loading > producer state till offset 0 with message format version 2 > (kafka.log.UnifiedLog$) > [2022-10-03 23:14:20,594] INFO [LogLoader partition=__cluster_metadata-0, > dir=D:\LocationGuru\Servers\Kafka-3.3\tmp\kraft-combined-logs] Reloading from > producer snapshot and rebuilding producer state from offset 0 > (kafka.log.UnifiedLog$) > [2022-10-03 23:14:20,594] INFO [LogLoader partition=__cluster_metadata-0, > dir=D:\LocationGuru\Servers\Kafka-3.3\tmp\kraft-combined-logs] Producer state > 
recovery took 0ms for snapshot load and 0ms for segment recovery from offset > 0 (kafka.log.UnifiedLog$) > [2022-10-03 23:14:20,640] INFO Initialized snapshots with IDs SortedSet() > from > D:\LocationGuru\Servers\Kafka-3.3\tmp\kraft-combined-logs__cluster_metadata-0 > (kafka.raft.KafkaMetadataLog$) > [2022-10-03 23:14:20,734] INFO [raft-expiration-reaper]: Starting > (kafka.raft.TimingWheelExpirationService$ExpiredOperationReaper) > [2022-10-03 23:14:20,900] ERROR Exiting Kafka due to fatal exception > (kafka.Kafka$) > java.io.UncheckedIOException: Error while writing the Quorum status from the > file > D:\LocationGuru\Servers\Kafka-3.3\tmp\kraft-combined-logs__cluster_metadata-0\quorum-state > at > org.apache.kafka.raft.FileBasedStateStore.writeElectionStateToFile(FileBasedStateStore.java:155) > at > org.apache.kafka.raft.FileBasedStateStore.writeElectionState(FileBasedStateStore.java:128) > at > org.apache.kafka.raft.QuorumState.transitionTo(QuorumState.java:477) > at org.apache.kafka.raft.QuorumState.initialize(QuorumState.java:212) > at > org.apache.kafka.raft.KafkaRaftClient.initialize(KafkaRaftClient.java:369) > at kafka.raft.KafkaRaftManager.buildRaftClient(RaftManager.scala:200) > at kafka.raft.KafkaRaftManager.(RaftManager.scala:127) > at kafka.server.KafkaRaftServer.(KafkaRaftServer.scala:83) > at kafka.Kafka$.buildServer(Kafka.scala:79) > at kafka.Kafka$.main(Kafka.scala:87) > at kafka.Kafka.main(Kafka.scala) > Caused by: java.nio.file.FileSystemException: > D:\LocationGuru\Servers\Kafka-3.3\tmp\kraft-combined-logs_cluster_metadata-0\quorum-state.tmp > -> > D:\LocationGuru\Servers\Kafka-3.3\tmp\kraft-combined-logs_cluster_metadata-0\quorum-state: > The process cannot access the file because it is being used by another > process > at > java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:92) > at > java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103) > at 
java.base/sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:403) > at > java.base/sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:293) > at java.base/java.nio.file.Files.move(Files.java:1430) > at > org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:935) > at > org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:918) > at > org.apache.kafka.raft.FileBasedStateStore.writeElectionStateToFile(FileBasedStateStore.java:152) > ... 10 more > Suppressed: java.nio.file.FileSystemException: > D:\LocationGuru\Servers\Kafka-3.3\tmp\kraft-combined-
[GitHub] [kafka] lianetm commented on a diff in pull request #14346: KAFKA-15115 - KAFKA-15163; Reset/Validate positions implementation & API integration
lianetm commented on code in PR #14346:
URL: https://github.com/apache/kafka/pull/14346#discussion_r1319081696

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ##
@@ -209,6 +215,34 @@ public ConsumerRecords poll(final Duration timeout) {
         return ConsumerRecords.empty();
     }
 
+    /**
+     * Set the fetch position to the committed position (if there is one) or reset it using the
+     * offset reset policy the user has configured.
+     *
+     * @return true if the operation completed without timing out
+     * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
+     * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is
+     *                                       defined
+     */
+    private boolean updateFetchPositions() {
+        // If any partitions have been truncated due to a leader change, we need to validate the offsets
+        ValidatePositionsApplicationEvent validatePositionsEvent = new ValidatePositionsApplicationEvent();
+        eventHandler.add(validatePositionsEvent);
+
+        // TODO: integrate logic for refreshing committed offsets if available

Review Comment:
   That's already implemented (in the integration branch), just included in a different PR that should come after this, following the merge plan.
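The rule stated in the Javadoc for `updateFetchPositions` (use the committed position if there is one, otherwise apply the configured reset policy) can be sketched as a small decision function. This is only an illustration under stated assumptions and not the PR's implementation: the class, the `ResetPolicy` enum, and `initialPosition` are hypothetical names, the log start and end offsets are passed in directly rather than fetched asynchronously, and `IllegalStateException` stands in for `NoOffsetForPartitionException`.

```java
import java.util.Optional;

final class FetchPositionSketch {
    // Hypothetical enum mirroring the earliest/latest/none values of auto.offset.reset.
    enum ResetPolicy { EARLIEST, LATEST, NONE }

    // Prefer the committed offset; fall back to the reset policy only when none is stored.
    static long initialPosition(Optional<Long> committed, ResetPolicy policy,
                                long logStartOffset, long logEndOffset) {
        if (committed.isPresent()) {
            return committed.get();
        }
        switch (policy) {
            case EARLIEST:
                return logStartOffset;
            case LATEST:
                return logEndOffset;
            default:
                // Stand-in for NoOffsetForPartitionException in this sketch.
                throw new IllegalStateException("no committed offset and no reset policy defined");
        }
    }

    public static void main(String[] args) {
        // Committed offset wins regardless of the policy.
        System.out.println(initialPosition(Optional.of(42L), ResetPolicy.LATEST, 0L, 100L));
        // No committed offset: the policy decides.
        System.out.println(initialPosition(Optional.empty(), ResetPolicy.EARLIEST, 0L, 100L));
    }
}
```

With the `NONE` policy and no committed offset the function throws, which corresponds to the `@throws NoOffsetForPartitionException` clause in the Javadoc.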
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14353: KAFKA-14499: [7/7] Add integration tests for OffsetCommit API and OffsetFetch API
rreddy-22 commented on code in PR #14353: URL: https://github.com/apache/kafka/pull/14353#discussion_r1319078806 ## core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala: ## @@ -14,233 +14,532 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package kafka.server +import kafka.test.ClusterInstance +import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type} +import kafka.test.junit.ClusterTestExtensions +import kafka.test.junit.RaftClusterInvocationContext.RaftClusterInstance +import kafka.test.junit.ZkClusterInvocationContext.ZkClusterInstance import kafka.utils.TestUtils -import org.apache.kafka.clients.consumer.{ConsumerConfig, OffsetAndMetadata} import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.message.OffsetFetchResponseData import org.apache.kafka.common.protocol.{ApiKeys, Errors} -import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData -import org.apache.kafka.common.requests.{AbstractResponse, OffsetFetchRequest, OffsetFetchResponse} -import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} -import org.junit.jupiter.api.{BeforeEach, Test, TestInfo} +import org.apache.kafka.common.requests.{OffsetFetchRequest, OffsetFetchResponse} +import org.junit.jupiter.api.Assertions.{assertEquals, fail} +import org.junit.jupiter.api.{Tag, Timeout} +import org.junit.jupiter.api.extension.ExtendWith -import java.util -import java.util.Collections.singletonList +import java.util.Comparator +import java.util.stream.Collectors import scala.jdk.CollectionConverters._ -import java.util.{Collections, Optional, Properties} - -class OffsetFetchRequestTest extends BaseRequestTest { - - override def brokerCount: Int = 1 - - val brokerId: Integer = 0 - val offset = 15L - val leaderEpoch: Optional[Integer] = Optional.of(3) - val metadata = "metadata" - val topic = "topic" - val groupId = "groupId" - val groups: Seq[String] 
= (1 to 5).map(i => s"group$i") - val topics: Seq[String] = (1 to 3).map(i => s"topic$i") - val topic1List = singletonList(new TopicPartition(topics(0), 0)) - val topic1And2List = util.Arrays.asList( -new TopicPartition(topics(0), 0), -new TopicPartition(topics(1), 0), -new TopicPartition(topics(1), 1)) - val allTopicsList = util.Arrays.asList( -new TopicPartition(topics(0), 0), -new TopicPartition(topics(1), 0), -new TopicPartition(topics(1), 1), -new TopicPartition(topics(2), 0), -new TopicPartition(topics(2), 1), -new TopicPartition(topics(2), 2)) - val groupToPartitionMap: util.Map[String, util.List[TopicPartition]] = -new util.HashMap[String, util.List[TopicPartition]]() - groupToPartitionMap.put(groups(0), topic1List) - groupToPartitionMap.put(groups(1), topic1And2List) - groupToPartitionMap.put(groups(2), allTopicsList) - groupToPartitionMap.put(groups(3), null) - groupToPartitionMap.put(groups(4), null) - - override def brokerPropertyOverrides(properties: Properties): Unit = { -properties.put(KafkaConfig.BrokerIdProp, brokerId.toString) -properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1") -properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "1") -properties.put(KafkaConfig.TransactionsTopicPartitionsProp, "1") -properties.put(KafkaConfig.TransactionsTopicReplicationFactorProp, "1") -properties.put(KafkaConfig.TransactionsTopicMinISRProp, "1") -properties.setProperty(KafkaConfig.UnstableApiVersionsEnableProp, "true") + +@Timeout(120) +@ExtendWith(value = Array(classOf[ClusterTestExtensions])) +@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1) +@Tag("integration") +class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { + + @ClusterTest(serverProperties = Array( +new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), +new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"), +new ClusterConfigProperty(key = 
"group.consumer.max.session.timeout.ms", value = "60"), +new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "60"), +new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), +new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testSingleGroupOffsetFetchWithNewConsumerGroupProtocolAndNewGroupCoordinator(): Unit = { +testSingleGroupOffsetFetch(useNewProtocol = true, requireStable = true) } - @BeforeEach - override def setUp(testInfo: TestInfo): Unit = { -doSetup(testInfo, createOffsetsTopic = false) + @ClusterTest(serverProperties = Array( +new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), +new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"), +new ClusterConfigProperty(key = "g
[GitHub] [kafka] jolshan commented on a diff in pull request #14353: KAFKA-14499: [7/7] Add integration tests for OffsetCommit API and OffsetFetch API
jolshan commented on code in PR #14353: URL: https://github.com/apache/kafka/pull/14353#discussion_r1319078377 ## core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala: ## @@ -0,0 +1,163 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.server + +import kafka.test.ClusterInstance +import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type} +import kafka.test.junit.ClusterTestExtensions +import kafka.test.junit.RaftClusterInvocationContext.RaftClusterInstance +import kafka.test.junit.ZkClusterInvocationContext.ZkClusterInstance +import kafka.utils.TestUtils +import org.apache.kafka.common.protocol.{ApiKeys, Errors} +import org.junit.jupiter.api.Assertions.fail +import org.junit.jupiter.api.{Tag, Timeout} +import org.junit.jupiter.api.extension.ExtendWith + +import java.util.stream.Collectors +import scala.jdk.CollectionConverters._ + +@Timeout(120) +@ExtendWith(value = Array(classOf[ClusterTestExtensions])) +@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1) +@Tag("integration") +class OffsetCommitRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { + + @ClusterTest(serverProperties = Array( +new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), +new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"), +new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "60"), +new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "60"), +new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), +new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testOffsetCommitWithNewConsumerGroupProtocolAndNewGroupCoordinator(): Unit = { +testOffsetCommit(true) + } + + @ClusterTest(serverProperties = Array( +new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), +new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"), +new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), +new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def 
testOffsetCommitWithOldConsumerGroupProtocolAndNewGroupCoordinator(): Unit = { +testOffsetCommit(false) + } + + @ClusterTest(clusterType = Type.ALL, serverProperties = Array( +new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"), +new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "false"), +new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), +new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testOffsetCommitWithOldConsumerGroupProtocolAndOldGroupCoordinator(): Unit = { +testOffsetCommit(false) + } + + private def testOffsetCommit(useNewProtocol: Boolean): Unit = { +if (useNewProtocol && !isNewGroupCoordinatorEnabled) { + fail("Cannot use the new protocol with the old group coordinator.") +} + +val admin = cluster.createAdminClient() + +// Creates the __consumer_offsets topics because it won't be created automatically +// in this test because it does not use FindCoordinator API. +TestUtils.createOffsetsTopicWithAdmin( + admin = admin, + brokers = if (cluster.isKRaftTest) { + cluster.asInstanceOf[RaftClusterInstance].brokers.collect(Collectors.toList[KafkaBroker]).asScala + } else { + cluster.asInstanceOf[ZkClusterInstance].servers.collect(Collectors.toList[KafkaBroker]).asScala + } +) + +// Create the topic. +TestUtils.createTopicWithAdminRaw( + admin = admin, + topic = "foo", + numPartitions = 3 +) + +// Join the consumer group. +val (memberId, memberEpoch) = if (useNewProtocol) { + // Note that we heartbeat only once to join the group and assume + // that the test will complete within the session timeout. + joinConsumerGroupWithNewProtocol("grp") +} else { + // Note that we don't heartbeat and assume that the tes
[GitHub] [kafka] lianetm commented on a diff in pull request #14346: KAFKA-15115 - KAFKA-15163; Reset/Validate positions implementation & API integration
lianetm commented on code in PR #14346: URL: https://github.com/apache/kafka/pull/14346#discussion_r1319078266 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java: ## @@ -268,7 +326,7 @@ private CompletableFuture buildListOffsetRequestToNode( .forConsumer(requireTimestamps, isolationLevel, false) .setTargetTimes(ListOffsetsRequest.toListOffsetsTopics(targetTimes)); -log.debug("Creating ListOffsetRequest {} for broker {} to reset positions", builder, Review Comment: I'm trying to use the proper API key for that request, as defined in the Kafka protocol -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lianetm commented on a diff in pull request #14323: KAFKA-15275 - Client state machine basic components, states and initial transitions
lianetm commented on code in PR #14323: URL: https://github.com/apache/kafka/pull/14323#discussion_r1319075024 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; +import org.apache.kafka.common.protocol.Errors; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +/** + * Membership manager that maintains group membership for a single member following the new + * consumer group protocol. + * + * This keeps membership state and assignment updated in-memory, based on the heartbeat responses + * the member receives. It is also responsible for computing assignment for the group based on + * the metadata, if the member has been selected by the broker to do so. 
+ */ +public class MembershipManagerImpl implements MembershipManager { + +private final String groupId; +private Optional groupInstanceId; +private String memberId; +private int memberEpoch; +private MemberState state; +private AssignorSelection assignorSelection; + +/** + * Assignment that the member received from the server and successfully processed + */ +private ConsumerGroupHeartbeatResponseData.Assignment currentAssignment; + +/** + * List of assignments that the member received from the server but hasn't processed yet + */ +private final List targetAssignments; + +public MembershipManagerImpl(String groupId) { +this.groupId = groupId; +this.state = MemberState.UNJOINED; +this.assignorSelection = AssignorSelection.defaultAssignor(); +this.targetAssignments = new ArrayList<>(); +} + +public MembershipManagerImpl(String groupId, String groupInstanceId, AssignorSelection assignorSelection) { +this(groupId); +this.groupInstanceId = Optional.ofNullable(groupInstanceId); +setAssignorSelection(assignorSelection); +} + +/** + * Update assignor selection for the member. 
+ * + * @param assignorSelection New assignor selection + * @throws IllegalArgumentException If the provided assignor selection is null + */ +public void setAssignorSelection(AssignorSelection assignorSelection) { +if (assignorSelection == null) { +throw new IllegalArgumentException("Assignor selection cannot be null"); +} +this.assignorSelection = assignorSelection; +} + +private void transitionTo(MemberState nextState) { +if (!nextState.getPreviousValidStates().contains(state)) { +// TODO: handle invalid state transition +throw new RuntimeException(String.format("Invalid state transition from %s to %s", +state, nextState)); +} +this.state = nextState; +} + +@Override +public String groupId() { +return groupId; +} + +@Override +public String groupInstanceId() { +// TODO: review empty vs null instance id +return groupInstanceId.orElse(null); +} + +@Override +public String memberId() { +return memberId; +} + +@Override +public int memberEpoch() { +return memberEpoch; +} + +@Override +public void updateState(ConsumerGroupHeartbeatResponseData response) { +if (response.errorCode() == Errors.NONE.code()) { +this.memberId = response.memberId(); +this.memberEpoch = response.memberEpoch(); +targetAssignments.add(response.assignment()); +transitionTo(MemberState.PROCESSING_ASSIGNMENT); +} else { +if (response.errorCode() == Errors.FENCED_MEMBER_EPOCH.code() || response.errorCode() == Errors.UNKNOWN_MEMBER_ID.code()) { +resetMemberIdAndEpoch(); +transitionTo(MemberState.UNJOINED); +} else if (response.errorCode() == Errors.UNRELEASED_INSTANCE_ID.code()) { +transitionTo(MemberState.FAILED); +} +} +} + +public void onAs
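The `transitionTo` guard quoted above can be illustrated in isolation. The following is only a sketch: `SketchMemberState` and `SketchStateMachine` are hypothetical stand-ins, and the transition table is an assumption made for illustration, not the table defined in the PR. It also throws `IllegalStateException` rather than the `RuntimeException` used in the quoted code.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical stand-in for MemberState. The state names appear in the quoted
// diff; which transitions are allowed is an assumption made for this sketch.
enum SketchMemberState {
    UNJOINED, PROCESSING_ASSIGNMENT, FAILED;

    // States from which this state may be entered (assumed, not taken from the PR).
    Set<SketchMemberState> previousValidStates() {
        switch (this) {
            case PROCESSING_ASSIGNMENT:
                return EnumSet.of(UNJOINED, PROCESSING_ASSIGNMENT);
            case UNJOINED:
                return EnumSet.of(UNJOINED, PROCESSING_ASSIGNMENT);
            default: // FAILED is assumed to be reachable from any state
                return EnumSet.allOf(SketchMemberState.class);
        }
    }
}

class SketchStateMachine {
    private SketchMemberState state = SketchMemberState.UNJOINED;

    SketchMemberState state() {
        return state;
    }

    // Same guard as the quoted transitionTo: reject the move unless the
    // current state is listed as a valid predecessor of the next state.
    void transitionTo(SketchMemberState next) {
        if (!next.previousValidStates().contains(state)) {
            throw new IllegalStateException(
                String.format("Invalid state transition from %s to %s", state, next));
        }
        state = next;
    }

    public static void main(String[] args) {
        SketchStateMachine machine = new SketchStateMachine();
        machine.transitionTo(SketchMemberState.PROCESSING_ASSIGNMENT);
        machine.transitionTo(SketchMemberState.FAILED);
        System.out.println("Current state: " + machine.state());
    }
}
```

Keeping the predecessor set on the target state, as the quoted code does with `getPreviousValidStates()`, means a new state only has to declare where it can be entered from.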
[jira] [Assigned] (KAFKA-15417) JoinWindow does not seem to work properly with a KStream - KStream - LeftJoin()
[ https://issues.apache.org/jira/browse/KAFKA-15417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax reassigned KAFKA-15417:
---
Assignee: Victor van den Hoven

> JoinWindow does not seem to work properly with a KStream - KStream - LeftJoin()
> ---
>
> Key: KAFKA-15417
> URL: https://issues.apache.org/jira/browse/KAFKA-15417
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.4.0
> Reporter: Victor van den Hoven
> Assignee: Victor van den Hoven
> Priority: Major
> Attachments: SimpleStreamTopology.java, SimpleStreamTopologyTest.java
>
> In Kafka Streams 3.4.0:
> According to the Javadoc of JoinWindows, three different window configurations are supported:
> * before = after = time-difference
> * before = 0 and after = time-difference
> * before = time-difference and after = 0
>
> However, if I use a join window with before = time-difference and after = 0
> on a KStream-KStream left join, the after = 0 part does not seem to work.
> When using stream1.leftJoin(stream2, joinWindow) with joinWindow.after = 0 and
> joinWindow.before = 30s, any new message on stream1 that cannot be joined with
> a message on stream2 should be joined with a null-record once joinWindow.after
> has ended and a new message has arrived on stream1.
> It is not.
> The previous message is joined with a null-record only if the new message
> arrives once joinWindow.before has elapsed.
>
> Attached are two files with a TopologyTestDriver unit test to reproduce the problem.
> topology: stream1.leftJoin(stream2, joiner, joinWindow)
> joinWindow has before = 5000ms and after = 0
> message1(key1) -> stream1
> after 4000ms message2(key2) -> stream1 -> NO null-record join was made, although the after period had expired.
> after 4900ms message2(key2) -> stream1 -> NO null-record join was made, although the after period had expired.
> after 5000ms message2(key2) -> stream1 -> A null-record join was made; the before period had expired.
> after 6000ms message2(key2) -> stream1 -> A null-record join was made; the before period had expired.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
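The timeline in the report can be modelled with plain arithmetic. This is only a sketch of the two behaviours as the reporter describes them, assuming no grace period and treating the arrival time of the next stream1 record as stream time. `LeftJoinTimeline` and both of its methods are hypothetical names for illustration; they are not Kafka Streams APIs and do not reproduce its implementation.

```java
// Hypothetical model of the report's timeline; not Kafka Streams code.
class LeftJoinTimeline {

    // Behaviour the reporter expects: an unmatched left record is emitted with a
    // null right side once a later record moves stream time past leftTs + after.
    static boolean expectedNullJoin(long leftTs, long streamTime, long afterMs) {
        return streamTime > leftTs + afterMs;
    }

    // Behaviour the reporter observes: the null-record join only shows up once
    // stream time has reached leftTs + before.
    static boolean observedNullJoin(long leftTs, long streamTime, long beforeMs) {
        return streamTime >= leftTs + beforeMs;
    }

    public static void main(String[] args) {
        long beforeMs = 5000L; // joinWindow.before from the report
        long afterMs = 0L;     // joinWindow.after from the report
        long leftTs = 0L;      // timestamp of message1 on stream1

        // Arrival times of message2 taken from the report.
        for (long t : new long[] {4000L, 4900L, 5000L, 6000L}) {
            System.out.printf("message2 at %dms: expected=%b observed=%b%n",
                t,
                expectedNullJoin(leftTs, t, afterMs),
                observedNullJoin(leftTs, t, beforeMs));
        }
    }
}
```

The two predicates disagree exactly at the 4000ms and 4900ms steps, which are the cases the report flags.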
[jira] [Commented] (KAFKA-15417) JoinWindow does not seem to work properly with a KStream - KStream - LeftJoin()
[ https://issues.apache.org/jira/browse/KAFKA-15417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17762863#comment-17762863 ]

Matthias J. Sax commented on KAFKA-15417:
---
Thanks a lot! Assigned the ticket to you.
[GitHub] [kafka] mjsax merged pull request #14341: KAFKA-15307: Removes non-existent configs
mjsax merged PR #14341: URL: https://github.com/apache/kafka/pull/14341
[GitHub] [kafka] nizhikov opened a new pull request, #14355: KAFKA-14595 ReassignPartitionsUnitTest rewritten in java
nizhikov opened a new pull request, #14355: URL: https://github.com/apache/kafka/pull/14355

This PR is part of #13247. It contains the changes needed to rewrite a single test in Java. The intention is to reduce the size of the parent PR.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] cmccabe commented on a diff in pull request #14354: KAFKA-14273; Close file before atomic move
cmccabe commented on code in PR #14354:
URL: https://github.com/apache/kafka/pull/14354#discussion_r1319034514

## raft/src/main/java/org/apache/kafka/raft/FileBasedStateStore.java:
## @@ -144,21 +144,29 @@ private void writeElectionStateToFile(final File stateFile, QuorumStateData stat

         log.trace("Writing tmp quorum state {}", temp.getAbsolutePath());

-        try (final FileOutputStream fileOutputStream = new FileOutputStream(temp);
-             final BufferedWriter writer = new BufferedWriter(
-                 new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8))) {
-            short version = state.highestSupportedVersion();
-
-            ObjectNode jsonState = (ObjectNode) QuorumStateDataJsonConverter.write(state, version);
-            jsonState.set(DATA_VERSION, new ShortNode(version));
-            writer.write(jsonState.toString());
-            writer.flush();
-            fileOutputStream.getFD().sync();
+        try {
+            try (final FileOutputStream fileOutputStream = new FileOutputStream(temp);
+                 final BufferedWriter writer = new BufferedWriter(
+                     new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8)
+                 )
+            ) {
+                short version = state.highestSupportedVersion();

Review Comment:
while we're fixing this let's get rid of `highestSupportedVersion` here. Be clear about the version we're setting.
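The ordering the PR title refers to (write, flush, sync, close, and only then move) can be sketched on its own. This is a minimal illustration using only JDK classes, not the `FileBasedStateStore` implementation: `AtomicStateWriter`, `writeAtomically` and the `.tmp` suffix are hypothetical names chosen for the example. Whether `ATOMIC_MOVE` replaces an existing target is implementation specific according to the JDK documentation, so the overwrite case is an assumption that holds on typical POSIX file systems.

```java
import java.io.BufferedWriter;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical helper for illustration; not part of Kafka.
class AtomicStateWriter {

    static void writeAtomically(Path target, String contents) throws IOException {
        // Temp file next to the target so the move stays on one file system.
        Path temp = target.resolveSibling(target.getFileName() + ".tmp");
        try {
            try (FileOutputStream out = new FileOutputStream(temp.toFile());
                 BufferedWriter writer = new BufferedWriter(
                     new OutputStreamWriter(out, StandardCharsets.UTF_8))) {
                writer.write(contents);
                writer.flush();      // push buffered characters into the stream
                out.getFD().sync();  // ask the OS to persist the bytes
            }
            // Both the writer and the stream are closed at this point, so no
            // open handle to the temp file remains when it is moved.
            Files.move(temp, target, StandardCopyOption.ATOMIC_MOVE);
        } finally {
            // No-op after a successful move; cleans up if writing or moving failed.
            Files.deleteIfExists(temp);
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("state-sketch");
        Path target = dir.resolve("quorum-state");
        writeAtomically(target, "{\"data_version\":0}");
        System.out.println(new String(Files.readAllBytes(target), StandardCharsets.UTF_8));
    }
}
```

Scoping the try-with-resources block so that it ends before `Files.move` is what guarantees the close happens first; the outer `try`/`finally` only handles cleanup.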
[GitHub] [kafka] philipnee commented on a diff in pull request #14323: KAFKA-15275 - Client state machine basic components, states and initial transitions
philipnee commented on code in PR #14323: URL: https://github.com/apache/kafka/pull/14323#discussion_r1319012270 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; +import org.apache.kafka.common.protocol.Errors; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +/** + * Membership manager that maintains group membership for a single member following the new + * consumer group protocol. + * + * This keeps membership state and assignment updated in-memory, based on the heartbeat responses + * the member receives. It is also responsible for computing assignment for the group based on + * the metadata, if the member has been selected by the broker to do so. 
+ */ +public class MembershipManagerImpl implements MembershipManager { + +private final String groupId; +private Optional groupInstanceId; +private String memberId; +private int memberEpoch; +private MemberState state; +private AssignorSelection assignorSelection; + +/** + * Assignment that the member received from the server and successfully processed + */ +private ConsumerGroupHeartbeatResponseData.Assignment currentAssignment; + +/** + * List of assignments that the member received from the server but hasn't processed yet + */ +private final List targetAssignments; + +public MembershipManagerImpl(String groupId) { +this.groupId = groupId; +this.state = MemberState.UNJOINED; +this.assignorSelection = AssignorSelection.defaultAssignor(); +this.targetAssignments = new ArrayList<>(); +} + +public MembershipManagerImpl(String groupId, String groupInstanceId, AssignorSelection assignorSelection) { +this(groupId); +this.groupInstanceId = Optional.ofNullable(groupInstanceId); +setAssignorSelection(assignorSelection); +} + +/** + * Update assignor selection for the member. 
+ * + * @param assignorSelection New assignor selection + * @throws IllegalArgumentException If the provided assignor selection is null + */ +public void setAssignorSelection(AssignorSelection assignorSelection) { +if (assignorSelection == null) { +throw new IllegalArgumentException("Assignor selection cannot be null"); +} +this.assignorSelection = assignorSelection; +} + +private void transitionTo(MemberState nextState) { +if (!nextState.getPreviousValidStates().contains(state)) { +// TODO: handle invalid state transition +throw new RuntimeException(String.format("Invalid state transition from %s to %s", +state, nextState)); +} +this.state = nextState; +} + +@Override +public String groupId() { +return groupId; +} + +@Override +public String groupInstanceId() { +// TODO: review empty vs null instance id +return groupInstanceId.orElse(null); +} + +@Override +public String memberId() { +return memberId; +} + +@Override +public int memberEpoch() { +return memberEpoch; +} + +@Override +public void updateState(ConsumerGroupHeartbeatResponseData response) { +if (response.errorCode() == Errors.NONE.code()) { +this.memberId = response.memberId(); +this.memberEpoch = response.memberEpoch(); +targetAssignments.add(response.assignment()); +transitionTo(MemberState.PROCESSING_ASSIGNMENT); +} else { +if (response.errorCode() == Errors.FENCED_MEMBER_EPOCH.code() || response.errorCode() == Errors.UNKNOWN_MEMBER_ID.code()) { +resetMemberIdAndEpoch(); +transitionTo(MemberState.UNJOINED); +} else if (response.errorCode() == Errors.UNRELEASED_INSTANCE_ID.code()) { +transitionTo(MemberState.FAILED); +} +} +} + +public void on
[GitHub] [kafka] jolshan commented on a diff in pull request #14353: KAFKA-14499: [7/7] Add integration tests for OffsetCommit API and OffsetFetch API
jolshan commented on code in PR #14353: URL: https://github.com/apache/kafka/pull/14353#discussion_r1319008280 ## core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala: ## @@ -0,0 +1,163 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.server + +import kafka.test.ClusterInstance +import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type} +import kafka.test.junit.ClusterTestExtensions +import kafka.test.junit.RaftClusterInvocationContext.RaftClusterInstance +import kafka.test.junit.ZkClusterInvocationContext.ZkClusterInstance +import kafka.utils.TestUtils +import org.apache.kafka.common.protocol.{ApiKeys, Errors} +import org.junit.jupiter.api.Assertions.fail +import org.junit.jupiter.api.{Tag, Timeout} +import org.junit.jupiter.api.extension.ExtendWith + +import java.util.stream.Collectors +import scala.jdk.CollectionConverters._ + +@Timeout(120) +@ExtendWith(value = Array(classOf[ClusterTestExtensions])) +@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1) +@Tag("integration") +class OffsetCommitRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { + + @ClusterTest(serverProperties = Array( +new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), +new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"), +new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "60"), Review Comment: Is this the current way we enable setting new consumer group protocol? Setting these two configs?
[GitHub] [kafka] jolshan commented on a diff in pull request #14353: KAFKA-14499: [7/7] Add integration tests for OffsetCommit API and OffsetFetch API
jolshan commented on code in PR #14353: URL: https://github.com/apache/kafka/pull/14353#discussion_r1319005551 ## core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala: ## @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server + +import kafka.test.ClusterInstance +import kafka.utils.{NotNothing, TestUtils} +import org.apache.kafka.common.message.{ConsumerGroupHeartbeatRequestData, JoinGroupRequestData, OffsetCommitRequestData, OffsetCommitResponseData, SyncGroupRequestData} +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, ConsumerGroupHeartbeatRequest, ConsumerGroupHeartbeatResponse, JoinGroupRequest, JoinGroupResponse, OffsetCommitRequest, OffsetCommitResponse, SyncGroupRequest, SyncGroupResponse} +import org.junit.jupiter.api.Assertions.assertEquals + +import scala.jdk.CollectionConverters._ +import scala.reflect.ClassTag + +class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) { Review Comment: Is the goal here to create a new baseRequestTest class? I noticed we didn't extend the existing BaseRequestTest class
[GitHub] [kafka] lianetm commented on a diff in pull request #14323: KAFKA-15275 - Client state machine basic components, states and initial transitions
lianetm commented on code in PR #14323: URL: https://github.com/apache/kafka/pull/14323#discussion_r1319005335 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; +import org.apache.kafka.common.protocol.Errors; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +/** + * Membership manager that maintains group membership for a single member following the new + * consumer group protocol. + * + * This keeps membership state and assignment updated in-memory, based on the heartbeat responses + * the member receives. It is also responsible for computing assignment for the group based on + * the metadata, if the member has been selected by the broker to do so. 
+ */ +public class MembershipManagerImpl implements MembershipManager { + +private final String groupId; +private Optional groupInstanceId; +private String memberId; +private int memberEpoch; +private MemberState state; +private AssignorSelection assignorSelection; + +/** + * Assignment that the member received from the server and successfully processed + */ +private ConsumerGroupHeartbeatResponseData.Assignment currentAssignment; + +/** + * List of assignments that the member received from the server but hasn't processed yet + */ +private final List targetAssignments; + +public MembershipManagerImpl(String groupId) { +this.groupId = groupId; +this.state = MemberState.UNJOINED; +this.assignorSelection = AssignorSelection.defaultAssignor(); +this.targetAssignments = new ArrayList<>(); +} + +public MembershipManagerImpl(String groupId, String groupInstanceId, AssignorSelection assignorSelection) { +this(groupId); +this.groupInstanceId = Optional.ofNullable(groupInstanceId); +setAssignorSelection(assignorSelection); +} + +/** + * Update assignor selection for the member. 
+ * + * @param assignorSelection New assignor selection + * @throws IllegalArgumentException If the provided assignor selection is null + */ +public void setAssignorSelection(AssignorSelection assignorSelection) { +if (assignorSelection == null) { +throw new IllegalArgumentException("Assignor selection cannot be null"); +} +this.assignorSelection = assignorSelection; +} + +private void transitionTo(MemberState nextState) { +if (!nextState.getPreviousValidStates().contains(state)) { +// TODO: handle invalid state transition +throw new RuntimeException(String.format("Invalid state transition from %s to %s", +state, nextState)); +} +this.state = nextState; +} + +@Override +public String groupId() { +return groupId; +} + +@Override +public String groupInstanceId() { +// TODO: review empty vs null instance id +return groupInstanceId.orElse(null); +} + +@Override +public String memberId() { +return memberId; +} + +@Override +public int memberEpoch() { +return memberEpoch; +} + +@Override +public void updateState(ConsumerGroupHeartbeatResponseData response) { +if (response.errorCode() == Errors.NONE.code()) { +this.memberId = response.memberId(); +this.memberEpoch = response.memberEpoch(); +targetAssignments.add(response.assignment()); +transitionTo(MemberState.PROCESSING_ASSIGNMENT); +} else { +if (response.errorCode() == Errors.FENCED_MEMBER_EPOCH.code() || response.errorCode() == Errors.UNKNOWN_MEMBER_ID.code()) { +resetMemberIdAndEpoch(); +transitionTo(MemberState.UNJOINED); +} else if (response.errorCode() == Errors.UNRELEASED_INSTANCE_ID.code()) { +transitionTo(MemberState.FAILED); +} +} +} + +public void onAs
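The `transitionTo` check in the diff above can be illustrated in isolation. The following is a minimal sketch, not the PR's code: the class name, the `PREVIOUS_VALID` table, and the specific allowed transitions are assumptions made for illustration (the diff only shows that each state exposes its valid previous states).

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;

// Hypothetical stand-in for the member state machine discussed in the review.
class MemberStateMachineSketch {
    enum State { UNJOINED, PROCESSING_ASSIGNMENT, FAILED }

    // Assumed transition table: for each target state, the states it may be entered from.
    private static final Map<State, EnumSet<State>> PREVIOUS_VALID = new EnumMap<>(State.class);
    static {
        PREVIOUS_VALID.put(State.UNJOINED, EnumSet.of(State.PROCESSING_ASSIGNMENT));
        PREVIOUS_VALID.put(State.PROCESSING_ASSIGNMENT,
                EnumSet.of(State.UNJOINED, State.PROCESSING_ASSIGNMENT));
        PREVIOUS_VALID.put(State.FAILED,
                EnumSet.of(State.UNJOINED, State.PROCESSING_ASSIGNMENT));
    }

    private State state = State.UNJOINED;

    State state() {
        return state;
    }

    // Reject any transition whose target does not list the current state as a valid predecessor.
    void transitionTo(State next) {
        if (!PREVIOUS_VALID.get(next).contains(state)) {
            throw new IllegalStateException(
                    String.format("Invalid state transition from %s to %s", state, next));
        }
        state = next;
    }

    public static void main(String[] args) {
        MemberStateMachineSketch member = new MemberStateMachineSketch();
        member.transitionTo(State.PROCESSING_ASSIGNMENT);
        member.transitionTo(State.FAILED);
        try {
            member.transitionTo(State.UNJOINED); // not allowed from FAILED in this sketch
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Keeping the table keyed by target state mirrors the `getPreviousValidStates()` lookup in the diff; a table keyed by source state would work equally well.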
[GitHub] [kafka] jsancio commented on a diff in pull request #12763: KAFKA-14273: Kafka doesn't start with KRaft on Windows
jsancio commented on code in PR #12763: URL: https://github.com/apache/kafka/pull/12763#discussion_r1319003199 ## raft/src/main/java/org/apache/kafka/raft/FileBasedStateStore.java: ## @@ -149,6 +149,7 @@ private void writeElectionStateToFile(final File stateFile, QuorumStateData stat writer.write(jsonState.toString()); writer.flush(); fileOutputStream.getFD().sync(); +fileOutputStream.close(); Review Comment: I created this PR which includes this suggestion: https://github.com/apache/kafka/pull/14354 cc @dengziming @showuon -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jsancio opened a new pull request, #14354: KAFKA-14273; Close file before atomic move
jsancio opened a new pull request, #14354: URL: https://github.com/apache/kafka/pull/14354

On Windows, an atomic move is not allowed while the file has an open handle. E.g.:

__cluster_metadata-0\quorum-state: The process cannot access the file because it is being used by another process
    at java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:92)
    at java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103)
    at java.base/sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:403)
    at java.base/sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:293)
    at java.base/java.nio.file.Files.move(Files.java:1430)
    at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:949)
    at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:932)
    at org.apache.kafka.raft.FileBasedStateStore.writeElectionStateToFile(FileBasedStateStore.java:152)

This is fixed by closing the temporary quorum-state file before attempting to move it.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
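The close-before-move ordering described in this PR can be sketched with the JDK alone. This is an illustration under assumptions, not the Kafka implementation: `CloseBeforeMove`, `writeAtomically`, and the `.tmp` suffix handling are hypothetical names chosen for the example.

```java
import java.io.BufferedWriter;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical helper illustrating "flush, sync, close, then move".
class CloseBeforeMove {
    static void writeAtomically(Path target, String contents) throws IOException {
        Path temp = target.resolveSibling(target.getFileName() + ".tmp");

        // try-with-resources closes the writer and the stream when the block exits,
        // so no handle on the temporary file is left open when the move runs.
        try (FileOutputStream out = new FileOutputStream(temp.toFile());
             BufferedWriter writer = new BufferedWriter(
                     new OutputStreamWriter(out, StandardCharsets.UTF_8))) {
            writer.write(contents);
            writer.flush();
            out.getFD().sync(); // force the bytes to the storage device before closing
        }

        try {
            Files.move(temp, target, StandardCopyOption.ATOMIC_MOVE);
        } catch (AtomicMoveNotSupportedException e) {
            // Fallback for file systems without atomic rename; this is not atomic.
            Files.move(temp, target, StandardCopyOption.REPLACE_EXISTING);
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("quorum-state-demo");
        Path target = dir.resolve("quorum-state");
        writeAtomically(target, "{\"leaderEpoch\":1}");
        writeAtomically(target, "{\"leaderEpoch\":2}");
        System.out.println(new String(Files.readAllBytes(target), StandardCharsets.UTF_8));
    }
}
```

On POSIX systems an open handle does not block a rename, which is presumably why the problem only surfaced on Windows; closing first is harmless on both.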
[GitHub] [kafka] lianetm commented on pull request #14323: KAFKA-15275 - Client state machine basic components, states and initial transitions
lianetm commented on PR #14323: URL: https://github.com/apache/kafka/pull/14323#issuecomment-1710622597

Build completed with 6 unrelated test failures:

Build / JDK 20 and Scala 2.13 / testBumpTransactionalEpoch(String).quorum=kraft – kafka.api.TransactionsTest
Build / JDK 8 and Scala 2.12 / testNonDefaultConnectionCountLimitAndRateLimit() – kafka.network.ConnectionQuotasTest
Build / JDK 11 and Scala 2.13 / testDelegationTokenRequests(String).quorum=kraft – kafka.server.DelegationTokenRequestsTest
Build / JDK 11 and Scala 2.13 / executeTieredStorageTest(String).quorum=kraft – org.apache.kafka.tiered.storage.integration.ReassignReplicaExpandTest
Build / JDK 17 and Scala 2.13 / testRackAwareRangeAssignor() – integration.kafka.server.FetchFromFollowerIntegrationTest
Build / JDK 17 and Scala 2.13 / testBumpTransactionalEpoch(String).quorum=kraft – kafka.api.TransactionsTest
[GitHub] [kafka] philipnee commented on a diff in pull request #14346: KAFKA-15115 - KAFKA-15163; Reset/Validate positions implementation & API integration
philipnee commented on code in PR #14346: URL: https://github.com/apache/kafka/pull/14346#discussion_r1318977376 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java: ## @@ -268,7 +326,7 @@ private CompletableFuture buildListOffsetRequestToNode( .forConsumer(requireTimestamps, isolationLevel, false) .setTargetTimes(ListOffsetsRequest.toListOffsetsTopics(targetTimes)); -log.debug("Creating ListOffsetRequest {} for broker {} to reset positions", builder, Review Comment: I really think ListOffsetRequest is fine. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java: ## @@ -315,6 +319,57 @@ void onFailedRequestForResettingPositions( log.error("Discarding error in ListOffsetResponse because another error is pending", error); } + +void onSuccessfulRequestForValidatingPositions( Review Comment: it should be onSuccessfulResponse ? As well as onFailedResponse. Maybe `onSuccessfulValidatePositionResponse` ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java: ## @@ -298,6 +356,154 @@ private CompletableFuture buildListOffsetRequestToNode( return result; } +/** + * Make asynchronous ListOffsets request to fetch offsets by target times for the specified + * partitions. + * Use the retrieved offsets to reset positions in the subscription state. + * + * @param timestampsToSearch the mapping between partitions and target time + * @return A list of + * {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest} + * that can be polled to obtain the corresponding timestamps and offsets. 
+ */ +private List buildListOffsetsRequestsAndResetPositions( +final Map timestampsToSearch) { +Map> timestampsToSearchByNode = +groupListOffsetRequests(timestampsToSearch, Optional.empty()); + +final List unsentRequests = new ArrayList<>(); + +timestampsToSearchByNode.forEach((node, resetTimestamps) -> { +subscriptionState.setNextAllowedRetry(resetTimestamps.keySet(), +time.milliseconds() + requestTimeoutMs); + +CompletableFuture partialResult = buildListOffsetRequestToNode( +node, +resetTimestamps, +false, +unsentRequests); + +partialResult.whenComplete((result, error) -> { +if (error == null) { + offsetFetcherUtils.onSuccessfulRequestForResettingPositions(resetTimestamps, Review Comment: ditto - should be onSuccessfulResponse right? ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java: ## @@ -298,6 +356,154 @@ private CompletableFuture buildListOffsetRequestToNode( return result; } +/** + * Make asynchronous ListOffsets request to fetch offsets by target times for the specified + * partitions. + * Use the retrieved offsets to reset positions in the subscription state. + * + * @param timestampsToSearch the mapping between partitions and target time + * @return A list of + * {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest} + * that can be polled to obtain the corresponding timestamps and offsets. 
+ */ +private List buildListOffsetsRequestsAndResetPositions( +final Map timestampsToSearch) { +Map> timestampsToSearchByNode = +groupListOffsetRequests(timestampsToSearch, Optional.empty()); + +final List unsentRequests = new ArrayList<>(); + +timestampsToSearchByNode.forEach((node, resetTimestamps) -> { +subscriptionState.setNextAllowedRetry(resetTimestamps.keySet(), +time.milliseconds() + requestTimeoutMs); + +CompletableFuture partialResult = buildListOffsetRequestToNode( +node, +resetTimestamps, +false, +unsentRequests); + +partialResult.whenComplete((result, error) -> { +if (error == null) { + offsetFetcherUtils.onSuccessfulRequestForResettingPositions(resetTimestamps, +result); +} else { +RuntimeException e; +if (error instanceof RuntimeException) { +e = (RuntimeException) error; +} else { +e = new RuntimeException("Unexpected failure in ListOffsets request for " + +"resetting positions", error); +} + offsetFetcherUtils.onFailedRequestForResettingPositions(resetT
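The completion handling in the hunk above (call the success handler when there is no error, otherwise pass the failure on as a `RuntimeException`, wrapping it if needed) can be sketched with plain `CompletableFuture`. The class and method names below are hypothetical and only stand in for the `offsetFetcherUtils` callbacks under review.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical sketch of routing a future's outcome to success/failure handlers.
class WhenCompleteSketch {
    // Pass RuntimeExceptions through unchanged; wrap anything else with some context.
    static RuntimeException asRuntime(Throwable error, String context) {
        return (error instanceof RuntimeException)
                ? (RuntimeException) error
                : new RuntimeException(context, error);
    }

    static <T> void attach(CompletableFuture<T> future,
                           Consumer<T> onSuccess,
                           Consumer<RuntimeException> onFailure) {
        future.whenComplete((result, error) -> {
            if (error == null) {
                onSuccess.accept(result);
            } else {
                onFailure.accept(asRuntime(error, "Unexpected failure in request"));
            }
        });
    }

    public static void main(String[] args) {
        CompletableFuture<Long> offsets = new CompletableFuture<>();
        attach(offsets,
                value -> System.out.println("offset = " + value),
                failure -> System.out.println("failed: " + failure.getCause()));
        offsets.completeExceptionally(new IOException("broker unreachable"));
    }
}
```

Both callbacks observe a response (or its absence), which is the point behind the naming suggestion in the review comments.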
[GitHub] [kafka] lianetm commented on pull request #14346: KAFKA-15115 - KAFKA-15163; Reset/Validate positions implementation & API integration
lianetm commented on PR #14346: URL: https://github.com/apache/kafka/pull/14346#issuecomment-1710621218

Build completed with 6 unrelated test failures:

Build / JDK 20 and Scala 2.13 / testOffsetTranslationBehindReplicationFlow() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest
Build / JDK 20 and Scala 2.13 / shouldRemoveOneNamedTopologyWhileAnotherContinuesProcessing() – org.apache.kafka.streams.integration.NamedTopologyIntegrationTest
Build / JDK 20 and Scala 2.13 / testTaskRequestWithOldStartMsGetsUpdated() – org.apache.kafka.trogdor.coordinator.CoordinatorTest
Build / JDK 8 and Scala 2.12 / shouldRemoveOneNamedTopologyWhileAnotherContinuesProcessing() – org.apache.kafka.streams.integration.NamedTopologyIntegrationTest
Build / JDK 11 and Scala 2.13 / testRackAwareRangeAssignor() – integration.kafka.server.FetchFromFollowerIntegrationTest
Build / JDK 11 and Scala 2.13 / testDescribeClusterRequestExcludingClusterAuthorizedOperations(String).quorum=kraft – kafka.server.DescribeClusterRequestTest
[jira] [Commented] (KAFKA-12473) Make the "cooperative-sticky, range" as the default assignor
[ https://issues.apache.org/jira/browse/KAFKA-12473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17762846#comment-17762846 ] David Jacot commented on KAFKA-12473: - [~kirktrue] [~showuon] In my opinion, we should not do this given that KIP-848 will replace this soon. It is not worth it. > Make the "cooperative-sticky, range" as the default assignor > > > Key: KAFKA-12473 > URL: https://issues.apache.org/jira/browse/KAFKA-12473 > Project: Kafka > Issue Type: Improvement > Components: consumer >Reporter: A. Sophie Blee-Goldman >Assignee: Luke Chen >Priority: Critical > Labels: kip > > Now that 3.0 is coming up, we can change the default > ConsumerPartitionAssignor to something better than the RangeAssignor. The > original plan was to switch over to the StickyAssignor, but now that we have > incremental cooperative rebalancing we should consider using the new > CooperativeStickyAssignor instead: this will enable the consumer group to > follow the COOPERATIVE protocol, improving the rebalancing experience OOTB. > KIP: > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=177048248 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14273) Kafka doesn't start with KRaft on Windows
[ https://issues.apache.org/jira/browse/KAFKA-14273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17762843#comment-17762843 ]

José Armando García Sancio commented on KAFKA-14273:

[~satish.duggana] I think it is a blocker. I sent you an email in the release thread. I'll submit a PR shortly.

> Kafka doesn't start with KRaft on Windows
> -
>
> Key: KAFKA-14273
> URL: https://issues.apache.org/jira/browse/KAFKA-14273
> Project: Kafka
> Issue Type: Bug
> Components: kraft
> Affects Versions: 3.3.1
> Reporter: Kedar Joshi
> Assignee: José Armando García Sancio
> Priority: Major
>
> {{Basic setup doesn't work on Windows 10.}}
> *{{Steps}}*
> * {{Initialize cluster with -}}
> {code:sh}
> bin\windows\kafka-storage.bat random-uuid
> bin\windows\kafka-storage.bat format -t %cluster_id% -c .\config\kraft\server.properties{code}
>
> * Start Kafka with -
> {code:sh}
> bin\windows\kafka-server-start.bat .\config\kraft\server.properties{code}
>
> *Stacktrace*
> Kafka fails to start with following exception -
> {code:java}
> D:\LocationGuru\Servers\Kafka-3.3>bin\windows\kafka-server-start.bat .\config\kraft\server.properties
> [2022-10-03 23:14:20,089] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
> [2022-10-03 23:14:20,375] INFO Setting -D jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated TLS renegotiation (org.apache.zookeeper.common.X509Util)
> [2022-10-03 23:14:20,594] INFO [LogLoader partition=__cluster_metadata-0, dir=D:\LocationGuru\Servers\Kafka-3.3\tmp\kraft-combined-logs] Loading producer state till offset 0 with message format version 2 (kafka.log.UnifiedLog$)
> [2022-10-03 23:14:20,594] INFO [LogLoader partition=__cluster_metadata-0, dir=D:\LocationGuru\Servers\Kafka-3.3\tmp\kraft-combined-logs] Reloading from producer snapshot and rebuilding producer state from offset 0 (kafka.log.UnifiedLog$)
> [2022-10-03 23:14:20,594] INFO [LogLoader partition=__cluster_metadata-0, dir=D:\LocationGuru\Servers\Kafka-3.3\tmp\kraft-combined-logs] Producer state recovery took 0ms for snapshot load and 0ms for segment recovery from offset 0 (kafka.log.UnifiedLog$)
> [2022-10-03 23:14:20,640] INFO Initialized snapshots with IDs SortedSet() from D:\LocationGuru\Servers\Kafka-3.3\tmp\kraft-combined-logs__cluster_metadata-0 (kafka.raft.KafkaMetadataLog$)
> [2022-10-03 23:14:20,734] INFO [raft-expiration-reaper]: Starting (kafka.raft.TimingWheelExpirationService$ExpiredOperationReaper)
> [2022-10-03 23:14:20,900] ERROR Exiting Kafka due to fatal exception (kafka.Kafka$)
> java.io.UncheckedIOException: Error while writing the Quorum status from the file D:\LocationGuru\Servers\Kafka-3.3\tmp\kraft-combined-logs__cluster_metadata-0\quorum-state
>     at org.apache.kafka.raft.FileBasedStateStore.writeElectionStateToFile(FileBasedStateStore.java:155)
>     at org.apache.kafka.raft.FileBasedStateStore.writeElectionState(FileBasedStateStore.java:128)
>     at org.apache.kafka.raft.QuorumState.transitionTo(QuorumState.java:477)
>     at org.apache.kafka.raft.QuorumState.initialize(QuorumState.java:212)
>     at org.apache.kafka.raft.KafkaRaftClient.initialize(KafkaRaftClient.java:369)
>     at kafka.raft.KafkaRaftManager.buildRaftClient(RaftManager.scala:200)
>     at kafka.raft.KafkaRaftManager.<init>(RaftManager.scala:127)
>     at kafka.server.KafkaRaftServer.<init>(KafkaRaftServer.scala:83)
>     at kafka.Kafka$.buildServer(Kafka.scala:79)
>     at kafka.Kafka$.main(Kafka.scala:87)
>     at kafka.Kafka.main(Kafka.scala)
> Caused by: java.nio.file.FileSystemException: D:\LocationGuru\Servers\Kafka-3.3\tmp\kraft-combined-logs_cluster_metadata-0\quorum-state.tmp -> D:\LocationGuru\Servers\Kafka-3.3\tmp\kraft-combined-logs_cluster_metadata-0\quorum-state: The process cannot access the file because it is being used by another process
>     at java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:92)
>     at java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103)
>     at java.base/sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:403)
>     at java.base/sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:293)
>     at java.base/java.nio.file.Files.move(Files.java:1430)
>     at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:935)
>     at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:918)
>     at org.apache.kafka.raft.FileBasedStateStore.writeElectionStateToFile(FileBasedStateStore.java:152)
>     ... 10 mor
[jira] [Commented] (KAFKA-12473) Make the "cooperative-sticky, range" as the default assignor
[ https://issues.apache.org/jira/browse/KAFKA-12473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17762842#comment-17762842 ] Kirk True commented on KAFKA-12473: --- [~showuon] Are you working on this change actively? I'm wondering if it's something I can work on? Thanks! > Make the "cooperative-sticky, range" as the default assignor > > > Key: KAFKA-12473 > URL: https://issues.apache.org/jira/browse/KAFKA-12473 > Project: Kafka > Issue Type: Improvement > Components: consumer >Reporter: A. Sophie Blee-Goldman >Assignee: Luke Chen >Priority: Critical > Labels: kip > > Now that 3.0 is coming up, we can change the default > ConsumerPartitionAssignor to something better than the RangeAssignor. The > original plan was to switch over to the StickyAssignor, but now that we have > incremental cooperative rebalancing we should consider using the new > CooperativeStickyAssignor instead: this will enable the consumer group to > follow the COOPERATIVE protocol, improving the rebalancing experience OOTB. > KIP: > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=177048248 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-14273) Kafka doesn't start with KRaft on Windows
[ https://issues.apache.org/jira/browse/KAFKA-14273?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

José Armando García Sancio reassigned KAFKA-14273:
--
Assignee: José Armando García Sancio

> Kafka doesn't start with KRaft on Windows
> -
>
> Key: KAFKA-14273
> URL: https://issues.apache.org/jira/browse/KAFKA-14273
> Project: Kafka
> Issue Type: Bug
> Components: kraft
> Affects Versions: 3.3.1
> Reporter: Kedar Joshi
> Assignee: José Armando García Sancio
> Priority: Major
[GitHub] [kafka] philipnee commented on pull request #14313: KAFKA-15416: Fix flaky TopicAdminTest::retryEndOffsetsShouldRetryWhenTopicNotFound test case
philipnee commented on PR #14313: URL: https://github.com/apache/kafka/pull/14313#issuecomment-1710569868

@C0urante - Sorry, what I meant was that I've seen other locally flaky tests that almost never show up during the Jenkins build. Thanks! I'll submit a patch after you merge it.
[GitHub] [kafka] blacktooth commented on a diff in pull request #13726: KAFKA-13109: SourceTask#poll not enforcing retries in case of RetriableException
blacktooth commented on code in PR #13726: URL: https://github.com/apache/kafka/pull/13726#discussion_r1318930687

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java: ##
@@ -458,13 +458,8 @@ boolean sendRecords() {
     }

     protected List poll() throws InterruptedException {
-        try {
-            return task.poll();
-        } catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) {
-            log.warn("{} failed to poll records from SourceTask. Will retry operation.", this, e);
-            // Do nothing. Let the framework poll whenever it's ready.
-            return null;
-        }
+        retryWithToleranceOperator.reset();
+        return retryWithToleranceOperator.execute(task::poll, Stage.TASK_POLL, this.getClass());

Review Comment:
@vamossagar12 @yashmayya Wondering if [RetryUtil](https://github.com/apache/kafka/blob/0029bc4897e603614a49e0b0f1e623abbe650c61/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java#L55) is a better alternative to use to avoid publishing these metrics. What do you think?
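For readers unfamiliar with the pattern under discussion, here is a minimal, self-contained sketch of a bounded retry on a retriable exception. It does not use or reproduce the API of `RetryUtil` or `retryWithToleranceOperator`: the `RetrySketch` class, its nested `RetriableException`, and the `retry` signature are all hypothetical.

```java
import java.util.concurrent.Callable;

// Hypothetical sketch: retry an operation a bounded number of times on a retriable failure.
class RetrySketch {
    // Stand-in for a "retriable" exception type; not the Kafka class of the same name.
    static class RetriableException extends RuntimeException {
        RetriableException(String message) {
            super(message);
        }
    }

    static <T> T retry(Callable<T> operation, int maxAttempts, long backoffMs) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RetriableException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return operation.call();
            } catch (RetriableException e) {
                last = e; // remember the failure and try again after a pause
                if (attempt < maxAttempts) {
                    Thread.sleep(backoffMs);
                }
            }
        }
        throw last; // every attempt failed with a retriable error
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        String result = retry(() -> {
            if (++calls[0] < 3) {
                throw new RetriableException("source not ready");
            }
            return "records";
        }, 5, 10L);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

Non-retriable exceptions propagate immediately, so only failures explicitly marked as retriable consume attempts.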
[GitHub] [kafka] nizhikov commented on pull request #14217: KAFKA-14595 ReassignPartitionsCommandArgsTest rewritten in java
nizhikov commented on PR #14217: URL: https://github.com/apache/kafka/pull/14217#issuecomment-1710540249 @gharris1727 Thank you very much!
[GitHub] [kafka] jolshan commented on pull request #14314: KAFKA-15425: Fail fast in Admin::listOffsets when topic (but not partition) metadata is not found
jolshan commented on PR #14314: URL: https://github.com/apache/kafka/pull/14314#issuecomment-1710539538 Also, I assume we want to pick this to 3.6 as well. Is that the only branch, @C0urante?
[GitHub] [kafka] gharris1727 commented on pull request #14217: KAFKA-14595 ReassignPartitionsCommandArgsTest rewritten in java
gharris1727 commented on PR #14217: URL: https://github.com/apache/kafka/pull/14217#issuecomment-1710524166 Thanks @nizhikov for your patience, and thanks for keeping people notified about this PR. Too many PRs like this one are left stale due to limited committer bandwidth.
[jira] [Commented] (KAFKA-14595) Move ReassignPartitionsCommand to tools
[ https://issues.apache.org/jira/browse/KAFKA-14595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17762822#comment-17762822 ] Greg Harris commented on KAFKA-14595: - I merged [https://github.com/apache/kafka/pull/14217] but I am leaving this open as the other PRs have not landed yet. > Move ReassignPartitionsCommand to tools > --- > > Key: KAFKA-14595 > URL: https://issues.apache.org/jira/browse/KAFKA-14595 > Project: Kafka > Issue Type: Sub-task >Reporter: Mickael Maison >Assignee: Nikolay Izhikov >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] cmccabe commented on pull request #14306: KAFKA-15369: Implement KIP-919: Allow AC to Talk Directly with Controllers
cmccabe commented on PR #14306: URL: https://github.com/apache/kafka/pull/14306#issuecomment-1710512791 Fixed conflicts
[GitHub] [kafka] gharris1727 merged pull request #14217: KAFKA-14595 ReassignPartitionsCommandArgsTest rewritten in java
gharris1727 merged PR #14217: URL: https://github.com/apache/kafka/pull/14217
[GitHub] [kafka] gharris1727 commented on pull request #14217: KAFKA-14595 ReassignPartitionsCommandArgsTest rewritten in java
gharris1727 commented on PR #14217: URL: https://github.com/apache/kafka/pull/14217#issuecomment-1710503247 The test failures in CI appear unrelated, and `tools:test` passes for me locally.
[GitHub] [kafka] Cerchie commented on pull request #14341: KAFKA-15307: Removes non-existent configs
Cerchie commented on PR #14341: URL: https://github.com/apache/kafka/pull/14341#issuecomment-1710482923
> Thanks for the PR. I know that we did remove `partition.grouper`, but the others seem to be valid configs. For example https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java#L665
>
> Can you double check again?

Huh. I wonder if I was working from an un-updated fork. Double-checking.
[GitHub] [kafka] Cerchie commented on a diff in pull request #14322: KAFKA-15418: update statement on decompression
Cerchie commented on code in PR #14322: URL: https://github.com/apache/kafka/pull/14322#discussion_r1318874061 ## docs/design.html: ## @@ -136,8 +136,10 @@ -Kafka supports this with an efficient batching format. A batch of messages can be clumped together compressed and sent to the server in this form. This batch of messages will be written in compressed form and will -remain compressed in the log and will only be decompressed by the consumer. +Kafka supports this with an efficient batching format. A batch of messages can be grouped together, compressed, and sent to the server in this form. The broker decompresses the batch in order to validate it. For +example, it validates that the number of records in the batch is same as what batch header states. The broker may also potentially modify the batch (e.g., if the topic is compacted, the broker will filter out Review Comment: Thanks for checking in on this-- in light of the latest comment, I removed the sentence "The broker may also potentially modify the batch (e.g., if the topic is compacted, the broker will filter out records eligible for compaction prior to writing to disk)." -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
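The validation step described in the proposed doc text (the broker decompresses a batch to check that the record count matches what the batch header states) can be pictured with a minimal sketch. This is a toy model, not Kafka's record format or broker code: `BatchValidationSketch`, `ToyBatch`, `compress`, and `isValid` are hypothetical names, and gzip with newline-delimited records is an assumption made only to keep the example self-contained.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical, simplified model: NOT Kafka's real wire or on-disk batch format.
final class BatchValidationSketch {

    /** A toy batch: a record count declared in the "header" plus a gzip-compressed payload. */
    static final class ToyBatch {
        final int declaredCount;
        final byte[] compressedPayload;

        ToyBatch(int declaredCount, byte[] compressedPayload) {
            this.declaredCount = declaredCount;
            this.compressedPayload = compressedPayload;
        }
    }

    /** Producer side: join records with '\n' (assumes non-empty records without newlines) and compress once. */
    static ToyBatch compress(List<String> records) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(bytes)) {
            gzip.write(String.join("\n", records).getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new ToyBatch(records.size(), bytes.toByteArray());
    }

    /** Broker side: decompress only to count the records and compare the count with the header. */
    static boolean isValid(ToyBatch batch) {
        try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(batch.compressedPayload))) {
            String payload = new String(gzip.readAllBytes(), StandardCharsets.UTF_8);
            int actualCount = payload.isEmpty() ? 0 : payload.split("\n", -1).length;
            return actualCount == batch.declaredCount;
        } catch (IOException e) {
            // A payload that cannot be decompressed is treated as invalid in this sketch.
            return false;
        }
    }
}
```

In this sketch the compressed bytes themselves are never rewritten: validation reads them, and the batch that would be stored is the one the producer sent.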
[jira] [Comment Edited] (KAFKA-15402) Performance regression on close consumer after upgrading to 3.5.0
[ https://issues.apache.org/jira/browse/KAFKA-15402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17762808#comment-17762808 ] Benoit Delbosc edited comment on KAFKA-15402 at 9/7/23 4:38 PM: Thanks, I have tested {{max.incremental.fetch.session.cache.slots=0}} without success: same latency as the default 3.5.1. The option was taken into account, according to the Kafka log: {code} INFO KafkaConfig values: ... max.incremental.fetch.session.cache.slots = 0 ... {code} was (Author: bdelbosc): Thanks, I have tested {{max.incremental.fetch.session.cache.slots=0}} without success: same latency as the default. > Performance regression on close consumer after upgrading to 3.5.0 > - > > Key: KAFKA-15402 > URL: https://issues.apache.org/jira/browse/KAFKA-15402 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 3.5.0, 3.5.1 >Reporter: Benoit Delbosc >Priority: Major > Attachments: image-2023-08-24-18-51-21-720.png, > image-2023-08-24-18-51-57-435.png, image-2023-08-25-10-50-28-079.png > > > Hi, > After upgrading to Kafka client version 3.5.0, we have observed a significant > increase in the duration of our Java unit tests. These unit tests heavily > rely on the Kafka Admin, Producer, and Consumer APIs. > When using Kafka server version 3.4.1, the duration of the unit tests > increased from 8 seconds (with Kafka client 3.4.1) to 18 seconds (with Kafka > client 3.5.0). > Upgrading the Kafka server to 3.5.1 shows similar results. > I have come across the issue KAFKA-15178, which could be the culprit. I will > attempt to test the proposed patch. > In the meantime, if you have any ideas that could help identify and address > the regression, please let me know. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] C0urante commented on a diff in pull request #14314: KAFKA-15425: Fail fast in Admin::listOffsets when topic (but not partition) metadata is not found
C0urante commented on code in PR #14314: URL: https://github.com/apache/kafka/pull/14314#discussion_r1318868378 ## clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java: ## @@ -5633,20 +5643,39 @@ public void testListOffsetsMetadataNonRetriableErrors() throws Exception { node0); final TopicPartition tp1 = new TopicPartition("foo", 0); +final MetadataResponse preparedResponse = prepareMetadataResponse( +cluster, topicMetadataError, partitionMetadataError +); try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); -env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.TOPIC_AUTHORIZATION_FAILED)); +env.kafkaClient().prepareResponse(preparedResponse); Map partitions = new HashMap<>(); partitions.put(tp1, OffsetSpec.latest()); ListOffsetsResult result = env.adminClient().listOffsets(partitions); -TestUtils.assertFutureError(result.all(), TopicAuthorizationException.class); +TestUtils.assertFutureError(result.all(), expectedFailure); } } +private static Stream listOffsetsMetadataNonRetriableErrors() { +return Stream.of( +Arguments.of( +Errors.TOPIC_AUTHORIZATION_FAILED, +Errors.TOPIC_AUTHORIZATION_FAILED, +TopicAuthorizationException.class +), +Arguments.of( +// We fail fast when the entire topic is unknown +Errors.UNKNOWN_TOPIC_OR_PARTITION, +Errors.NONE, Review Comment: Sure, done. I've tried to clarify in the comments that these are unusual cases we're testing for lest anyone think that this is normal broker behavior. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] yashmayya commented on pull request #14279: KAFKA-15179: Add integration tests for the file sink and source connectors
yashmayya commented on PR #14279: URL: https://github.com/apache/kafka/pull/14279#issuecomment-1710462603 Thanks Chris!
[jira] [Commented] (KAFKA-15402) Performance regression on close consumer after upgrading to 3.5.0
[ https://issues.apache.org/jira/browse/KAFKA-15402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17762808#comment-17762808 ] Benoit Delbosc commented on KAFKA-15402: Thanks, I have tested {{max.incremental.fetch.session.cache.slots=0}} without success: same latency as the default. > Performance regression on close consumer after upgrading to 3.5.0 > - > > Key: KAFKA-15402 > URL: https://issues.apache.org/jira/browse/KAFKA-15402 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 3.5.0, 3.5.1 >Reporter: Benoit Delbosc >Priority: Major > Attachments: image-2023-08-24-18-51-21-720.png, > image-2023-08-24-18-51-57-435.png, image-2023-08-25-10-50-28-079.png > > > Hi, > After upgrading to Kafka client version 3.5.0, we have observed a significant > increase in the duration of our Java unit tests. These unit tests heavily > rely on the Kafka Admin, Producer, and Consumer APIs. > When using Kafka server version 3.4.1, the duration of the unit tests > increased from 8 seconds (with Kafka client 3.4.1) to 18 seconds (with Kafka > client 3.5.0). > Upgrading the Kafka server to 3.5.1 shows similar results. > I have come across the issue KAFKA-15178, which could be the culprit. I will > attempt to test the proposed patch. > In the meantime, if you have any ideas that could help identify and address > the regression, please let me know. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] kamalcph commented on pull request #14349: KAFKA-15352: Update log-start-offset before initiating deletion of remote segments
kamalcph commented on PR #14349: URL: https://github.com/apache/kafka/pull/14349#issuecomment-1710450437
> My point is, should we check for leadership even before updating the log start offset?

Yes, this is the expectation, and it is done inside the `handleLogStartOffsetUpdate` method:

```java
public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) {
    if (isLeader()) {
        logger.debug("Updating {} with remoteLogStartOffset: {}", topicPartition, remoteLogStartOffset);
        updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset);
    }
}
```
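To make the ordering in the PR title concrete (update the log start offset first, then delete remote segments, and do both only on the leader), here is a small self-contained sketch. It is not the code from the PR: `RemoteRetentionSketch`, `Segment`, `expireUpTo`, and the `events` log are hypothetical names introduced for illustration, and the deletion rule (drop segments that end below the new start offset) is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;

// Hypothetical illustration only; the names and the deletion rule are assumptions, not Kafka's API.
final class RemoteRetentionSketch {

    /** Toy remote segment covering offsets [baseOffset, endOffset]. */
    static final class Segment {
        final long baseOffset;
        final long endOffset;

        Segment(long baseOffset, long endOffset) {
            this.baseOffset = baseOffset;
            this.endOffset = endOffset;
        }
    }

    private final BooleanSupplier isLeader;
    private final List<Segment> remoteSegments;
    private final List<String> events = new ArrayList<>();
    private long logStartOffset = 0L;

    RemoteRetentionSketch(BooleanSupplier isLeader, List<Segment> segments) {
        this.isLeader = isLeader;
        this.remoteSegments = new ArrayList<>(segments);
    }

    void expireUpTo(long newLogStartOffset) {
        if (!isLeader.getAsBoolean()) {
            return; // a non-leader neither moves the offset nor deletes anything
        }
        // Step 1: advance the log start offset first (the ordering named in the PR title).
        logStartOffset = Math.max(logStartOffset, newLogStartOffset);
        events.add("start-offset=" + logStartOffset);
        // Step 2: only then delete the segments that lie entirely below the new start offset.
        remoteSegments.removeIf(segment -> {
            boolean expired = segment.endOffset < logStartOffset;
            if (expired) {
                events.add("deleted=" + segment.baseOffset);
            }
            return expired;
        });
    }

    long logStartOffset() { return logStartOffset; }
    int remainingSegments() { return remoteSegments.size(); }
    List<String> events() { return events; }
}
```

The `events` list exists only so the order of the two steps can be inspected in a test; a real implementation would not need it.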
[GitHub] [kafka] C0urante merged pull request #14279: KAFKA-15179: Add integration tests for the file sink and source connectors
C0urante merged PR #14279: URL: https://github.com/apache/kafka/pull/14279
[GitHub] [kafka] C0urante commented on a diff in pull request #14279: KAFKA-15179: Add integration tests for the file sink and source connectors
C0urante commented on code in PR #14279: URL: https://github.com/apache/kafka/pull/14279#discussion_r1318853979 ## connect/file/src/test/java/org/apache/kafka/connect/file/integration/FileStreamSinkConnectorIntegrationTest.java: ## @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.file.integration; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.file.FileStreamSinkConnector; +import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + +import java.io.BufferedReader; +import java.io.File; +import java.io.InputStreamReader; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.kafka.connect.file.FileStreamSinkConnector.FILE_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG; +import static org.apache.kafka.connect.sink.SinkConnector.TOPICS_CONFIG; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@Tag("integration") +public class FileStreamSinkConnectorIntegrationTest { + +private static final String CONNECTOR_NAME = "test-connector"; +private static final String TOPIC = "test-topic"; +private static final String MESSAGE_PREFIX = "Message "; +private static final int NUM_MESSAGES = 5; +private static final String FILE_NAME = "test-file"; +private final EmbeddedConnectCluster connect = new EmbeddedConnectCluster.Builder().build(); + +@BeforeEach +public void setup() { +connect.start(); +connect.kafka().createTopic(TOPIC); +produceMessagesToTopic(TOPIC, NUM_MESSAGES); +} + +@AfterEach +public void tearDown() { +connect.stop(); +} + +@Test +public void testSimpleSink() throws Exception { +File tempDir = TestUtils.tempDirectory(); +Path tempFilePath = tempDir.toPath().resolve(FILE_NAME); +Map connectorConfigs = baseConnectorConfigs(TOPIC, 
tempFilePath.toString()); +connect.configureConnector(CONNECTOR_NAME, connectorConfigs); + connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, 1, +"Connector and task did not start in time"); + +verifyLinesInFile(tempFilePath, NUM_MESSAGES, true); +} + +@Test +public void testAlterOffsets() throws Exception { +File tempDir = TestUtils.tempDirectory(); +Path tempFilePath = tempDir.toPath().resolve(FILE_NAME); +Map connectorConfigs = baseConnectorConfigs(TOPIC, tempFilePath.toString()); +connect.configureConnector(CONNECTOR_NAME, connectorConfigs); + connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, 1, +"Connector and task did not start in time"); + +verifyLinesInFile(tempFilePath, NUM_MESSAGES, true); + +connect.stopConnector(CONNECTOR_NAME); +connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, "Connector did not stop in time"); + +// Alter the offsets to cause the last message in the topic to be re-processed +connect.alterSinkConnectorOffset(CONNECTOR_NAME, new TopicPartition(TOPIC, 0), (long) (NUM_MESSAGES - 1)); + +connect.resumeConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, 1, +"Connector and task did not resume in time"); + +// The last message should be re-processed when the connector is resumed after the offsets are altered +verifyLinesInFile(tempFilePath, NUM_MESSAGES + 1, false); +} + +@Test +public void testResetOffsets() throws Exception { +File tempDir = TestUtils.tempDirectory(); +Path tempFilePath = tempDir.toPath().res
[GitHub] [kafka] C0urante commented on a diff in pull request #14279: KAFKA-15179: Add integration tests for the file sink and source connectors
C0urante commented on code in PR #14279: URL: https://github.com/apache/kafka/pull/14279#discussion_r1318849972 ## connect/file/src/test/java/org/apache/kafka/connect/file/integration/FileStreamSinkConnectorIntegrationTest.java: ## @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.file.integration; + +import org.apache.kafka.connect.file.FileStreamSinkConnector; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets; +import org.apache.kafka.connect.util.SinkUtils; +import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + +import java.io.BufferedReader; +import java.io.File; +import java.io.InputStreamReader; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.kafka.connect.file.FileStreamSinkConnector.FILE_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG; +import static org.apache.kafka.connect.sink.SinkConnector.TOPICS_CONFIG; +import static org.junit.jupiter.api.Assertions.assertEquals; + +@Tag("integration") +public class FileStreamSinkConnectorIntegrationTest { + +private static final String CONNECTOR_NAME = "test-connector"; +private static final String TOPIC = "test-topic"; +private static final String MESSAGE_FORMAT = "Message %d"; +private static final int NUM_MESSAGES = 5; +private static final String FILE_NAME = "test-file"; +private final EmbeddedConnectCluster connect = new EmbeddedConnectCluster.Builder().build(); + +@BeforeEach +public void setup() { +connect.start(); +connect.kafka().createTopic(TOPIC); +produceMessagesToTopic(TOPIC, NUM_MESSAGES); +} + +@AfterEach +public void tearDown() { +connect.stop(); +} + +@Test +public void testSimpleSink() throws Exception { +File tempDir = TestUtils.tempDirectory(); +Path tempFilePath = 
tempDir.toPath().resolve(FILE_NAME); +Map connectorConfigs = baseConnectorConfigs(TOPIC, tempFilePath.toString()); +connect.configureConnector(CONNECTOR_NAME, connectorConfigs); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1, +"Connector and task did not start in time"); + +verifyLinesInFile(tempFilePath, NUM_MESSAGES, true); +} + +@Test +public void testAlterOffsets() throws Exception { +File tempDir = TestUtils.tempDirectory(); +Path tempFilePath = tempDir.toPath().resolve(FILE_NAME); +Map connectorConfigs = baseConnectorConfigs(TOPIC, tempFilePath.toString()); +connect.configureConnector(CONNECTOR_NAME, connectorConfigs); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1, +"Connector and task did not start in time"); + +verifyLinesInFile(tempFilePath, NUM_MESSAGES, true); + +connect.stopConnector(CONNECTOR_NAME); +connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, "Connector did not stop in time"); + +// Alter the offsets to cause the last message in the topic to be re-processed +Map partition = new HashMap<>(); +partition.put(SinkUtils.KAFKA_TOPIC_KEY, TOPIC); +partition.put(SinkUtils.KAFKA_PARTITION_KEY, 0); +Map offset = Collections.singletonMap(SinkUtils.KAFKA_OFFSET_KEY, 4); +List offsetsToAlter = Collections.singletonList(new ConnectorOffset(partition, offset)); + +connect.alterConnectorOffsets(CONNECTOR_NAME, new ConnectorOffsets(offsetsToAlter)); Review Comment: Looks good, thanks 👍 ## connect/file/src/test/java/org/apache/kafka/connect/file/integration/FileStreamSinkConnectorIntegra
[GitHub] [kafka] C0urante commented on a diff in pull request #14279: KAFKA-15179: Add integration tests for the file sink and source connectors
C0urante commented on code in PR #14279: URL: https://github.com/apache/kafka/pull/14279#discussion_r1318849515 ## connect/file/src/test/java/org/apache/kafka/connect/file/integration/FileStreamSinkConnectorIntegrationTest.java: ## @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.file.integration; + +import org.apache.kafka.connect.file.FileStreamSinkConnector; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets; +import org.apache.kafka.connect.util.SinkUtils; +import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + +import java.io.BufferedReader; +import java.io.File; +import java.io.InputStreamReader; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.kafka.connect.file.FileStreamSinkConnector.FILE_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG; +import static org.apache.kafka.connect.sink.SinkConnector.TOPICS_CONFIG; +import static org.junit.jupiter.api.Assertions.assertEquals; + +@Tag("integration") +public class FileStreamSinkConnectorIntegrationTest { + +private static final String CONNECTOR_NAME = "test-connector"; +private static final String TOPIC = "test-topic"; +private static final String MESSAGE_FORMAT = "Message %d"; +private static final int NUM_MESSAGES = 5; +private static final String FILE_NAME = "test-file"; +private final EmbeddedConnectCluster connect = new EmbeddedConnectCluster.Builder().build(); + +@BeforeEach +public void setup() { +connect.start(); +connect.kafka().createTopic(TOPIC); +produceMessagesToTopic(TOPIC, NUM_MESSAGES); +} + +@AfterEach +public void tearDown() { +connect.stop(); +} + +@Test +public void testSimpleSink() throws Exception { +File tempDir = TestUtils.tempDirectory(); +Path tempFilePath = 
tempDir.toPath().resolve(FILE_NAME); +Map connectorConfigs = baseConnectorConfigs(TOPIC, tempFilePath.toString()); +connect.configureConnector(CONNECTOR_NAME, connectorConfigs); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1, +"Connector and task did not start in time"); + +verifyLinesInFile(tempFilePath, NUM_MESSAGES, true); +} + +@Test +public void testAlterOffsets() throws Exception { +File tempDir = TestUtils.tempDirectory(); +Path tempFilePath = tempDir.toPath().resolve(FILE_NAME); +Map connectorConfigs = baseConnectorConfigs(TOPIC, tempFilePath.toString()); +connect.configureConnector(CONNECTOR_NAME, connectorConfigs); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1, +"Connector and task did not start in time"); + +verifyLinesInFile(tempFilePath, NUM_MESSAGES, true); + +connect.stopConnector(CONNECTOR_NAME); +connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, "Connector did not stop in time"); + +// Alter the offsets to cause the last message in the topic to be re-processed +Map partition = new HashMap<>(); +partition.put(SinkUtils.KAFKA_TOPIC_KEY, TOPIC); +partition.put(SinkUtils.KAFKA_PARTITION_KEY, 0); +Map offset = Collections.singletonMap(SinkUtils.KAFKA_OFFSET_KEY, 4); +List offsetsToAlter = Collections.singletonList(new ConnectorOffset(partition, offset)); + +connect.alterConnectorOffsets(CONNECTOR_NAME, new ConnectorOffsets(offsetsToAlter)); + +connect.resumeConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, 1, +
[jira] [Comment Edited] (KAFKA-15417) JoinWindow does not seem to work properly with a KStream - KStream - LeftJoin()
[ https://issues.apache.org/jira/browse/KAFKA-15417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17762676#comment-17762676 ] Victor van den Hoven edited comment on KAFKA-15417 at 9/7/23 4:13 PM: -- Yes, I think it was incorrect and flipped. Sure, I can give it a try to work on a PR. was (Author: victorvandenhoven): Yes, I think it was incorrectly and flipped. Sure, I can give it a try to work on a PR.
> JoinWindow does not seem to work properly with a KStream - KStream - LeftJoin()
>
> Key: KAFKA-15417
> URL: https://issues.apache.org/jira/browse/KAFKA-15417
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.4.0
> Reporter: Victor van den Hoven
> Priority: Major
> Attachments: SimpleStreamTopology.java, SimpleStreamTopologyTest.java
>
> In Kafka Streams 3.4.0:
> According to the Javadoc of JoinWindows:
> _There are three different window configuration supported:_
> * _before = after = time-difference_
> * _before = 0 and after = time-difference_
> * _*before = time-difference and after = 0*_
>
> However, if I use a join window with *before = time-difference and after = 0* on a KStream-KStream left join, the *after = 0* part does not seem to work.
> When using _stream1.leftJoin(stream2, joinWindow)_ with _joinWindow.after = 0_ and _joinWindow.before = 30s_, any new message on stream1 that cannot be joined with any message on stream2 should be joined with a null record once _joinWindow.after_ has elapsed and a new message has arrived on stream1.
> It is not.
> The previous message is joined with a null record only if the new message arrives after _joinWindow.before_ has elapsed.
>
> Attached you can find two files with a TopologyTestDriver unit test to reproduce.
> topology: stream1.leftJoin(stream2, joiner, joinWindow)
> joinWindow has before=5000ms and after=0
> message1(key1) -> stream1
> after 4000ms message2(key2) -> stream1 -> NO null-record join was made, although the after period had expired.
> after 4900ms message2(key2) -> stream1 -> NO null-record join was made, although the after period had expired.
> after 5000ms message2(key2) -> stream1 -> A null-record join was made; the before period had expired.
> after 6000ms message2(key2) -> stream1 -> A null-record join was made; the before period had expired.
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] jolshan commented on a diff in pull request #14314: KAFKA-15425: Fail fast in Admin::listOffsets when topic (but not partition) metadata is not found
jolshan commented on code in PR #14314: URL: https://github.com/apache/kafka/pull/14314#discussion_r1318833581 ## clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java: ## @@ -5633,20 +5643,39 @@ public void testListOffsetsMetadataNonRetriableErrors() throws Exception { node0); final TopicPartition tp1 = new TopicPartition("foo", 0); +final MetadataResponse preparedResponse = prepareMetadataResponse( +cluster, topicMetadataError, partitionMetadataError +); try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); -env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.TOPIC_AUTHORIZATION_FAILED)); +env.kafkaClient().prepareResponse(preparedResponse); Map partitions = new HashMap<>(); partitions.put(tp1, OffsetSpec.latest()); ListOffsetsResult result = env.adminClient().listOffsets(partitions); -TestUtils.assertFutureError(result.all(), TopicAuthorizationException.class); +TestUtils.assertFutureError(result.all(), expectedFailure); } } +private static Stream listOffsetsMetadataNonRetriableErrors() { +return Stream.of( +Arguments.of( +Errors.TOPIC_AUTHORIZATION_FAILED, +Errors.TOPIC_AUTHORIZATION_FAILED, +TopicAuthorizationException.class +), +Arguments.of( +// We fail fast when the entire topic is unknown +Errors.UNKNOWN_TOPIC_OR_PARTITION, +Errors.NONE, Review Comment: For my understanding, if the topic also had unknown topic or partition as the partition error message, we would fail fast there too. > With this change, the operation always fails if an error is reported for the entire topic, even if an error is also reported for one or more individual topic partitions. Would we want to add this test case too just so the behavior is also shown in tests? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
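The rule discussed in this review thread (a topic-level error fails the operation immediately, even if partition-level errors are also reported) can be sketched in isolation. This is an illustrative model only, not the `KafkaAdminClient` implementation: `FailFastSketch`, `ToyError`, and `failFastError` are hypothetical names, and the sketch deliberately says nothing about how partition-level errors are handled when the topic itself reports no error.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical model of the fail-fast decision; not the actual admin client code.
final class FailFastSketch {

    /** Stand-in for the handful of error codes mentioned in the thread. */
    enum ToyError { NONE, UNKNOWN_TOPIC_OR_PARTITION, TOPIC_AUTHORIZATION_FAILED }

    /**
     * Returns the error that should fail the whole request immediately, if any.
     * An empty result means "no topic-level error; partitions are handled individually".
     */
    static Optional<ToyError> failFastError(ToyError topicError, List<ToyError> partitionErrors) {
        if (topicError != ToyError.NONE) {
            // The topic-level error wins, whatever the individual partitions report,
            // so partitionErrors is intentionally not consulted here.
            return Optional.of(topicError);
        }
        return Optional.empty();
    }
}
```

The extra test case proposed in the comment corresponds to calling `failFastError` with `UNKNOWN_TOPIC_OR_PARTITION` at both levels and still getting the topic-level error back.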
[GitHub] [kafka] philipnee commented on a diff in pull request #14323: KAFKA-15275 - Client state machine basic components, states and initial transitions
philipnee commented on code in PR #14323: URL: https://github.com/apache/kafka/pull/14323#discussion_r1318822595 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AssignorSelection.java: ## @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; + +import java.util.List; +import java.util.Objects; + +/** + * Selection of a type of assignor used by a member to get partitions assigned as part of a + * consumer group. Selection could be one of: + * CLIENT assignors + * SERVER assignors + * + * Client assignors include of a list of + * {@link org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData.Assignor} + * Server assignors include a name of the server assignor selected, ex. uniform, range. + */ +public class AssignorSelection { +public enum Type { CLIENT, SERVER } Review Comment: why couldn't we do `hasClientAssignor()` ? Since we use Optional quite liberally - can we make the assignor Optional as well? -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
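One way to read the suggestion above (drop the `Type` enum in favour of `hasClientAssignor()` and `Optional` fields) is sketched below. This is only a possible shape, not the class from the PR: `AssignorSelectionSketch` and its factory methods are hypothetical, and plain `String` names stand in for `ConsumerGroupHeartbeatRequestData.Assignor` so the example compiles without Kafka on the classpath.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

// Hypothetical alternative shape; String stands in for the real Assignor type.
final class AssignorSelectionSketch {
    private final Optional<List<String>> clientAssignors;
    private final Optional<String> serverAssignor;

    private AssignorSelectionSketch(Optional<List<String>> clientAssignors, Optional<String> serverAssignor) {
        this.clientAssignors = clientAssignors;
        this.serverAssignor = serverAssignor;
    }

    /** Client-side selection: carries the list of assignors, no server assignor name. */
    static AssignorSelectionSketch newClientAssignors(List<String> assignors) {
        Objects.requireNonNull(assignors, "assignors");
        List<String> copy = Collections.unmodifiableList(new ArrayList<>(assignors));
        return new AssignorSelectionSketch(Optional.of(copy), Optional.empty());
    }

    /** Server-side selection: carries only the assignor name, for example "uniform" or "range". */
    static AssignorSelectionSketch newServerAssignor(String name) {
        Objects.requireNonNull(name, "name");
        return new AssignorSelectionSketch(Optional.empty(), Optional.of(name));
    }

    /** Replaces a check against an enum value such as Type.CLIENT. */
    boolean hasClientAssignor() {
        return clientAssignors.isPresent();
    }

    Optional<List<String>> clientAssignors() { return clientAssignors; }
    Optional<String> serverAssignor() { return serverAssignor; }
}
```

With private constructors and two factories, exactly one of the two optionals is present, which is the invariant the enum otherwise encodes.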