Re: [PR] [MINOR] Code Cleanup (Clients Module) [kafka]
chia7712 commented on PR #16049: URL: https://github.com/apache/kafka/pull/16049#issuecomment-2128711390 > I rebased the PR! I guess other PRs will encounter the same failing tests, so please rebase the others too, thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [MINOR] Code Cleanup (Clients Module) [kafka]
sjhajharia commented on PR #16049: URL: https://github.com/apache/kafka/pull/16049#issuecomment-2128709392 Thanks for pointing that out, @chia7712. I rebased the PR!
[jira] [Resolved] (KAFKA-16826) Integrate Native Kafka Docker Image with github Actions
[ https://issues.apache.org/jira/browse/KAFKA-16826?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Manikumar resolved KAFKA-16826. --- Fix Version/s: 3.8.0 Resolution: Fixed > Integrate Native Kafka Docker Image with github Actions > --- > > Key: KAFKA-16826 > URL: https://issues.apache.org/jira/browse/KAFKA-16826 > Project: Kafka > Issue Type: Task >Reporter: Krishna Agarwal >Assignee: Krishna Agarwal >Priority: Major > Labels: KIP-974 > Fix For: 3.8.0 > > > Integrate the Native Apache Kafka Docker Image with existing github actions > # Build and test > # Rc release > # Promote -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [MINOR] Code Cleanup (Clients Module) [kafka]
chia7712 commented on PR #16049: URL: https://github.com/apache/kafka/pull/16049#issuecomment-2128703246 @sjhajharia Could you please rebase your branch to include #16044? It fixes a lot of failing tests :)
Re: [PR] KAFKA-16826: Integrate Native Docker Image with github actions [kafka]
omkreddy merged PR #16045: URL: https://github.com/apache/kafka/pull/16045
[jira] [Created] (KAFKA-16835) Add Support for consumer to read in commit order.
Manjunath created KAFKA-16835: - Summary: Add Support for consumer to read in commit order. Key: KAFKA-16835 URL: https://issues.apache.org/jira/browse/KAFKA-16835 Project: Kafka Issue Type: New Feature Components: consumer, offset manager Reporter: Manjunath Currently, the consumer receives messages in offset order. In some cases, commit order is what matters. For example, assume PostgreSQL 14 streams multiple large in-progress transactions, in arbitrary order, to an intermediate client. The client starts a transactional producer instance for each in-progress transaction and uses these producers to push the data to Kafka. The consumer should then read messages strictly in commit order.
Re: [PR] KAFKA-16826: Integrate Native Docker Image with github actions [kafka]
kagarwal06 commented on code in PR #16045: URL: https://github.com/apache/kafka/pull/16045#discussion_r1612932172 ## .github/workflows/docker_promote.yml: ## @@ -19,10 +19,10 @@ on: workflow_dispatch: inputs: rc_docker_image: -description: RC docker image that needs to be promoted (Example:- apache/kafka:3.6.0-rc0) +description: RC docker image that needs to be promoted (Example:- apache/kafka:3.6.0-rc0, apache/kafka-native:3.6.0-rc0) Review Comment: Updated all the example messages with `3.8.0` version and included `OR`.
Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]
abhijeetk88 commented on code in PR #15625: URL: https://github.com/apache/kafka/pull/15625#discussion_r1612931258 ## core/src/main/java/kafka/log/remote/quota/RLMQuotaManager.java: ## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.log.remote.quota; + +import kafka.server.QuotaType; +import kafka.server.SensorAccess; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Quota; +import org.apache.kafka.common.metrics.QuotaViolationException; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.SimpleRate; +import org.apache.kafka.common.utils.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.runtime.BoxedUnit; + +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +public class RLMQuotaManager { +private static final Logger LOGGER = LoggerFactory.getLogger(RLMQuotaManager.class); + +private final RLMQuotaManagerConfig config; +private final Metrics metrics; +private final QuotaType quotaType; +private final String description; +private final Time time; + +private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); +private final SensorAccess sensorAccess; +private Quota quota; + +public RLMQuotaManager(RLMQuotaManagerConfig config, Metrics metrics, QuotaType quotaType, String description, Time time) { +this.config = config; +this.metrics = metrics; +this.quotaType = quotaType; +this.description = description; +this.time = time; + +this.quota = new Quota(config.getQuotaBytesPerSecond(), true); +this.sensorAccess = new SensorAccess(lock, metrics); +} + +public void updateQuota(Quota newQuota) { +lock.writeLock().lock(); +try { +this.quota = newQuota; + +Map allMetrics = metrics.metrics(); +MetricName quotaMetricName = metricName(); +KafkaMetric metric = allMetrics.get(quotaMetricName); +if (metric != null) { +LOGGER.warn("Sensor for quota-id {} already exists. 
Setting quota to {} in MetricConfig", quotaMetricName, newQuota); +metric.config(getQuotaMetricConfig(newQuota)); +} +} finally { +lock.writeLock().unlock(); +} +} + +public boolean isQuotaExceeded() { +Sensor sensorInstance = sensor(); +try { +sensorInstance.checkQuotas(); +} catch (QuotaViolationException qve) { +LOGGER.debug("Quota violated for sensor ({}), metric: ({}), metric-value: ({}), bound: ({})", +sensorInstance.name(), qve.metric().metricName(), qve.value(), qve.bound()); +return true; +} +return false; +} + +public void record(double value) { +sensor().record(value, time.milliseconds(), false); Review Comment: In KIP-956, we do not utilize the throttle time provided by the quota manager to regulate fetches and copies. For fetch operations, we initially verify quota availability before initiating the retrieval of remote data. If the quota is unavailable, our priority is to serve partitions requiring local data, rather than throttling the client. Therefore, we focus on fulfilling data requests for other partitions in the queue, eliminating the need for throttle time in fetch operations. Similarly, when a RLM Task attempts to copy a segment, it first checks if the write quota is available. If the quota is not available, the thread waits until the quota becomes available. As a result, we do not require throttle time for copies either. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service,
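The check-then-proceed behaviour described in this review comment can be sketched without any Kafka classes. The sketch below is not the PR's `RLMQuotaManager`: `SimpleByteQuota`, its fixed one-second window, the injected clock, and `tryRemoteFetch` are all hypothetical names chosen for illustration. It only shows a caller checking quota before doing the work and recording usage afterwards, rather than computing a throttle time.

```java
import java.util.function.LongSupplier;

/** Hypothetical stand-in for a quota manager: fixed one-second windows, no throttle time. */
final class SimpleByteQuota {
    private final long bytesPerSecond;
    private final LongSupplier clockMs; // injected clock (assumption) so the sketch is testable
    private long windowStartMs;
    private long bytesInWindow;

    SimpleByteQuota(long bytesPerSecond, LongSupplier clockMs) {
        this.bytesPerSecond = bytesPerSecond;
        this.clockMs = clockMs;
        this.windowStartMs = clockMs.getAsLong();
    }

    /** Start a fresh window once the current one is at least one second old. */
    private void rollWindowIfNeeded() {
        long now = clockMs.getAsLong();
        if (now - windowStartMs >= 1000) {
            windowStartMs = now;
            bytesInWindow = 0;
        }
    }

    synchronized boolean isQuotaExceeded() {
        rollWindowIfNeeded();
        return bytesInWindow >= bytesPerSecond;
    }

    synchronized void record(long bytes) {
        rollWindowIfNeeded();
        bytesInWindow += bytes;
    }

    /**
     * Fetch path: if the quota is exhausted, skip the remote read so the caller
     * can serve other partitions; otherwise record the usage and proceed.
     * Returns true when the remote read was allowed.
     */
    synchronized boolean tryRemoteFetch(long bytes) {
        if (isQuotaExceeded()) {
            return false;
        }
        record(bytes);
        return true;
    }
}
```

A copy path under the same assumptions would loop on `isQuotaExceeded()` and wait between checks instead of returning `false`.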
Re: [PR] [MINOR] Code Cleanup [kafka]
sjhajharia commented on PR #16021: URL: https://github.com/apache/kafka/pull/16021#issuecomment-2128693543 Hey @chia7712 I have created 5 sub-PRs to address the cleanup module by module. - https://github.com/apache/kafka/pull/16049: Clients Module - https://github.com/apache/kafka/pull/16050: Streams Module - https://github.com/apache/kafka/pull/16065: Metadata Module - https://github.com/apache/kafka/pull/16066: Connect Module - https://github.com/apache/kafka/pull/16067: All other modules (as the number of changes per module was very small) I hope folks can chime in and provide their feedback. Good day!
[PR] [MINOR] : Code Cleanup - Misc modules [kafka]
sjhajharia opened a new pull request, #16067: URL: https://github.com/apache/kafka/pull/16067 ## What Code Cleanup in Misc Modules ## Changes Some common changes include - Replace Arrays.asList() with Collections.singletonList() wherever possible - Cleaning up some if-else blocks - Replacing some instances of StringBuilder with String operations Thanks!
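As an illustration of the first cleanup item, here is a minimal sketch of the single-element case. The class name and the `"topic-a"` value are made up for the example and are not taken from the PR. The two lists compare equal, but they are not fully interchangeable: `Arrays.asList` returns a fixed-size list that still allows `set`, while `Collections.singletonList` is immutable.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class SingletonListExample {
    /** Before: a one-element list backed by a varargs array. */
    static List<String> before() {
        return Arrays.asList("topic-a");
    }

    /** After: an immutable one-element list with no backing array. */
    static List<String> after() {
        return Collections.singletonList("topic-a");
    }

    /** Returns true if replacing element 0 is rejected by the list. */
    static boolean rejectsSet(List<String> list) {
        try {
            list.set(0, "topic-b");
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("equal contents: " + before().equals(after()));
        System.out.println("asList rejects set: " + rejectsSet(before()));
        System.out.println("singletonList rejects set: " + rejectsSet(after()));
    }
}
```

The replacement is therefore only safe where the calling code never writes to the list, which is presumably what "wherever possible" covers.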
Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]
appchemist commented on PR #16043: URL: https://github.com/apache/kafka/pull/16043#issuecomment-2128669455 > i really want to keep the network client delegate to just interfacing with the network client, so i wonder if it would be a better design to handle the metadata error in a separated module. wdyt? I think there are pros and cons to each approach. Current Patch: Pros: - Simple - Easier to understand: similar to the legacy code Cons: - Violates SRP - `NetworkClientDelegate` will also have non-interfacing behaviour. Separate module (`PropagateErrorMetadataUpdater` in https://github.com/apache/kafka/pull/15961): Pros: - Adheres to SOLID principles Cons: - Can be more complex - Acts on the client network layer. I'm having difficulty determining which aspect should be prioritized. As you said, it made me think about design as a priority. Our goal is to propagate metadata errors from the background thread to the foreground thread at the appropriate time. `MetadataUpdater` seems to play a "role" in handling metadata changes, making it a suitable candidate for propagating metadata errors. `PropagateErrorMetadataUpdater` acts as a proxy for handling metadata errors. This approach extends metadata error handling without modifying `NetworkClient`'s logic. What do you think? You can see the approach in https://github.com/apache/kafka/pull/15961, and I would appreciate your review.
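The proxy idea in this comment can be sketched in isolation. Nothing below is Kafka code: `SimpleMetadataUpdater`, `CountingUpdater`, and `ErrorPropagatingUpdater` are hypothetical, heavily simplified stand-ins, and the real `MetadataUpdater` interface has a different shape. The sketch only shows a wrapper that forwards every call to a delegate and additionally hands errors to a queue that the foreground thread drains.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical, simplified updater interface; NOT Kafka's MetadataUpdater. */
interface SimpleMetadataUpdater {
    void handleSuccess(String topic);
    void handleFailure(RuntimeException error);
}

/** Hypothetical default behaviour: just counts what it sees. */
final class CountingUpdater implements SimpleMetadataUpdater {
    int successes;
    int failures;

    @Override public void handleSuccess(String topic) { successes++; }
    @Override public void handleFailure(RuntimeException error) { failures++; }
}

/** Proxy: keeps the delegate's behaviour and also relays errors to the foreground. */
final class ErrorPropagatingUpdater implements SimpleMetadataUpdater {
    private final SimpleMetadataUpdater delegate;
    private final Queue<RuntimeException> errorsForForeground;

    ErrorPropagatingUpdater(SimpleMetadataUpdater delegate,
                            Queue<RuntimeException> errorsForForeground) {
        this.delegate = delegate;
        this.errorsForForeground = errorsForForeground;
    }

    @Override public void handleSuccess(String topic) {
        delegate.handleSuccess(topic);
    }

    @Override public void handleFailure(RuntimeException error) {
        delegate.handleFailure(error);      // existing handling is untouched
        errorsForForeground.add(error);     // extra step added by the proxy
    }

    /** Foreground side (e.g. inside a poll loop): rethrow the oldest pending error. */
    static void maybeThrow(Queue<RuntimeException> errors) {
        RuntimeException pending = errors.poll();
        if (pending != null) {
            throw pending;
        }
    }

    public static void main(String[] args) {
        Queue<RuntimeException> errors = new ConcurrentLinkedQueue<>();
        SimpleMetadataUpdater updater = new ErrorPropagatingUpdater(new CountingUpdater(), errors);
        updater.handleFailure(new IllegalStateException("invalid topic"));
        try {
            maybeThrow(errors);
        } catch (IllegalStateException e) {
            System.out.println("foreground saw: " + e.getMessage());
        }
    }
}
```

The trade-off matches the pros and cons listed in the comment: the delegate stays unchanged, at the cost of one more layer between the network client and the updater.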
Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]
appchemist commented on PR #16043: URL: https://github.com/apache/kafka/pull/16043#issuecomment-2128663846 Hi @philipnee, thank you for the review! > do you think we can put the metadata error in the ConsumerNetworkThread and have an exception handler to relay the error back to the user via the background queue? Yes, I know. However, I would appreciate your advice.
Re: [PR] KAFKA-16574: The metrics of LogCleaner disappear after reconfiguration [kafka]
chia7712 commented on PR #15863: URL: https://github.com/apache/kafka/pull/15863#issuecomment-2128658669 @gaurav-narula this PR now adopts your solution (https://github.com/apache/kafka/pull/15863#discussion_r1590313031), so it would be great to have your review before merging. Thanks!
Re: [PR] KAFKA-16574: The metrics of LogCleaner disappear after reconfiguration [kafka]
chia7712 commented on PR #15863: URL: https://github.com/apache/kafka/pull/15863#issuecomment-2128654632 @chiacyu Could you please rebase the PR so that QA runs against the latest code?
Re: [PR] KAFKA-16771 First log directory printed twice when formatting storage [kafka]
chia7712 commented on code in PR #16010: URL: https://github.com/apache/kafka/pull/16010#discussion_r1612892755 ## core/src/test/scala/unit/kafka/tools/StorageToolTest.scala: ## @@ -206,12 +206,20 @@ Found problem: @Test def testFormatSucceedsIfAllDirectoriesAreAvailable(): Unit = { -val availableDir1 = TestUtils.tempDir() -val availableDir2 = TestUtils.tempDir() +val availableDirs = Seq(TestUtils.tempDir(), TestUtils.tempDir(), TestUtils.tempDir()).map(dir => dir.toString) val stream = new ByteArrayOutputStream() -assertEquals(0, runFormatCommand(stream, Seq(availableDir1.toString, availableDir2.toString))) -assertTrue(stream.toString().contains("Formatting %s".format(availableDir1))) -assertTrue(stream.toString().contains("Formatting %s".format(availableDir2))) +assertEquals(0, runFormatCommand(stream, availableDirs)) Review Comment: Maybe we can write it in the following style:
```scala
val availableDirs = Seq(TestUtils.tempDir(), TestUtils.tempDir(), TestUtils.tempDir()).map(dir => dir.toString)
val stream = new ByteArrayOutputStream()
assertEquals(0, runFormatCommand(stream, availableDirs))
val actual = stream.toString().split("\\r?\\n")
val expected = availableDirs.map("Formatting %s".format(_))
// Duplicate messages mean some dirs get formatted repeatedly
assertEquals(availableDirs.size, actual.size)
// Every output line must start with one of the expected messages
actual.foreach(line => assertTrue(expected.exists(line.startsWith)))
```
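The check suggested in this review can also be made slightly stricter by counting matches per directory. The Java sketch below is an illustration under assumptions, not code from the PR: `FormatOutputCheck` and `eachDirFormattedOnce` are hypothetical names, and the only thing taken from the thread is that each output line begins with `Formatting <dir>`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class FormatOutputCheck {
    /**
     * Returns true when the output has exactly one line per directory and each
     * directory is named by exactly one "Formatting <dir>" line.
     * Caveat: a directory whose path is a prefix of another (for example
     * /tmp/a and /tmp/ab) would be counted twice by this prefix match.
     */
    static boolean eachDirFormattedOnce(String output, List<String> dirs) {
        List<String> lines = Arrays.stream(output.split("\\r?\\n"))
            .filter(line -> !line.isEmpty())
            .collect(Collectors.toList());
        if (lines.size() != dirs.size()) {
            return false; // extra lines would indicate a repeated message
        }
        for (String dir : dirs) {
            long matches = lines.stream()
                .filter(line -> line.startsWith("Formatting " + dir))
                .count();
            if (matches != 1) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> dirs = Arrays.asList("/tmp/dir1", "/tmp/dir2");
        String ok = "Formatting /tmp/dir1\nFormatting /tmp/dir2\n";
        String duplicated = "Formatting /tmp/dir1\nFormatting /tmp/dir1\n";
        System.out.println(eachDirFormattedOnce(ok, dirs));
        System.out.println(eachDirFormattedOnce(duplicated, dirs));
    }
}
```

Counting per directory catches the case where the line count happens to be right but one directory is reported twice and another not at all.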
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
artemlivshits commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1612863802 ## server-common/src/main/java/org/apache/kafka/server/common/TestFeatureVersion.java: ## @@ -21,16 +21,17 @@ public enum TestFeatureVersion implements FeatureVersion { -TEST_0(0, MetadataVersion.IBP_3_3_IV0, Collections.emptyMap()), +// TEST_1 released right before MV 3.7-IVO was released, and it has no dependencies TEST_1(1, MetadataVersion.IBP_3_7_IV0, Collections.emptyMap()), +// TEST_2 released right before MV 3.8-IVO was released, and it depends on this metadata version TEST_2(2, MetadataVersion.IBP_3_8_IV0, Collections.singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_8_IV0.featureLevel())); -private short featureLevel; -private MetadataVersion metadataVersionMapping; -private Map dependencies; +private final short featureLevel; +private final MetadataVersion metadataVersionMapping; +private final Map dependencies; public static final String FEATURE_NAME = "test.feature.version"; -public static final TestFeatureVersion PRODUCTION_VERSION = TEST_1; +public static final TestFeatureVersion LATEST_PRODUCTION = TEST_1; Review Comment: Does it mean that the broker will show the test feature to the clients? 
## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -109,6 +111,52 @@ object StorageTool extends Logging { } } + private def validateMetadataVersion(metadataVersion: MetadataVersion, config: Option[KafkaConfig]): Unit = { +if (!metadataVersion.isKRaftSupported) { + throw new TerseFailure(s"Must specify a valid KRaft metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.") +} +if (!metadataVersion.isProduction) { + if (config.get.unstableMetadataVersionsEnabled) { +System.out.println(s"WARNING: using pre-production metadata.version $metadataVersion.") + } else { +throw new TerseFailure(s"The metadata.version $metadataVersion is not ready for production use yet.") + } +} + } + + private[tools] def generateFeatureRecords(metadataRecords: ArrayBuffer[ApiMessageAndVersion], +metadataVersion: MetadataVersion, +specifiedFeatures: Map[String, java.lang.Short], +allFeatures: List[Features], +usesVersionDefault: Boolean): Unit = { +// If we are using --version-default, the default is based on the metadata version. +val metadataVersionForDefault = if (usesVersionDefault) Optional.of(metadataVersion) else Optional.empty[MetadataVersion]() Review Comment: I think it would be confusing if a user specifies the latest MV version and the result would be different from when nothing is specified (implied assumption is that nothing is shortcut for latest MV known to the tool). It would also be confusing if by default (nothing is specified) we don't have all features set to the latest versions known to the tool. We can provide guidance to new features developers in the comments (and the test feature example) and add a unit test that enforces the equivalence. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [MINOR] Code Cleanup - Connect Module [kafka]
sjhajharia opened a new pull request, #16066: URL: https://github.com/apache/kafka/pull/16066 ## What Code Cleanup in Connect Module ## Changes Some common changes include - Replace Arrays.asList() with Collections.singletonList() wherever possible - Cleaning up some if-else blocks - Replacing some instances of StringBuilder with String operations Thanks!
[PR] [MINOR]: Code Cleanup - Metadata module [kafka]
sjhajharia opened a new pull request, #16065: URL: https://github.com/apache/kafka/pull/16065 ## What Code Cleanup in Metadata Module ## Changes Some common changes include - Replace Arrays.asList() with Collections.singletonList() wherever possible - Cleaning up some if-else blocks - Replacing some instances of StringBuilder with String operations Thanks!
Re: [PR] KAFKA-15045: (KIP-924 pt. 11) Implemented StickyTaskAssignor [kafka]
ableegoldman commented on code in PR #16052: URL: https://github.com/apache/kafka/pull/16052#discussion_r1612714758 ## streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java: ## @@ -0,0 +1,478 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.processor.assignment.assignors; + +import static java.util.Collections.unmodifiableMap; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.TreeSet; +import java.util.stream.Collectors; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.assignment.ApplicationState; +import org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment; +import org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment.AssignedTask; +import org.apache.kafka.streams.processor.assignment.KafkaStreamsState; +import org.apache.kafka.streams.processor.assignment.ProcessId; +import org.apache.kafka.streams.processor.assignment.TaskAssignmentUtils; +import org.apache.kafka.streams.processor.assignment.TaskAssignor; +import org.apache.kafka.streams.processor.assignment.TaskInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class StickyTaskAssignor implements TaskAssignor { +private static final Logger LOG = LoggerFactory.getLogger(StickyTaskAssignor.class); + +private Map configs = new HashMap<>(); +private final boolean mustPreserveActiveTaskAssignment; + +public StickyTaskAssignor() { +this(false); +} + +public StickyTaskAssignor(final boolean mustPreserveActiveTaskAssignment) { +this.mustPreserveActiveTaskAssignment = mustPreserveActiveTaskAssignment; +} + +@Override +public void configure(final Map configs) { Review Comment: Already discussed but just to keep track: remove this override (and add a default to interface method if necessary) ## streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java: ## @@ -0,0 +1,478 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or 
more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.assignment.assignors; + +import static java.util.Collections.unmodifiableMap; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.TreeSet; +import java.util.stream.Collectors; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.assignment.ApplicationState; +import org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment; +import org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment.AssignedTask; +import org.apache.kafka.streams.processor.assignment.KafkaStreamsState; +import org.apache.kafka.streams.processor.assignment.ProcessId; +import org.apache.kafka.streams.processor.assignment.TaskAssignmentUtils; +import org.apache.kafka.streams.processor.assignment.TaskAssignor; +import org.apache.kafka.streams.processor.assignment.TaskInfo; +import
Re: [PR] KAFKA-16796: Introduce new org.apache.kafka.tools.api.Decoder to replace kafka.serializer.Decoder [kafka]
chia7712 commented on code in PR #16064: URL: https://github.com/apache/kafka/pull/16064#discussion_r1612751256 ## core/src/main/scala/kafka/tools/DumpLogSegments.scala: ## @@ -651,4 +651,17 @@ object DumpLogSegments { def checkArgs(): Unit = CommandLineUtils.checkRequiredArgs(parser, options, filesOpt) } + + private[tools] def newDecoder(className: String, props: VerifiableProperties): Decoder[_] = { +val kclass = Class.forName(className).getConstructor(props.getClass).newInstance(props) Review Comment: This approach requires every implementation of the new decoder to offer a constructor that accepts properties. That does not seem to be what we want, since the props are always empty. Also, we should use the Reconfigurable interface for consistency.
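One way to read this suggestion is: instantiate the decoder through its no-argument constructor and pass configuration through an interface method only when the class opts in. The sketch below shows that shape with hypothetical local types. `SimpleDecoder`, `SimpleConfigurable`, `Utf8Decoder`, and `DecoderLoader` are made up for the example and are not the `org.apache.kafka.tools.api.Decoder` or `Reconfigurable` types discussed in the PR.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;

/** Hypothetical decoder interface used only for this sketch. */
interface SimpleDecoder<T> {
    T fromBytes(byte[] bytes);
}

/** Hypothetical opt-in configuration hook used only for this sketch. */
interface SimpleConfigurable {
    void configure(Map<String, ?> configs);
}

/** Example implementation: needs no constructor arguments. */
final class Utf8Decoder implements SimpleDecoder<String>, SimpleConfigurable {
    boolean configured;

    @Override public void configure(Map<String, ?> configs) { configured = true; }

    @Override public String fromBytes(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }
}

final class DecoderLoader {
    /** Loads a decoder by class name via its no-arg constructor, then configures it if supported. */
    static SimpleDecoder<?> newDecoder(String className, Map<String, ?> configs) {
        Object instance;
        try {
            instance = Class.forName(className).getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot instantiate " + className, e);
        }
        if (!(instance instanceof SimpleDecoder)) {
            throw new IllegalArgumentException(className + " is not a SimpleDecoder");
        }
        if (instance instanceof SimpleConfigurable) {
            ((SimpleConfigurable) instance).configure(configs);
        }
        return (SimpleDecoder<?>) instance;
    }

    public static void main(String[] args) {
        SimpleDecoder<?> decoder = newDecoder(Utf8Decoder.class.getName(), Collections.emptyMap());
        System.out.println(decoder.fromBytes("hello".getBytes(StandardCharsets.UTF_8)));
    }
}
```

With this shape, implementations are not forced to declare a constructor taking properties, and an empty configuration map costs nothing for decoders that ignore it.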
[PR] KAFKA-16796: Introduce new org.apache.kafka.tools.api.Decoder to replace kafka.serializer.Decoder [kafka]
FrankYang0529 opened a new pull request, #16064: URL: https://github.com/apache/kafka/pull/16064 We need a replacement in order to complete https://issues.apache.org/jira/browse/KAFKA-14579 in Kafka 4.0 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Updated] (KAFKA-16834) add the reason for the failure of PartitionRegistration#toRecord
[ https://issues.apache.org/jira/browse/KAFKA-16834?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jianbin Chen updated KAFKA-16834: - Description: Transform it into the following output, which is easier for users to understand and identify the cause of the problem. {code:java} options.handleLoss("the directory " + (directory == DirectoryId.UNASSIGNED ? "unassigned" : "lost") + " state of one or more replicas");{code} was: Transform it into the following output, which is easier for users to understand and identify the cause of the problem. {code:java} options.handleLoss("the directory " + directory + " state of one or more replicas");{code} > add the reason for the failure of PartitionRegistration#toRecord > > > Key: KAFKA-16834 > URL: https://issues.apache.org/jira/browse/KAFKA-16834 > Project: Kafka > Issue Type: Wish >Affects Versions: 3.7.0 >Reporter: Jianbin Chen >Priority: Minor > > Transform it into the following output, which is easier for users to > understand and identify the cause of the problem. > {code:java} > options.handleLoss("the directory " + (directory == DirectoryId.UNASSIGNED ? > "unassigned" : "lost") > + " state of one or more replicas");{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] KAFKA-16834: add the reason for the failure of PartitionRegistration#toRecord [kafka]
funky-eyes opened a new pull request, #16063: URL: https://github.com/apache/kafka/pull/16063 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Updated] (KAFKA-16834) add the reason for the failure of PartitionRegistration#toRecord
[ https://issues.apache.org/jira/browse/KAFKA-16834?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jianbin Chen updated KAFKA-16834: - Summary: add the reason for the failure of PartitionRegistration#toRecord (was: add PartitionRegistration#toRecord loss info) > add the reason for the failure of PartitionRegistration#toRecord > > > Key: KAFKA-16834 > URL: https://issues.apache.org/jira/browse/KAFKA-16834 > Project: Kafka > Issue Type: Wish >Affects Versions: 3.7.0 >Reporter: Jianbin Chen >Priority: Minor > > Transform it into the following output, which is easier for users to > understand and identify the cause of the problem. > {code:java} > options.handleLoss("the directory " + directory + " state of one or more > replicas");{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16834) add PartitionRegistration#toRecord loss info
Jianbin Chen created KAFKA-16834: Summary: add PartitionRegistration#toRecord loss info Key: KAFKA-16834 URL: https://issues.apache.org/jira/browse/KAFKA-16834 Project: Kafka Issue Type: Wish Affects Versions: 3.7.0 Reporter: Jianbin Chen Transform it into the following output, which is easier for users to understand and identify the cause of the problem. {code:java} options.handleLoss("the directory " + directory + " state of one or more replicas");{code}
[jira] [Updated] (KAFKA-16583) Update from 3.4.0 to 3.7.0 image write failed in Kraft mode
[ https://issues.apache.org/jira/browse/KAFKA-16583?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen updated KAFKA-16583: -- Fix Version/s: 3.8.0 3.7.1 > Update from 3.4.0 to 3.7.0 image write failed in Kraft mode > --- > > Key: KAFKA-16583 > URL: https://issues.apache.org/jira/browse/KAFKA-16583 > Project: Kafka > Issue Type: Bug > Components: kraft >Affects Versions: 3.7.0 >Reporter: HanXu >Assignee: HanXu >Priority: Blocker > Fix For: 3.8.0, 3.7.1 > > Original Estimate: 6h > Remaining Estimate: 6h > > How to reproduce: > 1. Launch a 3.4.0 controller and a 3.4.0 broker(BrokerA) in Kraft mode; > 2. Create a topic with 1 partition; > 3. Launch a 3.4.0 broker(Broker B) in Kraft mode and reassign the step 2 > partition to Broker B; > 4. Upgrade Broker B to 3.7.0; > The Broker B will keep log the following error: > {code:java} > [2024-04-18 14:46:54,144] ERROR Encountered metadata loading fault: Unhandled > error initializing new publishers > (org.apache.kafka.server.fault.LoggingFaultHandler) > org.apache.kafka.image.writer.UnwritableMetadataException: Metadata has been > lost because the following could not be represented in metadata version > 3.4-IV0: the directory assignment state of one or more replicas > at > org.apache.kafka.image.writer.ImageWriterOptions.handleLoss(ImageWriterOptions.java:94) > at > org.apache.kafka.metadata.PartitionRegistration.toRecord(PartitionRegistration.java:391) > at org.apache.kafka.image.TopicImage.write(TopicImage.java:71) > at org.apache.kafka.image.TopicsImage.write(TopicsImage.java:84) > at org.apache.kafka.image.MetadataImage.write(MetadataImage.java:155) > at > org.apache.kafka.image.loader.MetadataLoader.initializeNewPublishers(MetadataLoader.java:295) > at > org.apache.kafka.image.loader.MetadataLoader.lambda$scheduleInitializeNewPublishers$0(MetadataLoader.java:266) > at > org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127) > at > 
org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210) > at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181) > at java.base/java.lang.Thread.run(Thread.java:840) > {code} > Bug: > - When reassigning partition, PartitionRegistration#merge will set the new > replicas with UNASSIGNED directory; > - But in metadata version 3.4.0 PartitionRegistration#toRecord only allows > MIGRATING directory; > {code:java} > if (options.metadataVersion().isDirectoryAssignmentSupported()) { > record.setDirectories(Uuid.toList(directories)); > } else { > for (Uuid directory : directories) { > if (!DirectoryId.MIGRATING.equals(directory)) { > options.handleLoss("the directory assignment state of one > or more replicas"); > break; > } > } > } > {code} > Solution: > - PartitionRegistration#toRecord allows both MIGRATING and UNASSIGNED -- This message was sent by Atlassian Jira (v8.20.10#820010)
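The solution proposed in KAFKA-16583 can be sketched as follows. This is a minimal, self-contained illustration and not the actual patch: the class `DirectoryLossCheck`, the method `losesInformation`, and the two `UUID` constants are stand-ins invented here for `PartitionRegistration#toRecord` and Kafka's `DirectoryId` values. It assumes the intent is simply that both MIGRATING and UNASSIGNED count as representable when the metadata version does not support directory assignment.

```java
import java.util.List;
import java.util.UUID;

// Simplified stand-in for the directory check in PartitionRegistration#toRecord.
// The UUID constants are placeholders, not Kafka's real DirectoryId values.
final class DirectoryLossCheck {
    static final UUID UNASSIGNED = new UUID(0L, 0L);
    static final UUID MIGRATING = new UUID(0L, 2L);

    // Returns true when the directories carry information that a metadata
    // version without directory assignment support cannot represent.
    static boolean losesInformation(List<UUID> directories) {
        for (UUID directory : directories) {
            // Both placeholder states are harmless to drop; anything else is a loss.
            if (!MIGRATING.equals(directory) && !UNASSIGNED.equals(directory)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        UUID realDirectory = new UUID(1L, 1L);
        System.out.println(losesInformation(List.of(MIGRATING, UNASSIGNED)));    // false
        System.out.println(losesInformation(List.of(MIGRATING, realDirectory))); // true
    }
}
```

With the original check, the first call would also have reported a loss, which matches the reassignment scenario in the reproduction steps.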
[jira] [Updated] (KAFKA-16583) Update from 3.4.0 to 3.7.0 image write failed in Kraft mode
[ https://issues.apache.org/jira/browse/KAFKA-16583?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen updated KAFKA-16583: -- Priority: Blocker (was: Major) > Update from 3.4.0 to 3.7.0 image write failed in Kraft mode > --- > > Key: KAFKA-16583 > URL: https://issues.apache.org/jira/browse/KAFKA-16583 > Project: Kafka > Issue Type: Bug > Components: kraft >Affects Versions: 3.7.0 >Reporter: HanXu >Assignee: HanXu >Priority: Blocker > Original Estimate: 6h > Remaining Estimate: 6h > > How to reproduce: > 1. Launch a 3.4.0 controller and a 3.4.0 broker(BrokerA) in Kraft mode; > 2. Create a topic with 1 partition; > 3. Launch a 3.4.0 broker(Broker B) in Kraft mode and reassign the step 2 > partition to Broker B; > 4. Upgrade Broker B to 3.7.0; > The Broker B will keep log the following error: > {code:java} > [2024-04-18 14:46:54,144] ERROR Encountered metadata loading fault: Unhandled > error initializing new publishers > (org.apache.kafka.server.fault.LoggingFaultHandler) > org.apache.kafka.image.writer.UnwritableMetadataException: Metadata has been > lost because the following could not be represented in metadata version > 3.4-IV0: the directory assignment state of one or more replicas > at > org.apache.kafka.image.writer.ImageWriterOptions.handleLoss(ImageWriterOptions.java:94) > at > org.apache.kafka.metadata.PartitionRegistration.toRecord(PartitionRegistration.java:391) > at org.apache.kafka.image.TopicImage.write(TopicImage.java:71) > at org.apache.kafka.image.TopicsImage.write(TopicsImage.java:84) > at org.apache.kafka.image.MetadataImage.write(MetadataImage.java:155) > at > org.apache.kafka.image.loader.MetadataLoader.initializeNewPublishers(MetadataLoader.java:295) > at > org.apache.kafka.image.loader.MetadataLoader.lambda$scheduleInitializeNewPublishers$0(MetadataLoader.java:266) > at > org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127) > at > 
org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210) > at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181) > at java.base/java.lang.Thread.run(Thread.java:840) > {code} > Bug: > - When reassigning partition, PartitionRegistration#merge will set the new > replicas with UNASSIGNED directory; > - But in metadata version 3.4.0 PartitionRegistration#toRecord only allows > MIGRATING directory; > {code:java} > if (options.metadataVersion().isDirectoryAssignmentSupported()) { > record.setDirectories(Uuid.toList(directories)); > } else { > for (Uuid directory : directories) { > if (!DirectoryId.MIGRATING.equals(directory)) { > options.handleLoss("the directory assignment state of one > or more replicas"); > break; > } > } > } > {code} > Solution: > - PartitionRegistration#toRecord allows both MIGRATING and UNASSIGNED -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16583: fix PartitionRegistration#toRecord directory check under metadata version 3_7_IV2 [kafka]
showuon commented on PR #15751: URL: https://github.com/apache/kafka/pull/15751#issuecomment-2128463581 Thanks for fixing this issue. I'll take a look this week or next week.
Re: [PR] ConfigRegistry [kafka]
github-actions[bot] commented on PR #15428: URL: https://github.com/apache/kafka/pull/15428#issuecomment-2128459494 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch) If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
[jira] [Commented] (KAFKA-16583) Update from 3.4.0 to 3.7.0 image write failed in Kraft mode
[ https://issues.apache.org/jira/browse/KAFKA-16583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17849145#comment-17849145 ] Jianbin Chen commented on KAFKA-16583: -- I want to know when this PR can be merged, I am deeply affected by this bug! > Update from 3.4.0 to 3.7.0 image write failed in Kraft mode > --- > > Key: KAFKA-16583 > URL: https://issues.apache.org/jira/browse/KAFKA-16583 > Project: Kafka > Issue Type: Bug > Components: kraft >Affects Versions: 3.7.0 >Reporter: HanXu >Assignee: HanXu >Priority: Major > Original Estimate: 6h > Remaining Estimate: 6h > > How to reproduce: > 1. Launch a 3.4.0 controller and a 3.4.0 broker(BrokerA) in Kraft mode; > 2. Create a topic with 1 partition; > 3. Launch a 3.4.0 broker(Broker B) in Kraft mode and reassign the step 2 > partition to Broker B; > 4. Upgrade Broker B to 3.7.0; > The Broker B will keep log the following error: > {code:java} > [2024-04-18 14:46:54,144] ERROR Encountered metadata loading fault: Unhandled > error initializing new publishers > (org.apache.kafka.server.fault.LoggingFaultHandler) > org.apache.kafka.image.writer.UnwritableMetadataException: Metadata has been > lost because the following could not be represented in metadata version > 3.4-IV0: the directory assignment state of one or more replicas > at > org.apache.kafka.image.writer.ImageWriterOptions.handleLoss(ImageWriterOptions.java:94) > at > org.apache.kafka.metadata.PartitionRegistration.toRecord(PartitionRegistration.java:391) > at org.apache.kafka.image.TopicImage.write(TopicImage.java:71) > at org.apache.kafka.image.TopicsImage.write(TopicsImage.java:84) > at org.apache.kafka.image.MetadataImage.write(MetadataImage.java:155) > at > org.apache.kafka.image.loader.MetadataLoader.initializeNewPublishers(MetadataLoader.java:295) > at > org.apache.kafka.image.loader.MetadataLoader.lambda$scheduleInitializeNewPublishers$0(MetadataLoader.java:266) > at > 
org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127) > at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210) > at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181) > at java.base/java.lang.Thread.run(Thread.java:840) > {code} > Bug: > - When reassigning partition, PartitionRegistration#merge will set the new > replicas with UNASSIGNED directory; > - But in metadata version 3.4.0 PartitionRegistration#toRecord only allows > MIGRATING directory; > {code:java} > if (options.metadataVersion().isDirectoryAssignmentSupported()) { > record.setDirectories(Uuid.toList(directories)); > } else { > for (Uuid directory : directories) { > if (!DirectoryId.MIGRATING.equals(directory)) { > options.handleLoss("the directory assignment state of one > or more replicas"); > break; > } > } > } > {code} > Solution: > - PartitionRegistration#toRecord allows both MIGRATING and UNASSIGNED -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] KAFKA-16833: Fixing PartitionInfo and Cluster equals and hashCode [kafka]
ahuang98 opened a new pull request, #16062: URL: https://github.com/apache/kafka/pull/16062
Re: [PR] KAFKA-16691: [connect:transform] Support nested field paths on TimestampConverter [kafka]
jeqo commented on code in PR #15893: URL: https://github.com/apache/kafka/pull/15893#discussion_r1612459168 ## connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/SingleFieldPath.java: ## @@ -202,6 +204,225 @@ public Object valueFrom(Map map) { return current.get(lastStep()); } +/** + * Access {@code Map} fields and apply functions to update field values. + * + * @param originalValue schema-based data value + * @param whenFound function to apply when path is found + * @param whenNotFound function to apply when path is not found + * @param whenOther function to apply on fields not matched by path + * @return updated data value + */ +public Map updateValueFrom( +Map originalValue, +MapValueUpdater whenFound, +MapValueUpdater whenNotFound, +MapValueUpdater whenOther +) { +return updateValue(originalValue, 0, whenFound, whenNotFound, whenOther); +} + +@SuppressWarnings("unchecked") +private Map updateValue( +Map originalValue, +int step, +MapValueUpdater whenFound, +MapValueUpdater whenNotFound, +MapValueUpdater whenOther +) { +if (originalValue == null) return null; +Map updatedParent = new HashMap<>(originalValue.size()); +boolean found = false; +for (Map.Entry entry : originalValue.entrySet()) { +String fieldName = entry.getKey(); +Object fieldValue = entry.getValue(); +if (steps.get(step).equals(fieldName)) { +found = true; +if (step < lastStepIndex()) { +if (fieldValue instanceof Map) { +Map updatedField = updateValue( +(Map) fieldValue, +step + 1, +whenFound, +whenNotFound, +whenOther); +updatedParent.put(fieldName, updatedField); +} else { +// add back to not found and apply others, as only leaf values are updated +found = false; +whenOther.apply(originalValue, updatedParent, null, fieldName); +} +} else { +whenFound.apply(originalValue, updatedParent, this, fieldName); +} +} else { +whenOther.apply(originalValue, updatedParent, null, fieldName); +} +} + +if (!found) { +whenNotFound.apply(originalValue, updatedParent, this, 
steps.get(step)); +} + +return updatedParent; +} + +/** + * Access {@code Struct} fields and apply functions to update field values. + * + * @param originalSchema original struct schema + * @param originalValue schema-based data value + * @param updatedSchema updated struct schema + * @param whenFound function to apply when path is found + * @param whenNotFound function to apply when path is not found + * @param whenOther function to apply on fields not matched by path + * @return updated data value + */ +public Struct updateValueFrom( +Schema originalSchema, +Struct originalValue, +Schema updatedSchema, +StructValueUpdater whenFound, +StructValueUpdater whenNotFound, +StructValueUpdater whenOther +) { +return updateValue(originalSchema, originalValue, updatedSchema, 0, whenFound, whenNotFound, whenOther); +} + +private Struct updateValue( +Schema originalSchema, +Struct originalValue, +Schema updateSchema, +int step, +StructValueUpdater whenFound, +StructValueUpdater whenNotFound, +StructValueUpdater whenOther +) { +Struct updated = new Struct(updateSchema); +boolean found = false; +for (Field field : originalSchema.fields()) { +if (step < steps.size()) { +if (steps.get(step).equals(field.name())) { +found = true; +if (step == lastStepIndex()) { +whenFound.apply( +originalValue, +field, +updated, +updateSchema.field(field.name()), +this +); +} else { +if (field.schema().type() == Schema.Type.STRUCT) { +Struct fieldValue = updateValue( +field.schema(), +originalValue.getStruct(field.name()), +updateSchema.field(field.name()).schema(), +step + 1, +
Re: [PR] KAFKA-16691: [connect:transform] Support nested field paths on TimestampConverter [kafka]
jeqo commented on code in PR #15893: URL: https://github.com/apache/kafka/pull/15893#discussion_r1612458345 ## connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/SingleFieldPath.java: ## @@ -202,6 +204,225 @@ public Object valueFrom(Map map) { return current.get(lastStep()); } +/** + * Access {@code Map} fields and apply functions to update field values. + * + * @param originalValue schema-based data value + * @param whenFound function to apply when path is found + * @param whenNotFound function to apply when path is not found + * @param whenOther function to apply on fields not matched by path + * @return updated data value + */ +public Map updateValueFrom( +Map originalValue, +MapValueUpdater whenFound, +MapValueUpdater whenNotFound, +MapValueUpdater whenOther +) { +return updateValue(originalValue, 0, whenFound, whenNotFound, whenOther); +} + +@SuppressWarnings("unchecked") +private Map updateValue( +Map originalValue, +int step, +MapValueUpdater whenFound, +MapValueUpdater whenNotFound, +MapValueUpdater whenOther +) { +if (originalValue == null) return null; +Map updatedParent = new HashMap<>(originalValue.size()); +boolean found = false; +for (Map.Entry entry : originalValue.entrySet()) { +String fieldName = entry.getKey(); +Object fieldValue = entry.getValue(); +if (steps.get(step).equals(fieldName)) { +found = true; +if (step < lastStepIndex()) { +if (fieldValue instanceof Map) { +Map updatedField = updateValue( +(Map) fieldValue, +step + 1, +whenFound, +whenNotFound, +whenOther); +updatedParent.put(fieldName, updatedField); +} else { +// add back to not found and apply others, as only leaf values are updated +found = false; +whenOther.apply(originalValue, updatedParent, null, fieldName); +} +} else { +whenFound.apply(originalValue, updatedParent, this, fieldName); +} +} else { +whenOther.apply(originalValue, updatedParent, null, fieldName); +} +} + +if (!found) { +whenNotFound.apply(originalValue, updatedParent, this, 
steps.get(step)); +} + +return updatedParent; +} Review Comment: Thanks! I like the proposed approach. The only change I'm considering is to return the map intact if the value is not found (the proposal is to return null). This should make the behavior consistent with the Struct and Schema updates, which do not make this distinction.
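The "return an intact map when the path is not found" behavior discussed in this comment could look roughly like the sketch below. It is an assumption-laden illustration, not the `SingleFieldPath` code from the PR: `NestedMapUpdate` and `updateAt` are hypothetical names, and a plain `UnaryOperator` replaces the PR's `MapValueUpdater` callbacks.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical helper; not the SingleFieldPath API from the PR.
final class NestedMapUpdate {
    // Applies `update` to the leaf addressed by `steps`. When a step is missing,
    // or a non-leaf step is not a Map, the original map instance is returned as-is.
    @SuppressWarnings("unchecked")
    static Map<String, Object> updateAt(Map<String, Object> original,
                                        List<String> steps,
                                        int step,
                                        UnaryOperator<Object> update) {
        if (original == null || !original.containsKey(steps.get(step))) {
            return original; // path not found: leave the input untouched
        }
        String name = steps.get(step);
        Object value = original.get(name);
        Object newValue;
        if (step == steps.size() - 1) {
            newValue = update.apply(value); // leaf reached: apply the update
        } else if (value instanceof Map) {
            Map<String, Object> child = (Map<String, Object>) value;
            Map<String, Object> updatedChild = updateAt(child, steps, step + 1, update);
            if (updatedChild == child) {
                return original; // nothing changed below this level
            }
            newValue = updatedChild;
        } else {
            return original; // only leaf values are updated
        }
        Map<String, Object> copy = new HashMap<>(original);
        copy.put(name, newValue);
        return copy;
    }

    public static void main(String[] args) {
        Map<String, Object> inner = new HashMap<>();
        inner.put("ts", 1L);
        Map<String, Object> outer = new HashMap<>();
        outer.put("a", inner);
        Map<String, Object> miss = updateAt(outer, List.of("a", "missing"), 0, v -> v);
        System.out.println(miss == outer); // true: the same instance comes back
    }
}
```

Returning the same instance on a miss lets callers detect "nothing changed" with a reference comparison, while a successful update copies only the maps along the path.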
[PR] return to commit f9db4fa19cce975a6bbaeb09fbe9c91b81846b5a [kafka]
cmccabe opened a new pull request, #16061: URL: https://github.com/apache/kafka/pull/16061 commit db118aba6fbddfc607c6ef653f9965a5a73c778a (HEAD -> return-to-monkey, cmccabe/return-to-monkey, trunk) Author: Colin P. McCabe Date: Thu May 23 16:11:24 2024 -0700 return to commit f9db4fa19cce975a6bbaeb09fbe9c91b81846b5a Return to commit f9db4fa19cce975a6bbaeb09fbe9c91b81846b5a to see if it builds more normally than current trunk.
Re: [PR] MINOR: Disable JDK 11 and 17 tests on PRs [kafka]
gharris1727 commented on PR #16051: URL: https://github.com/apache/kafka/pull/16051#issuecomment-2128174644 > > Something is wrong with the build of this commit > > There still seems to be some issue with builds 🤔 > [2024-05-23T21:45:16.078Z] Could not stop org.gradle.internal.actor.internal.DefaultActorFactory$NonBlockingActor@1d0918ec. That was probably me. Marking the PR as ready-for-review forcefully killed the previous build, and the next build is still waiting in the queue. I think that the status message should say "queued", and it just wasn't prepared for this strange state. If it doesn't resolve in a few hours then we might want to be worried :)
Re: [PR] KAFKA-16541 Fix potential leader-epoch checkpoint file corruption [kafka]
junrao commented on code in PR #15993: URL: https://github.com/apache/kafka/pull/15993#discussion_r1612451041 ## storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java: ## @@ -55,16 +61,40 @@ public class LeaderEpochFileCache { /** * @param topicPartition the associated topic partition * @param checkpoint the checkpoint file + * @param scheduler the scheduler to use for async I/O operations */ @SuppressWarnings("this-escape") -public LeaderEpochFileCache(TopicPartition topicPartition, LeaderEpochCheckpoint checkpoint) { +public LeaderEpochFileCache(TopicPartition topicPartition, LeaderEpochCheckpoint checkpoint, Scheduler scheduler) { this.checkpoint = checkpoint; this.topicPartition = topicPartition; +this.scheduler = scheduler; LogContext logContext = new LogContext("[LeaderEpochCache " + topicPartition + "] "); log = logContext.logger(LeaderEpochFileCache.class); checkpoint.read().forEach(this::assign); } +/** + * Instantiate a new LeaderEpochFileCache with provided epoch entries instead of from the backing checkpoint file. + * The provided epoch entries are expected to no less fresh than the checkpoint file. Review Comment: to no less fresh => to be no less fresh -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-16833) Cluster missing topicIds from equals and hashCode, PartitionInfo missing equals and hashCode
[ https://issues.apache.org/jira/browse/KAFKA-16833?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alyssa Huang updated KAFKA-16833: - Summary: Cluster missing topicIds from equals and hashCode, PartitionInfo missing equals and hashCode (was: PartitionInfo missing equals and hashCode methods ) > Cluster missing topicIds from equals and hashCode, PartitionInfo missing > equals and hashCode > > > Key: KAFKA-16833 > URL: https://issues.apache.org/jira/browse/KAFKA-16833 > Project: Kafka > Issue Type: Bug >Reporter: Alyssa Huang >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] MINOR: Disable JDK 11 and 17 tests on PRs [kafka]
jolshan commented on PR #16051: URL: https://github.com/apache/kafka/pull/16051#issuecomment-2128160293 Any ideas about
```
[2024-05-23T21:45:16.078Z] > Task :connect:runtime:test
[2024-05-23T21:45:16.078Z] Could not stop org.gradle.internal.actor.internal.DefaultActorFactory$NonBlockingActor@1d0918ec.
[2024-05-23T21:45:16.078Z] org.gradle.internal.dispatch.DispatchException: Could not dispatch message [MethodInvocation method: stop()].
```
Re: [PR] MINOR: Disable JDK 11 and 17 tests on PRs [kafka]
jolshan commented on PR #16051: URL: https://github.com/apache/kafka/pull/16051#issuecomment-2128158835 > Something is wrong with the build of this commit There still seems to be some issue with builds 🤔
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1612404062 ## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -109,6 +111,52 @@ object StorageTool extends Logging { } } + private def validateMetadataVersion(metadataVersion: MetadataVersion, config: Option[KafkaConfig]): Unit = { +if (!metadataVersion.isKRaftSupported) { + throw new TerseFailure(s"Must specify a valid KRaft metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.") +} +if (!metadataVersion.isProduction) { + if (config.get.unstableMetadataVersionsEnabled) { +System.out.println(s"WARNING: using pre-production metadata.version $metadataVersion.") + } else { +throw new TerseFailure(s"The metadata.version $metadataVersion is not ready for production use yet.") + } +} + } + + private[tools] def generateFeatureRecords(metadataRecords: ArrayBuffer[ApiMessageAndVersion], +metadataVersion: MetadataVersion, +specifiedFeatures: Map[String, java.lang.Short], +allFeatures: List[Features], +usesVersionDefault: Boolean): Unit = { +// If we are using --version-default, the default is based on the metadata version. +val metadataVersionForDefault = if (usesVersionDefault) Optional.of(metadataVersion) else Optional.empty[MetadataVersion]() Review Comment: Ok sorry to be a little all over the place here. I think we should have semantics where on each new feature version released to production, we have a MV released to production. In this case, whether we specify latest production (as it did before) or use an empty optional to specify the latest production feature SHOULD be equivalent. The only case it is not is if someone improperly doesn't set the metadataMapping correctly. 
There isn't a great way to enforce that (I can do so via a test), but I guess the question is whether we prefer the empty approach that ensures the latest features are provided OR if we prefer the latest production metadata approach that is simpler but may risk not picking up features if folks implement the method incorrectly/don't update the MV. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15045: (KIP-924 pt. 11) Implemented StickyTaskAssignor [kafka]
ableegoldman commented on code in PR #16052: URL: https://github.com/apache/kafka/pull/16052#discussion_r1612405084 ## streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java: ## @@ -0,0 +1,478 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.processor.assignment.assignors; + +import static java.util.Collections.unmodifiableMap; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.TreeSet; +import java.util.stream.Collectors; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.assignment.ApplicationState; +import org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment; +import org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment.AssignedTask; +import org.apache.kafka.streams.processor.assignment.KafkaStreamsState; +import org.apache.kafka.streams.processor.assignment.ProcessId; +import org.apache.kafka.streams.processor.assignment.TaskAssignmentUtils; +import org.apache.kafka.streams.processor.assignment.TaskAssignor; +import org.apache.kafka.streams.processor.assignment.TaskInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class StickyTaskAssignor implements TaskAssignor { +private static final Logger LOG = LoggerFactory.getLogger(StickyTaskAssignor.class); + +private Map configs = new HashMap<>(); +private final boolean mustPreserveActiveTaskAssignment; + +public StickyTaskAssignor() { +this(false); +} + +public StickyTaskAssignor(final boolean mustPreserveActiveTaskAssignment) { +this.mustPreserveActiveTaskAssignment = mustPreserveActiveTaskAssignment; +} + +@Override +public void configure(final Map configs) { +// TODO: The application state already has the assignment configs object, why should this +// assignor be configurable? 
+this.configs = configs; +} + +@Override +public TaskAssignment assign(final ApplicationState applicationState) { +final int taskCount = applicationState.allTasks().size(); +if (taskCount == 0) { Review Comment: (I filed a ticket for this already btw) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15045: (KIP-924 pt. 11) Implemented StickyTaskAssignor [kafka]
ableegoldman commented on code in PR #16052: URL: https://github.com/apache/kafka/pull/16052#discussion_r1612403956 ## streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java: ## @@ -0,0 +1,478 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.processor.assignment.assignors; + +import static java.util.Collections.unmodifiableMap; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.TreeSet; +import java.util.stream.Collectors; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.assignment.ApplicationState; +import org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment; +import org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment.AssignedTask; +import org.apache.kafka.streams.processor.assignment.KafkaStreamsState; +import org.apache.kafka.streams.processor.assignment.ProcessId; +import org.apache.kafka.streams.processor.assignment.TaskAssignmentUtils; +import org.apache.kafka.streams.processor.assignment.TaskAssignor; +import org.apache.kafka.streams.processor.assignment.TaskInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class StickyTaskAssignor implements TaskAssignor { +private static final Logger LOG = LoggerFactory.getLogger(StickyTaskAssignor.class); + +private Map configs = new HashMap<>(); +private final boolean mustPreserveActiveTaskAssignment; + +public StickyTaskAssignor() { +this(false); +} + +public StickyTaskAssignor(final boolean mustPreserveActiveTaskAssignment) { +this.mustPreserveActiveTaskAssignment = mustPreserveActiveTaskAssignment; +} + +@Override +public void configure(final Map configs) { +// TODO: The application state already has the assignment configs object, why should this +// assignor be configurable? 
+this.configs = configs; +} + +@Override +public TaskAssignment assign(final ApplicationState applicationState) { +final int taskCount = applicationState.allTasks().size(); +if (taskCount == 0) { Review Comment: Technically this would be a bug that should be caught before the assignor is ever invoked, so I don't think we should (or need to) check this. Also: I would say that if we did want to return an empty assignment, it should be a real TaskAssignment with a KafkaStreamsAssignment for each KafkaStreamsState that appears in the ApplicationState, but with those just being empty KafkaStreamsAssignment objects (i.e. no actual tasks assigned). I actually think it should be considered an error if the returned `TaskAssignment` does not contain a KafkaStreamsAssignment for each KafkaStreamsState in the input. When we get around to implementing the assignment validation and `#onAssignmentComputed` callback with the error codes, we should probably have a dedicated error code for this case (if we don't already; I don't remember exactly which error codes we had in the original KIP). It doesn't matter for now since imo we should remove this check altogether, but it's good to keep in mind. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
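The completeness rule described in the comment above (one assignment entry per client in the input, possibly empty) can be sketched with plain Java collections. This is only an illustration under stated assumptions: `String` client ids and task ids stand in for `ProcessId` and `TaskId`, and `emptyAssignment` and `missingClients` are hypothetical helpers, not part of the Kafka Streams API or of the KIP's error codes.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class AssignmentCompletenessSketch {

    // Hypothetical helper: one entry per known client, each with no tasks assigned.
    static Map<String, Set<String>> emptyAssignment(Set<String> clients) {
        Map<String, Set<String>> assignment = new LinkedHashMap<>();
        for (String client : clients) {
            assignment.put(client, Collections.<String>emptySet());
        }
        return assignment;
    }

    // Hypothetical validation: clients from the input that have no entry in the result.
    static Set<String> missingClients(Set<String> clients, Map<String, Set<String>> assignment) {
        Set<String> missing = new TreeSet<>(clients);
        missing.removeAll(assignment.keySet());
        return missing;
    }

    public static void main(String[] args) {
        Set<String> clients = new TreeSet<>(Arrays.asList("client-a", "client-b"));

        // An empty assignment still names every client, so nothing is missing.
        System.out.println(missingClients(clients, emptyAssignment(clients)));

        // An assignment that skips a client is what the comment proposes treating as an error.
        Map<String, Set<String>> partial = new LinkedHashMap<>();
        partial.put("client-a", Collections.singleton("0_0"));
        System.out.println(missingClients(clients, partial));
    }
}
```

A real validation step would presumably map a non-empty `missingClients` result to whatever error code the KIP ends up defining.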
Re: [PR] KAFKA-15541: Add oldest-iterator-open-since-ms metric [kafka]
mjsax commented on code in PR #16041: URL: https://github.com/apache/kafka/pull/16041#discussion_r1612395523 ## streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetrics.java: ## @@ -451,6 +455,23 @@ public static void addNumOpenIteratorsGauge(final String taskId, } +public static void addOldestOpenIteratorGauge(final String taskId, + final String storeType, + final String storeName, + final StreamsMetricsImpl streamsMetrics, + final Gauge oldestOpenIteratorGauge) { +streamsMetrics.addStoreLevelMutableMetric( +taskId, +storeType, +storeName, +OLDEST_ITERATOR_OPEN_SINCE_MS, +OLDEST_ITERATOR_OPEN_SINCE_MS_DESCRIPTION, +RecordingLevel.INFO, +oldestOpenIteratorGauge +); + Review Comment: nit: remove blank line ## streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java: ## @@ -169,6 +172,10 @@ private void registerMetrics() { iteratorDurationSensor = StateStoreMetrics.iteratorDurationSensor(taskId.toString(), metricsScope, name(), streamsMetrics); StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), metricsScope, name(), streamsMetrics, (config, now) -> numOpenIterators.get()); +StateStoreMetrics.addOldestOpenIteratorGauge(taskId.toString(), metricsScope, name(), streamsMetrics, +(config, now) -> openIterators.isEmpty() ? null : + openIterators.stream().mapToLong(MeteredIterator::startTimestamp).min().getAsLong() Review Comment: I don't want to over-engineer (given that we can safely assume that the `openIterators` set should be small), but I'm wondering if this is the best implementation? In the end, we only want to track the create ts, not the iterators themselves. And for create ts we could just maintain a list of longs, and we would `return list.first()` here, and always append to the end of the list when a new iterator is created? Only "remove" would be more expensive, but we could use a sorted tree for the list, and thus remove would be O(log n), not O(n).
For this case, we also don't need the `MeteredIterator` helper interface. Thoughts? ## streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java: ## @@ -490,6 +490,31 @@ public void shouldTimeIteratorDuration() { assertThat((double) iteratorDurationMaxMetric.metricValue(), equalTo(3.0 * TimeUnit.MILLISECONDS.toNanos(1))); } +@Test +public void shouldTrackOldestOpenIteratorTimestamp() { +when(inner.all()).thenReturn(KeyValueIterators.emptyIterator()); +init(); + +final KafkaMetric oldestIteratorTimestampMetric = metric("oldest-iterator-open-since-ms"); +assertThat(oldestIteratorTimestampMetric, not(nullValue())); + +assertThat(oldestIteratorTimestampMetric.metricValue(), nullValue()); + +try (final KeyValueIterator first = metered.all()) { +final long oldestTimestamp = mockTime.milliseconds(); +assertThat((Long) oldestIteratorTimestampMetric.metricValue(), equalTo(oldestTimestamp)); +mockTime.sleep(100); + +// open a second iterator before closing the first to test that we still produce the first iterator's timestamp +try (final KeyValueIterator second = metered.all()) { +assertThat((Long) oldestIteratorTimestampMetric.metricValue(), equalTo(oldestTimestamp)); +mockTime.sleep(100); +} Review Comment: It would be better to not close the second iterator here, but close the first one first, to see if the metric advances to the second iterator's create ts -- would need some rewriting of the test; try-with-resources won't allow for proper nesting, but we can still use try-finally. Might actually be best to open like 5 iterators and close them in some non-linear order (including closing the oldest one like 2 or 3 times) to verify correct behavior.
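The alternative raised in this review (track only creation timestamps in a sorted structure instead of the iterators themselves) could look roughly like the sketch below. It is an illustration, not the code in the PR: `OldestOpenTimestampTracker` and its methods are hypothetical names, and a `TreeMap` with per-timestamp counts is assumed so that two iterators opened in the same millisecond are both accounted for.

```java
import java.util.OptionalLong;
import java.util.TreeMap;

public class OldestOpenTimestampTracker {

    // creation timestamp -> number of currently open iterators created at that timestamp
    private final TreeMap<Long, Integer> openCounts = new TreeMap<>();

    // Record a newly opened iterator; O(log n).
    public synchronized void opened(long createTimestampMs) {
        openCounts.merge(createTimestampMs, 1, Integer::sum);
    }

    // Record a closed iterator; O(log n). Unknown timestamps are ignored.
    public synchronized void closed(long createTimestampMs) {
        openCounts.computeIfPresent(createTimestampMs, (ts, count) -> count == 1 ? null : count - 1);
    }

    // Smallest creation timestamp among open iterators, or empty if none are open.
    public synchronized OptionalLong oldest() {
        return openCounts.isEmpty() ? OptionalLong.empty() : OptionalLong.of(openCounts.firstKey());
    }

    public static void main(String[] args) {
        OldestOpenTimestampTracker tracker = new OldestOpenTimestampTracker();
        tracker.opened(10L);
        tracker.opened(20L);
        tracker.opened(30L);
        System.out.println(tracker.oldest()); // oldest is the iterator opened at 10

        tracker.closed(10L);                  // closing the oldest advances the value
        System.out.println(tracker.oldest());

        tracker.closed(30L);                  // closing a newer one leaves it unchanged
        System.out.println(tracker.oldest());
    }
}
```

One caveat with counting timestamps rather than iterators: a caller that closes the same iterator twice would decrement the count of another iterator sharing that timestamp, so the owner would need its own guard against double close.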
[jira] [Created] (KAFKA-16833) PartitionInfo missing equals and hashCode methods
Alyssa Huang created KAFKA-16833: Summary: PartitionInfo missing equals and hashCode methods Key: KAFKA-16833 URL: https://issues.apache.org/jira/browse/KAFKA-16833 Project: Kafka Issue Type: Bug Reporter: Alyssa Huang -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15045: Fix RackAwareTaskAssignorTest [kafka]
apourchet closed pull request #16056: KAFKA-15045: Fix RackAwareTaskAssignorTest URL: https://github.com/apache/kafka/pull/16056
Re: [PR] MINOR: Disable JDK 11 and 17 tests on PRs [kafka]
gharris1727 commented on code in PR #16051: URL: https://github.com/apache/kafka/pull/16051#discussion_r1612358604 ## Jenkinsfile: ## @@ -36,6 +36,12 @@ def doTest(env, target = "test") { junit '**/build/test-results/**/TEST-*.xml' } +def runTestOnDevBranch(env) { + if (!isChangeRequest(env)) { +doTest(env) + } +} Review Comment: I'm hesitant to sign up future release managers for additional verification of the 11 and 17 builds that the CI has been doing for previous releases. I don't expect any failures to be introduced that affect 11 and 17 but not 8 or 21, but I would rather find those failures out in CI after merge than during the RC generation stage. And for selfish reasons, I also would like to continue seeing the 11 and 17 trunk results published so I can do flaky test analysis. With this patch as-is, we're keeping the same number of test runs on trunk, but if we removed these entirely we would be cutting those in half. I don't pay attention to the PR builds for statistics because contributors may have bad patches applied that aren't worth investigating.
Re: [PR] MINOR: Disable JDK 11 and 17 tests on PRs [kafka]
gharris1727 commented on PR #16051: URL: https://github.com/apache/kafka/pull/16051#issuecomment-2128068313
Looking at the latest CI run, the 11 build took ~15 minutes, the 17 build took 11 minutes, and the 8 and 21 builds are continuing as normal.
In the last 28 days, Kafka has run:
9380 builds in total
8514 builds on PRs
748 builds on trunk
111 builds on 3.7
7 builds on 3.6
-> 90% of the builds are on PRs
I think we can expect more than a 40% reduction in our total CI load after this change, and I don't think we're losing any test coverage. I'm comfortable with this change as-is, and we can pursue further reductions as they become desired.
Re: [PR] MINOR: Disable JDK 11 and 17 tests on PRs [kafka]
ijuma commented on code in PR #16051: URL: https://github.com/apache/kafka/pull/16051#discussion_r1612342902 ## Jenkinsfile: ## @@ -36,6 +36,12 @@ def doTest(env, target = "test") { junit '**/build/test-results/**/TEST-*.xml' } +def runTestOnDevBranch(env) { + if (!isChangeRequest(env)) { +doTest(env) + } +} Review Comment: Ah, yes - we'll also have to configure the brokers, etc. to use Java 17. In any case, I think whatever we have to do there won't be any harder if we just delete the Java 11 and 17 builders for now. And it will make the 3.8 builds lighter.
Re: [PR] MINOR: Disable JDK 11 and 17 tests on PRs [kafka]
ijuma commented on code in PR #16051: URL: https://github.com/apache/kafka/pull/16051#discussion_r1612341398 ## Jenkinsfile: ## @@ -36,6 +36,12 @@ def doTest(env, target = "test") { junit '**/build/test-results/**/TEST-*.xml' } +def runTestOnDevBranch(env) { + if (!isChangeRequest(env)) { +doTest(env) + } +} Review Comment: I don't see the value - once we drop support for Java 8, we will simply change the Java 8 builder to Java 11. Am I missing something?
Re: [PR] MINOR: Disable JDK 11 and 17 tests on PRs [kafka]
gharris1727 commented on code in PR #16051: URL: https://github.com/apache/kafka/pull/16051#discussion_r1612327285 ## Jenkinsfile: ## @@ -36,6 +36,12 @@ def doTest(env, target = "test") { junit '**/build/test-results/**/TEST-*.xml' } +def runTestOnDevBranch(env) { + if (!isChangeRequest(env)) { +doTest(env) + } +} Review Comment: What do you think about leaving the 11 and 17 branch builders as-is until 4.0? This will need some rework then to effect the KIPs that are changing the minimum version requirements and having different minimums for the different modules. We can figure out a strategy for branch & pr builds then.
Re: [PR] KAFKA-15045: Fix RackAwareTaskAssignorTest [kafka]
chia7712 commented on PR #16056: URL: https://github.com/apache/kafka/pull/16056#issuecomment-2128047643 @apourchet Thanks for your contribution. The failed test is already fixed by #16044
[jira] [Resolved] (KAFKA-16828) RackAwareTaskAssignorTest failed
[ https://issues.apache.org/jira/browse/KAFKA-16828?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-16828. Fix Version/s: 3.8.0 Resolution: Fixed > RackAwareTaskAssignorTest failed > > > Key: KAFKA-16828 > URL: https://issues.apache.org/jira/browse/KAFKA-16828 > Project: Kafka > Issue Type: Test >Reporter: Luke Chen >Assignee: Kuan Po Tseng >Priority: Major > Fix For: 3.8.0 > > > Found in the latest trunk build. > It fails many tests in `RackAwareTaskAssignorTest` suite. > > https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15951/7/#showFailuresLink -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16828: RackAwareTaskAssignorTest failed [kafka]
chia7712 merged PR #16044: URL: https://github.com/apache/kafka/pull/16044
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1612321070 ## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -109,6 +111,52 @@ object StorageTool extends Logging { } } + private def validateMetadataVersion(metadataVersion: MetadataVersion, config: Option[KafkaConfig]): Unit = { +if (!metadataVersion.isKRaftSupported) { + throw new TerseFailure(s"Must specify a valid KRaft metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.") +} +if (!metadataVersion.isProduction) { + if (config.get.unstableMetadataVersionsEnabled) { +System.out.println(s"WARNING: using pre-production metadata.version $metadataVersion.") + } else { +throw new TerseFailure(s"The metadata.version $metadataVersion is not ready for production use yet.") + } +} + } + + private[tools] def generateFeatureRecords(metadataRecords: ArrayBuffer[ApiMessageAndVersion], +metadataVersion: MetadataVersion, +specifiedFeatures: Map[String, java.lang.Short], +allFeatures: List[Features], +usesVersionDefault: Boolean): Unit = { +// If we are using --version-default, the default is based on the metadata version. +val metadataVersionForDefault = if (usesVersionDefault) Optional.of(metadataVersion) else Optional.empty[MetadataVersion]() Review Comment: If we want to change it back, I will also update the comments in the metadataVersionMapping method as it will not be correct.
[jira] [Assigned] (KAFKA-15630) Improve documentation of offset.lag.max
[ https://issues.apache.org/jira/browse/KAFKA-15630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ganesh Sadanala reassigned KAFKA-15630: --- Assignee: Ganesh Sadanala > Improve documentation of offset.lag.max > --- > > Key: KAFKA-15630 > URL: https://issues.apache.org/jira/browse/KAFKA-15630 > Project: Kafka > Issue Type: Improvement > Components: docs, mirrormaker >Reporter: Mickael Maison >Assignee: Ganesh Sadanala >Priority: Major > Labels: newbie > > It would be good to expand on the role of this configuration on offset > translation and mention that it can be set to a smaller value, or even 0, to > help in scenarios when records may not flow constantly. > The documentation string is here: > [https://github.com/apache/kafka/blob/06739d5aa026e7db62ff0bd7da57e079cca35f07/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java#L104] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1612319940 ## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -109,6 +111,52 @@ object StorageTool extends Logging { } } + private def validateMetadataVersion(metadataVersion: MetadataVersion, config: Option[KafkaConfig]): Unit = { +if (!metadataVersion.isKRaftSupported) { + throw new TerseFailure(s"Must specify a valid KRaft metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.") +} +if (!metadataVersion.isProduction) { + if (config.get.unstableMetadataVersionsEnabled) { +System.out.println(s"WARNING: using pre-production metadata.version $metadataVersion.") + } else { +throw new TerseFailure(s"The metadata.version $metadataVersion is not ready for production use yet.") + } +} + } + + private[tools] def generateFeatureRecords(metadataRecords: ArrayBuffer[ApiMessageAndVersion], +metadataVersion: MetadataVersion, +specifiedFeatures: Map[String, java.lang.Short], +allFeatures: List[Features], +usesVersionDefault: Boolean): Unit = { +// If we are using --version-default, the default is based on the metadata version. +val metadataVersionForDefault = if (usesVersionDefault) Optional.of(metadataVersion) else Optional.empty[MetadataVersion]() Review Comment: We do not. This was changed yesterday and the code was as you say in the comment. > Do we get the same result by passing in that metadataVersion to feature.defaultValue() If we follow the protocol of creating a new MV for each new feature and making them production ready at the same time then the answer to your question is yes. If we want to codify this and require a new MV (used only for mapping a default) for every new feature to be created when we mark the feature as production ready, I can switch it back to how it was yesterday. I originally changed it in the case where we want to piggyback on the next MV and we may mark the feature as production ready but not the MV. 
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1612319940 ## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -109,6 +111,52 @@ object StorageTool extends Logging { } } + private def validateMetadataVersion(metadataVersion: MetadataVersion, config: Option[KafkaConfig]): Unit = { +if (!metadataVersion.isKRaftSupported) { + throw new TerseFailure(s"Must specify a valid KRaft metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.") +} +if (!metadataVersion.isProduction) { + if (config.get.unstableMetadataVersionsEnabled) { +System.out.println(s"WARNING: using pre-production metadata.version $metadataVersion.") + } else { +throw new TerseFailure(s"The metadata.version $metadataVersion is not ready for production use yet.") + } +} + } + + private[tools] def generateFeatureRecords(metadataRecords: ArrayBuffer[ApiMessageAndVersion], +metadataVersion: MetadataVersion, +specifiedFeatures: Map[String, java.lang.Short], +allFeatures: List[Features], +usesVersionDefault: Boolean): Unit = { +// If we are using --version-default, the default is based on the metadata version. +val metadataVersionForDefault = if (usesVersionDefault) Optional.of(metadataVersion) else Optional.empty[MetadataVersion]() Review Comment: We do not. This was changed yesterday and the code was as you say in the comment. Here is the behavior as it stands now: > Do we get the same result by passing in that metadataVersion to feature.defaultValue() If we follow the protocol of creating a new MV for each new feature and making them production ready at the same time then the answer to your question is yes. If we want to codify this and require a new MV (used only for mapping a default) for every new feature to be created when we mark the feature as production ready, I can switch it back to how it was yesterday. 
Re: [PR] MINOR: Disable JDK 11 and 17 tests on PRs [kafka]
gharris1727 commented on PR #16051: URL: https://github.com/apache/kafka/pull/16051#issuecomment-2128037835
> It's rare that there is an issue with only java 11/17 and not 8 or 21. I've most frequently seen issues with scala 2.12 and that's on the java 8 build.
I agree, and that was why I picked 11 and 17 to affect here.
> If we still do the compile step, we still need the nodes right?
We would still need the nodes, but instead of 4 nodes for 3 hours each (12 node-hours) we would need 2 nodes for 3 hours and 2 nodes for ~15 minutes (6.5 node-hours). I'm not very familiar with the jenkins pipeline declaration syntax, and I'm not sure how to express this change in a way which doesn't generate 11 & 17 builds for PRs at all, because it seems like it can only evaluate conditions like isChangeRequest after the stage is already defined. Maybe someone that is more familiar with jenkins can suggest an alternative.
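The node-hour figures in this comment, combined with the 28-day build counts quoted elsewhere in this thread (8514 PR builds out of 9380), are enough to reproduce the "more than a 40% reduction" estimate. The sketch below is a back-of-the-envelope check, not a measurement; it assumes every build costs the same node-hours and that branch builds are unchanged, and `CiLoadEstimate` and its method names are made up for illustration.

```java
public class CiLoadEstimate {

    // Node-hours for one build: some full test runs plus some compile-only runs.
    static double nodeHours(int fullRuns, double fullHours, int compileOnlyRuns, double compileOnlyHours) {
        return fullRuns * fullHours + compileOnlyRuns * compileOnlyHours;
    }

    // Fraction of total CI load saved if only PR builds become cheaper.
    static double totalReduction(long prBuilds, long totalBuilds, double hoursBefore, double hoursAfter) {
        double prShare = (double) prBuilds / totalBuilds;
        double perBuildSaving = (hoursBefore - hoursAfter) / hoursBefore;
        return prShare * perBuildSaving;
    }

    public static void main(String[] args) {
        double before = nodeHours(4, 3.0, 0, 0.0);   // 4 JDKs x 3 hours = 12 node-hours
        double after = nodeHours(2, 3.0, 2, 0.25);   // 2 full runs + 2 x ~15 minutes = 6.5 node-hours
        double reduction = totalReduction(8514, 9380, before, after);
        System.out.printf("before=%.1f after=%.1f reduction=%.1f%%%n", before, after, reduction * 100);
    }
}
```

Under these assumptions the PR share is about 90.8% and the per-build saving about 45.8%, which multiplies out to a little over 41% of total load.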
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1612314478 ## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -141,6 +189,9 @@ object StorageTool extends Logging { formatParser.addArgument("--release-version", "-r"). action(store()). help(s"A KRaft release version to use for the initial metadata.version. The minimum is ${MetadataVersion.IBP_3_0_IV0}, the default is ${MetadataVersion.LATEST_PRODUCTION}") +formatParser.addArgument("--feature"). + help("A feature upgrade we should perform, in feature=level format. For example: `metadata.version=5`."). Review Comment: We can do so.
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1612314269 ## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -109,6 +105,51 @@ object StorageTool extends Logging { } } + private def validateMetadataVersion(metadataVersion: MetadataVersion, config: Option[KafkaConfig]): Unit = { +if (!metadataVersion.isKRaftSupported) { + throw new TerseFailure(s"Must specify a valid KRaft metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.") +} +if (!metadataVersion.isProduction) { + if (config.get.unstableMetadataVersionsEnabled) { +System.out.println(s"WARNING: using pre-production metadata.version $metadataVersion.") + } else { +throw new TerseFailure(s"The metadata.version $metadataVersion is not ready for production use yet.") + } +} + } + + private[tools] def generateFeatureRecords(metadataRecords: ArrayBuffer[ApiMessageAndVersion], +metadataVersion: MetadataVersion, +specifiedFeatures: Map[String, java.lang.Short], +allFeatures: List[Features]): Unit = { +// If we are using --version-default, the default is based on the metadata version. Review Comment: This was a typo. I will fix.
[PR] MINOR: log coordinator event errors [kafka]
jeffkbkim opened a new pull request, #16060: URL: https://github.com/apache/kafka/pull/16060 We don't log any errors when a coordinator event fails to execute to completion. This patch adds some logging. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1612314113 ## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -156,16 +200,27 @@ object StorageTool extends Logging { def getMetadataVersion( namespace: Namespace, +featureNamesAndLevelsMap: Map[String, java.lang.Short], defaultVersionString: Option[String] ): MetadataVersion = { val defaultValue = defaultVersionString match { case Some(versionString) => MetadataVersion.fromVersionString(versionString) case None => MetadataVersion.LATEST_PRODUCTION } -Option(namespace.getString("release_version")) - .map(ver => MetadataVersion.fromVersionString(ver)) - .getOrElse(defaultValue) +val releaseVersionTag = Option(namespace.getString("release_version")) +val featureTag = featureNamesAndLevelsMap.get(MetadataVersion.FEATURE_NAME) + +(releaseVersionTag, featureTag) match { + case (Some(_), Some(_)) => +throw new IllegalArgumentException("Both --release_version and --feature were set. Only one of the two flags can be set.") Review Comment: Right. I can update this check.
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1612313415 ## core/src/main/scala/kafka/server/BrokerFeatures.scala: ## @@ -75,16 +76,19 @@ object BrokerFeatures extends Logging { } def defaultSupportedFeatures(unstableMetadataVersionsEnabled: Boolean): Features[SupportedVersionRange] = { -Features.supportedFeatures( - java.util.Collections.singletonMap(MetadataVersion.FEATURE_NAME, +val features = new util.HashMap[String, SupportedVersionRange]() + features.put(MetadataVersion.FEATURE_NAME, new SupportedVersionRange( MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(), if (unstableMetadataVersionsEnabled) { MetadataVersion.latestTesting.featureLevel } else { MetadataVersion.latestProduction.featureLevel - } -))) + })) + FeatureVersion.PRODUCTION_FEATURES.forEach { feature => Review Comment: This will be in a followup. I'm working on it in the background. :)
Re: [PR] MINOR: Disable JDK 11 and 17 tests on PRs [kafka]
jolshan commented on PR #16051: URL: https://github.com/apache/kafka/pull/16051#issuecomment-2128028213
> I think it would be worth running the compile steps for these older versions (i.e., "doValidation"). As far as I know, those steps only fail if there's actually a problem.
It's rare that there is an issue with only java 11/17 and not 8 or 21. I've most frequently seen issues with scala 2.12 and that's on the java 8 build. If we still do the compile step, we still need the nodes right?
[jira] [Commented] (KAFKA-16530) Fix high-watermark calculation to not assume the leader is in the voter set
[ https://issues.apache.org/jira/browse/KAFKA-16530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17849121#comment-17849121 ]

Alyssa Huang commented on KAFKA-16530: --

In the case where the leader is removed from the voter set and tries to update its log end offset (`updateLocalState`), for instance because of a new removeNode record, it will first update its own ReplicaState (`getOrCreateReplicaState`), which will return a _new_ Observer state if its id is no longer in the `voterStates` map. The endOffset will be updated, and then we'll consider whether the high watermark can be updated (`maybeUpdateHighWatermark`). When updating the high watermark, we only look at the `voterStates` map, which means we won't count the leader's offset as part of the HW calculation. This _does_ mean it's possible for the HW to drop though. Here's a scenario:

{code:java}
# Before node 1 removal, voterStates contains Nodes 1, 2, 3
Node 1: Leader, LEO 100
Node 2: Follower, LEO 90 <- HW
Node 3: Follower, LEO 85

# Leader processes removeNode record, voterStates contains Nodes 2, 3
Node 1: Leader, LEO 101
Node 2: Follower, LEO 90
Node 3: Follower, LEO 85 <- new HW
{code}

We want to make sure the HW does not decrement in this scenario. Perhaps we could revise `maybeUpdateHighWatermark` to continue to factor the leader's offset into the HW calculation, regardless of whether it is in the voter set, e.g.

{code:java}
private boolean maybeUpdateHighWatermark() {
    // Find the largest offset which is replicated to a majority of replicas (the leader counts)
-   List followersByDescendingFetchOffset = followersByDescendingFetchOffset();
+   List followersAndLeaderByDescFetchOffset = followersAndLeadersByDescFetchOffset();
-   int indexOfHw = voterStates.size() / 2;
+   int indexOfHw = followersAndLeaderByDescFetchOffset.size() / 2;
-   Optional highWatermarkUpdateOpt = followersByDescendingFetchOffset.get(indexOfHw).endOffset;
+   Optional highWatermarkUpdateOpt = followersAndLeaderByDescFetchOffset.get(indexOfHw).endOffset;
{code}

However, this does not cover the case when a follower is being removed from the voter set.

{code:java}
# Before node 2 removal, voterStates contains Nodes 1, 2, 3
Node 1: Leader, LEO 100
Node 2: Follower, LEO 90 <- HW
Node 3: Follower, LEO 85

# Leader processes removeNode record, voterStates contains Nodes 1, 3
Node 1: Leader, LEO 101
Node 2: Follower, LEO 90
Node 3: Follower, LEO 85 <- new HW
{code}

> Fix high-watermark calculation to not assume the leader is in the voter set > --- > > Key: KAFKA-16530 > URL: https://issues.apache.org/jira/browse/KAFKA-16530 > Project: Kafka > Issue Type: Sub-task > Components: kraft >Reporter: José Armando García Sancio >Assignee: Alyssa Huang >Priority: Major > Fix For: 3.8.0 > > > When the leader is being removed from the voter set, the leader may not be in > the voter set. This means that kraft should not assume that the leader is > part of the high-watermark calculation. -- This message was sent by Atlassian Jira (v8.20.10#820010)
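The arithmetic behind the two scenarios above can be checked with a small standalone sketch. This is not the KRaft implementation: `HighWatermarkSketch` and its method names are hypothetical, and it only models the "sort end offsets descending, take index size / 2" rule described in the comment.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; none of these names exist in Kafka.
final class HighWatermarkSketch {

    // Largest offset replicated to a majority: sort the end offsets in
    // descending order and take the entry at index size / 2.
    static long majorityOffset(List<Long> endOffsets) {
        List<Long> sorted = new ArrayList<>(endOffsets);
        sorted.sort(Collections.reverseOrder());
        return sorted.get(sorted.size() / 2);
    }

    // Only the voters' end offsets are considered.
    static long votersOnly(Map<Integer, Long> voterEndOffsets) {
        return majorityOffset(new ArrayList<>(voterEndOffsets.values()));
    }

    // The leader's end offset is added when the leader is not a voter.
    static long votersAndLeader(Map<Integer, Long> voterEndOffsets, int leaderId, long leaderEndOffset) {
        List<Long> offsets = new ArrayList<>(voterEndOffsets.values());
        if (!voterEndOffsets.containsKey(leaderId)) {
            offsets.add(leaderEndOffset);
        }
        return majorityOffset(offsets);
    }

    public static void main(String[] args) {
        // Before any removal: voters 1, 2, 3.
        System.out.println(votersOnly(Map.of(1, 100L, 2, 90L, 3, 85L)));      // 90
        // Leader (node 1) removed, voters only: the HW drops.
        System.out.println(votersOnly(Map.of(2, 90L, 3, 85L)));               // 85
        // Leader removed, but its offset still counted: the HW is preserved.
        System.out.println(votersAndLeader(Map.of(2, 90L, 3, 85L), 1, 101L)); // 90
        // Follower (node 2) removed: counting the leader does not help.
        System.out.println(votersAndLeader(Map.of(1, 101L, 3, 85L), 1, 101L)); // 85
    }
}
```

Under these assumptions, including the leader keeps the high watermark at 90 in the first scenario, while the follower-removal scenario still yields 85, which matches the concern raised in the comment.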
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
junrao commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1602305075 ## server-common/src/main/java/org/apache/kafka/server/common/TestFeatureVersion.java: ## @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.common; + +import java.util.Collections; +import java.util.Map; + +public enum TestFeatureVersion implements FeatureVersionUtils.FeatureVersionImpl { + +TEST_0(0, MetadataVersion.IBP_3_3_IV0, Collections.emptyMap()), +TEST_1(1, MetadataVersion.IBP_3_7_IV0, Collections.emptyMap()), +TEST_2(2, MetadataVersion.IBP_3_8_IV0, Collections.emptyMap()); + +private short featureLevel; +private MetadataVersion metadataVersionMapping; +private Map dependencies; Review Comment: Should we make those instance vals final? 
## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -156,16 +200,27 @@ object StorageTool extends Logging { def getMetadataVersion( namespace: Namespace, +featureNamesAndLevelsMap: Map[String, java.lang.Short], defaultVersionString: Option[String] ): MetadataVersion = { val defaultValue = defaultVersionString match { case Some(versionString) => MetadataVersion.fromVersionString(versionString) case None => MetadataVersion.LATEST_PRODUCTION } -Option(namespace.getString("release_version")) - .map(ver => MetadataVersion.fromVersionString(ver)) - .getOrElse(defaultValue) +val releaseVersionTag = Option(namespace.getString("release_version")) +val featureTag = featureNamesAndLevelsMap.get(MetadataVersion.FEATURE_NAME) + +(releaseVersionTag, featureTag) match { + case (Some(_), Some(_)) => +throw new IllegalArgumentException("Both --release_version and --feature were set. Only one of the two flags can be set.") Review Comment: We should disallow the case where both --release_version and --feature are used, but --feature doesn't include metadata, right? ## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -141,6 +189,9 @@ object StorageTool extends Logging { formatParser.addArgument("--release-version", "-r"). action(store()). help(s"A KRaft release version to use for the initial metadata.version. The minimum is ${MetadataVersion.IBP_3_0_IV0}, the default is ${MetadataVersion.LATEST_PRODUCTION}") +formatParser.addArgument("--feature"). + help("A feature upgrade we should perform, in feature=level format. For example: `metadata.version=5`."). Review Comment: Should we add a shorthand for --feature like other options? 
## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -109,6 +105,51 @@ object StorageTool extends Logging { } } + private def validateMetadataVersion(metadataVersion: MetadataVersion, config: Option[KafkaConfig]): Unit = { +if (!metadataVersion.isKRaftSupported) { + throw new TerseFailure(s"Must specify a valid KRaft metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.") +} +if (!metadataVersion.isProduction) { + if (config.get.unstableMetadataVersionsEnabled) { +System.out.println(s"WARNING: using pre-production metadata.version $metadataVersion.") + } else { +throw new TerseFailure(s"The metadata.version $metadataVersion is not ready for production use yet.") + } +} + } + + private[tools] def generateFeatureRecords(metadataRecords: ArrayBuffer[ApiMessageAndVersion], +metadataVersion: MetadataVersion, +specifiedFeatures: Map[String, java.lang.Short], +allFeatures: List[Features]): Unit = { +// If we are using --version-default, the default is based on the metadata version. Review Comment: Hmm, does the storage tool support the --version-default option? ## server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java: ## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. S
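The review question about combining `--release-version` with `--feature` can be made concrete with a short sketch. Everything here is hypothetical (`FormatFlagsSketch`, both `conflicts...` methods, and the feature name `some.other.feature`); it does not reproduce StorageTool, it only contrasts the check shown in the diff with the stricter rule the reviewer asks about.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical illustration only; not the StorageTool implementation.
final class FormatFlagsSketch {
    static final String METADATA_VERSION = "metadata.version";

    // Parses a "feature=level" argument such as "metadata.version=5".
    static Map.Entry<String, Short> parseFeature(String arg) {
        String[] parts = arg.split("=", 2);
        if (parts.length != 2 || parts[0].isEmpty()) {
            throw new IllegalArgumentException("Expected feature=level, got: " + arg);
        }
        return Map.entry(parts[0], Short.valueOf(parts[1]));
    }

    // Check as in the diff: a conflict only when --feature names metadata.version.
    static boolean conflictsAsInDiff(Optional<String> releaseVersion, Map<String, Short> features) {
        return releaseVersion.isPresent() && features.containsKey(METADATA_VERSION);
    }

    // Stricter variant raised in review: any --feature alongside --release-version.
    static boolean conflictsStrict(Optional<String> releaseVersion, Map<String, Short> features) {
        return releaseVersion.isPresent() && !features.isEmpty();
    }

    public static void main(String[] args) {
        Map<String, Short> onlyOther = Map.of("some.other.feature", (short) 1);
        System.out.println(conflictsAsInDiff(Optional.of("3.7"), onlyOther)); // false
        System.out.println(conflictsStrict(Optional.of("3.7"), onlyOther));   // true
    }
}
```

The two checks differ only when `--feature` is given without `metadata.version`, which is exactly the case the review comment asks about.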
Re: [PR] MINOR: Disable JDK 11 and 17 tests on PRs [kafka]
ijuma commented on code in PR #16051: URL: https://github.com/apache/kafka/pull/16051#discussion_r1612302312 ## Jenkinsfile: ## @@ -36,6 +36,12 @@ def doTest(env, target = "test") { junit '**/build/test-results/**/TEST-*.xml' } +def runTestOnDevBranch(env) { + if (!isChangeRequest(env)) { +doTest(env) + } +} Review Comment: If they don't run on PRs, I don't think anyone will validate the results for the branch builders. We should probably remove Java 11 and 17 altogether now that we have ported most tests to run with Java 16 and higher (previously many were excluded). I had suggested this in an internal conversation, so good to see similar action here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16832: LeaveGroup API for upgrading ConsumerGroup [kafka]
dongnuo123 commented on code in PR #16057: URL: https://github.com/apache/kafka/pull/16057#discussion_r1612296899 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -4424,14 +4441,113 @@ private ConsumerGroupMember validateConsumerGroupMember( * @param contextThe request context. * @param requestThe actual LeaveGroup request. * + * @return The LeaveGroup response and the records to append. + */ +public CoordinatorResult classicGroupLeave( +RequestContext context, +LeaveGroupRequestData request +) throws UnknownMemberIdException, GroupIdNotFoundException { +Group group = groups.get(request.groupId(), Long.MAX_VALUE); + +if (group == null) { +throw new UnknownMemberIdException(String.format("Group %s not found.", request.groupId())); +} + +if (group.type() == CLASSIC) { +return classicGroupLeaveToClassicGroup((ClassicGroup) group, context, request); +} else { +return classicGroupLeaveToConsumerGroup((ConsumerGroup) group, context, request); +} +} + +/** + * Handle a classic LeaveGroupRequest to a ConsumerGroup. + * + * @param group The ConsumerGroup. + * @param contextThe request context. + * @param requestThe actual LeaveGroup request. + * + * @return The LeaveGroup response and the records to append. + */ +private CoordinatorResult classicGroupLeaveToConsumerGroup( +ConsumerGroup group, +RequestContext context, +LeaveGroupRequestData request +) throws UnknownMemberIdException, GroupIdNotFoundException { +List memberResponses = new ArrayList<>(); +List records = new ArrayList<>(); +boolean hasValidLeaveGroupMember = false; + +for (MemberIdentity memberIdentity: request.members()) { +String memberId = memberIdentity.memberId(); +String instanceId = memberIdentity.groupInstanceId(); +String reason = memberIdentity.reason() != null ? 
memberIdentity.reason() : "not provided"; + +ConsumerGroupMember member; +try { +if (instanceId == null) { +member = group.getOrMaybeCreateMember(memberId, false); +throwIfMemberDoesNotUseClassicProtocol(member); + +log.info("[Group {}] Static Member {} has left group " + +"through explicit `LeaveGroup` request; client reason: {}", +group.groupId(), memberId, reason); +} else { +member = group.staticMember(instanceId); +throwIfStaticMemberIsUnknown(member, memberId); +// The LeaveGroup API allows administrative removal of members by GroupInstanceId +// in which case we expect the MemberId to be undefined. +if (!UNKNOWN_MEMBER_ID.equals(memberId)) { +throwIfInstanceIdIsFenced(member, group.groupId(), memberId, instanceId); +} +throwIfMemberDoesNotUseClassicProtocol(member); + +log.info("[Group {}] Static Member {} with instance id {} has left group " + +"through explicit `LeaveGroup` request; client reason: {}", +group.groupId(), memberId, instanceId, reason); +} + + records.addAll(removeMemberAndMaybeUpdateSubscriptionMetadata(group, member)); +cancelTimers(group.groupId(), member.memberId()); Review Comment: This will not be reverted if the replay/persistence fails. Is this something we want to address with the snapshottable timer? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-16516) Fix the controller node provider for broker to control channel
[ https://issues.apache.org/jira/browse/KAFKA-16516?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe reassigned KAFKA-16516: Assignee: Colin McCabe (was: José Armando García Sancio) > Fix the controller node provider for broker to control channel > -- > > Key: KAFKA-16516 > URL: https://issues.apache.org/jira/browse/KAFKA-16516 > Project: Kafka > Issue Type: Sub-task > Components: core >Reporter: José Armando García Sancio >Assignee: Colin McCabe >Priority: Major > Fix For: 3.8.0 > > > The broker to controller channel gets the set of voters directly from the > static configuration. This needs to change so that the leader node comes > from the kraft client/manager. > The code is in KafkaServer where it constructs the RaftControllerNodeProvider. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] KAFKA-16831: CoordinatorRuntime should initialize MemoryRecordsBuilder with max batch size write limit [kafka]
jeffkbkim opened a new pull request, #16059: URL: https://github.com/apache/kafka/pull/16059 Otherwise, we default the write limit to the min buffer size of 16384. This causes the coordinator to throw RecordTooLargeException even when the batch is under the 1MB max batch size limit. Added a unit test that fails without this change. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
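A minimal sketch of the failure mode described above, assuming a builder that rejects appends beyond its write limit. `WriteLimitSketch` and `BatchBuilder` are hypothetical stand-ins, not the real `MemoryRecordsBuilder` API, and 1MB is taken here as 1024 * 1024 bytes.

```java
// Hypothetical illustration only; not the MemoryRecordsBuilder API.
final class WriteLimitSketch {
    static final int MIN_BUFFER_SIZE = 16384;        // min buffer size from the PR description
    static final int MAX_BATCH_SIZE = 1024 * 1024;   // assumed value for the 1MB limit

    static final class BatchBuilder {
        private final int writeLimit;
        private int bytesWritten = 0;

        BatchBuilder(int writeLimit) {
            this.writeLimit = writeLimit;
        }

        // True when a record of the given size still fits under the write limit.
        boolean hasRoomFor(int recordSize) {
            return bytesWritten + recordSize <= writeLimit;
        }

        // Appends a record, failing when the write limit would be exceeded.
        void append(int recordSize) {
            if (!hasRoomFor(recordSize)) {
                throw new IllegalStateException("record too large for write limit " + writeLimit);
            }
            bytesWritten += recordSize;
        }
    }

    public static void main(String[] args) {
        int recordSize = 100 * 1024; // well under the 1MB batch limit
        System.out.println(new BatchBuilder(MIN_BUFFER_SIZE).hasRoomFor(recordSize)); // false
        System.out.println(new BatchBuilder(MAX_BATCH_SIZE).hasRoomFor(recordSize));  // true
    }
}
```

With the limit defaulted to the buffer size, a 100KB record is rejected even though it fits in a 1MB batch; passing the max batch size as the limit removes that false rejection.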
[PR] KAFKA-16832: LeaveGroup API for upgrading ConsumerGroup [kafka]
dongnuo123 opened a new pull request, #16057: URL: https://github.com/apache/kafka/pull/16057 This patch implements the LeaveGroup API for consumer groups that are in mixed mode. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-16832) LeaveGroup API for upgrading ConsumerGroup
Dongnuo Lyu created KAFKA-16832: --- Summary: LeaveGroup API for upgrading ConsumerGroup Key: KAFKA-16832 URL: https://issues.apache.org/jira/browse/KAFKA-16832 Project: Kafka Issue Type: Sub-task Reporter: Dongnuo Lyu -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-16832) LeaveGroup API for upgrading ConsumerGroup
[ https://issues.apache.org/jira/browse/KAFKA-16832?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongnuo Lyu reassigned KAFKA-16832: --- Assignee: Dongnuo Lyu > LeaveGroup API for upgrading ConsumerGroup > -- > > Key: KAFKA-16832 > URL: https://issues.apache.org/jira/browse/KAFKA-16832 > Project: Kafka > Issue Type: Sub-task >Reporter: Dongnuo Lyu >Assignee: Dongnuo Lyu >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] Update dependencies [kafka]
highbolder closed pull request #16039: Update dependencies URL: https://github.com/apache/kafka/pull/16039 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-15045: Fix RackAwareTaskAssignorTest [kafka]
apourchet opened a new pull request, #16056: URL: https://github.com/apache/kafka/pull/16056 Some unnecessary changes to RackAwareTaskAssignor broke the tests; this PR reverts those changes. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Refactor Values class to fix checkstyle, add benchmark, optimize exceptions [kafka]
gharris1727 merged PR #15469: URL: https://github.com/apache/kafka/pull/15469 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Refactor Values class to fix checkstyle, add benchmark, optimize exceptions [kafka]
gharris1727 commented on PR #15469: URL: https://github.com/apache/kafka/pull/15469#issuecomment-2127953424

Here are the final performance changes:

Benchmark | Before | Before Error | After | After Error | Speedup
-- | -- | -- | -- | -- | --
ValuesBenchmark.testConvertToBoolean | 123.749 | 0.842 | 72.577 | 0.412 | 1.7
ValuesBenchmark.testConvertToByte | 117.883 | 1.397 | 62.387 | 0.148 | 1.9
ValuesBenchmark.testConvertToDate | 3700.318 | 160.043 | 3522.214 | 36.075 | 1.1
ValuesBenchmark.testConvertToDecimal | 1530.936 | 49.503 | 1485.654 | 11.766 | 1.0
ValuesBenchmark.testConvertToDouble | 163.937 | 55.591 | 60.378 | 0.577 | 2.7
ValuesBenchmark.testConvertToFloat | 204.591 | 109.567 | 57.611 | 0.49 | 3.6
ValuesBenchmark.testConvertToInteger | 140.586 | 3.809 | 66.496 | 0.371 | 2.1
ValuesBenchmark.testConvertToList | 1276.364 | 54.037 | 1601.568 | 28.972 | 0.8
ValuesBenchmark.testConvertToLong | 132.029 | 3.118 | 76.744 | 1.112 | 1.7
ValuesBenchmark.testConvertToMap | 1361.082 | 78.59 | 1244.339 | 11.019 | 1.1
ValuesBenchmark.testConvertToShort | 121.575 | 4.6 | 63.167 | 0.311 | 1.9
ValuesBenchmark.testConvertToString | 1667.243 | 51.186 | 1580.031 | 11.391 | 1.1
ValuesBenchmark.testConvertToStruct | 3.819 | 0.082 | 1.395 | 0.009 | 2.7
ValuesBenchmark.testConvertToTime | 2864.609 | 163.586 | 2701.721 | 60.677 | 1.1
ValuesBenchmark.testConvertToTimestamp | 2789.008 | 30.371 | 2738.573 | 19.6 | 1.0
ValuesBenchmark.testInferSchema | 123.196 | 3.292 | 99.336 | 0.867 | 1.2
ValuesBenchmark.testParseString | 43826.599 | 922.077 | 13429.742 | 133.089 | 3.3

There's a consistent performance degradation for testConvertToList that seems to come from the parseString implementation being slightly less efficient for array inputs. I'll follow up on that separately since this PR already has too much scope, and the degradation is only slight.

One interesting final observation is that the variance/error for all of the tests is lower than with the previous implementation. I suspect that this is because many of the methods now avoid traversing the switch-case in the convertTo implementation, which reduces the number of branches and increases the branch predictor's success rate.

And here are the final coverage changes:

State | Class | Method | Line
-- | -- | -- | --
Before | 100% (4/4) | 81% (40/49) | 78% (464/589)
After | 100% (6/6) | 93% (77/82) | 84% (565/669)

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
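The Speedup column in the benchmark results above appears to be the Before value divided by the After value, rounded to one decimal place. That reading is an inference from the numbers, not something stated in the comment; `SpeedupSketch` is a hypothetical helper that reproduces it for two rows.

```java
// Hypothetical helper; assumes Speedup = Before / After, rounded to one decimal.
final class SpeedupSketch {

    static double speedup(double before, double after) {
        return Math.round(before / after * 10.0) / 10.0;
    }

    public static void main(String[] args) {
        System.out.println(speedup(123.749, 72.577));    // testConvertToBoolean: 1.7
        System.out.println(speedup(1276.364, 1601.568)); // testConvertToList: 0.8
    }
}
```

A value below 1.0, as for testConvertToList, corresponds to the degradation mentioned in the comment.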
Re: [PR] Revert "KAFKA-15045: (KIP-924 pt. 10) Topic partition rack annotation simplified (#16034)" [kafka]
ableegoldman closed pull request #16055: Revert "KAFKA-15045: (KIP-924 pt. 10) Topic partition rack annotation simplified (#16034)" URL: https://github.com/apache/kafka/pull/16055 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Revert "KAFKA-15045: (KIP-924 pt. 10) Topic partition rack annotation simplified (#16034)" [kafka]
ableegoldman commented on PR #16055: URL: https://github.com/apache/kafka/pull/16055#issuecomment-2127922285 It's actually https://github.com/apache/kafka/pull/15972 that broke this test, closing this -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15045: (KIP-924 pt. 5) Add rack information to ApplicationState [kafka]
ableegoldman commented on PR #15972: URL: https://github.com/apache/kafka/pull/15972#issuecomment-2127918475 Yep, just noticed this. Sorry about that. We're taking a look -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Rewrite OptimizedUniformAssignmentBuilder#assignStickyPartitions to improve performance [kafka]
dajac commented on PR #15883: URL: https://github.com/apache/kafka/pull/15883#issuecomment-2127912971 We will use a different approach. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Rewrite OptimizedUniformAssignmentBuilder#assignStickyPartitions to improve performance [kafka]
dajac closed pull request #15883: MINOR: Rewrite OptimizedUniformAssignmentBuilder#assignStickyPartitions to improve performance URL: https://github.com/apache/kafka/pull/15883 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14517: Implement regex subscriptions [kafka]
dajac commented on PR #14327: URL: https://github.com/apache/kafka/pull/14327#issuecomment-2127911871 We will start this work from scratch. Closing it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14517: Implement regex subscriptions [kafka]
dajac closed pull request #14327: KAFKA-14517: Implement regex subscriptions URL: https://github.com/apache/kafka/pull/14327 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: KIP-848 Uniform Assignor Bugs [kafka]
dajac commented on PR #15286: URL: https://github.com/apache/kafka/pull/15286#issuecomment-2127911151 This does not seem necessary any more. Closing it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: KIP-848 Uniform Assignor Bugs [kafka]
dajac closed pull request #15286: MINOR: KIP-848 Uniform Assignor Bugs URL: https://github.com/apache/kafka/pull/15286 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] Revert "KAFKA-15045: (KIP-924 pt. 10) Topic partition rack annotation simplified (#16034)" [kafka]
ableegoldman opened a new pull request, #16055: URL: https://github.com/apache/kafka/pull/16055 This reverts commit 93238ae312ef7bea79160e59bb7a06623cc94a1b. [This PR](https://github.com/apache/kafka/pull/16034) broke the RackAwareTaskAssignorTest, so I am reverting it while we figure out the cause. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15045: (KIP-924 pt. 10) Topic partition rack annotation simplified [kafka]
ableegoldman commented on PR #16034: URL: https://github.com/apache/kafka/pull/16034#issuecomment-2127905750 Ah shoot, I was looking at the wrong test results, this is actually breaking RackAwareTaskAssignorTest. I'll revert this and we can fix the failing test in the resubmitted PR -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15045: (KIP-924 pt. 10) Topic partition rack annotation simplified [kafka]
ableegoldman merged PR #16034: URL: https://github.com/apache/kafka/pull/16034 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-16831) CoordinatorRuntime should initialize MemoryRecordsBuilder with max batch size write limit
Jeff Kim created KAFKA-16831: Summary: CoordinatorRuntime should initialize MemoryRecordsBuilder with max batch size write limit Key: KAFKA-16831 URL: https://issues.apache.org/jira/browse/KAFKA-16831 Project: Kafka Issue Type: Sub-task Reporter: Jeff Kim Assignee: Jeff Kim Otherwise, we default to the min buffer size of 16384 for the write limit. This causes the coordinator to throw RecordTooLargeException even when the batch is under the 1MB max batch size limit. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16822: Abstract consumer group to share functionality with share group [kafka]
apoorvmittal10 commented on PR #16054: URL: https://github.com/apache/kafka/pull/16054#issuecomment-2127858693 @dajac @omkreddy @AndrewJSchofield Please review, we can merge post 3.8 branch cut. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-16822: Abstract consumer group to share functionality with share group [kafka]
apoorvmittal10 opened a new pull request, #16054: URL: https://github.com/apache/kafka/pull/16054 Abstracted code from two classes, `ConsumerGroup` and `ConsumerGroupMember`, into `AbstractGroup` and `GroupMember` respectively. The new abstract classes are created to share common functionality with `ShareGroup` and `ShareGroupMember`, which are being introduced with KIP-932. The PR is mostly a refactoring that moves code from the existing classes into the abstract classes. It also creates a new package called `common`, to which the `MemberState` class is moved; upcoming PRs will move further classes common to `Share` and `Consumer` groups into the `common` package. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
junrao commented on code in PR #15825: URL: https://github.com/apache/kafka/pull/15825#discussion_r1612186225

## core/src/main/scala/kafka/server/DelayedFetch.scala: ##
@@ -103,7 +103,7 @@ class DelayedFetch(
 // We will not force complete the fetch request if a replica should be throttled.
 if (!params.isFromFollower || !replicaManager.shouldLeaderThrottle(quota, partition, params.replicaId))
 return forceComplete()
- } else if (fetchOffset.messageOffset < endOffset.messageOffset) {
+ } else if (fetchOffset.onSameSegment(endOffset) && fetchOffset.messageOffset < endOffset.messageOffset) {

Review Comment: Hmm, we assume that `fetchOffset.messageOffset > endOffset.messageOffset` is the truncated leader case. In that case, we should always call `forceComplete()` immediately, right? The current code only calls `forceComplete()` when the offset metadata is available, but it's more consistent to do that regardless of the availability of the offset metadata. Should we do something like the following?

```
if (fetchOffset.messageOffset > endOffset.messageOffset) {
  // Case F, this can happen when the new fetch operation is on a truncated leader
  debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.")
  return forceComplete()
} else if (fetchOffset.messageOffset < endOffset.messageOffset) {
  if (fetchOffset.onOlderSegment(endOffset)) {
    // Case F, this can happen when the fetch operation is falling behind the current segment
    // or the partition has just rolled a new segment
    debug(s"Satisfying fetch $this immediately since it is fetching older segments.")
    // We will not force complete the fetch request if a replica should be throttled.
    if (!params.isFromFollower || !replicaManager.shouldLeaderThrottle(quota, partition, params.replicaId))
      return forceComplete()
  } else if (fetchOffset.onSameSegment(endOffset)) {
    // we take the partition fetch size as upper bound when accumulating the bytes (skip if a throttled partition)
    val bytesAvailable = math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.maxBytes)
    if (!params.isFromFollower || !replicaManager.shouldLeaderThrottle(quota, partition, params.replicaId))
      accumulatedSize += bytesAvailable
  }
}
```

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-9228: Restart tasks on runtime-only connector config changes [kafka]
C0urante commented on code in PR #16001: URL: https://github.com/apache/kafka/pull/16001#discussion_r1612176138 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java: ## @@ -1039,21 +1039,42 @@ public static List> reverseTransform(String connName, return result; } -public boolean taskConfigsChanged(ClusterConfigState configState, String connName, List> taskProps) { +public boolean taskConfigsChanged( +ClusterConfigState configState, +String connName, +List> taskProps, +int connectorConfigHash +) { int currentNumTasks = configState.taskCount(connName); boolean result = false; if (taskProps.size() != currentNumTasks) { log.debug("Connector {} task count changed from {} to {}", connName, currentNumTasks, taskProps.size()); result = true; } else { -for (int index = 0; index < currentNumTasks; index++) { +for (int index = 0; index < currentNumTasks && !result; index++) { ConnectorTaskId taskId = new ConnectorTaskId(connName, index); if (!taskProps.get(index).equals(configState.taskConfig(taskId))) { log.debug("Connector {} has change in configuration for task {}-{}", connName, connName, index); result = true; } } +// Do a final check to see if runtime-controlled properties that affect tasks but may +// not be included in the connector-generated configs for them (such as converter overrides) +// have changed +if (!result) { +Integer storedConnectorConfigHash = configState.taskConfigHash(connName); +if (storedConnectorConfigHash == null) { +log.debug("Connector {} has no config hash stored for its existing tasks", connName); Review Comment: I don't love the idea of warn logs here. They'll be emitted unconditionally during upgrade even on completely unaffected clusters, and if users follow instructions to reconfigure their connectors, it'll have the same disruptive effect of forcing task restarts and extra churn. Honestly though, I suspect what's more likely is that people will ignore them. 
I think the underlying problem we're running into is that tightly coupling the storage of config hashes with the storage of task configs is forcing us into an uncomfortable spot where we can't publish a new config hash without forcing task restarts. I've published a new PR with a different, hopefully simpler approach [here](https://github.com/apache/kafka/pull/16053). Let me know what you think!
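For readers following the thread, here is a minimal, self-contained sketch of the kind of check discussed above: compare the stored and newly generated task configs first, then fall back to a connector config hash to catch runtime-only changes such as converter overrides. This is not the code from PR #16001. The class name, the parameter shapes, and in particular the handling of a missing stored hash are assumptions made for illustration only.

```java
import java.util.List;
import java.util.Map;

// Hypothetical sketch; names and signatures are illustrative, not Kafka Connect APIs.
final class TaskConfigChangeSketch {

    private TaskConfigChangeSketch() { }

    /**
     * Returns true if tasks would need to be reconfigured.
     *
     * @param storedTaskConfigs         task configs currently stored for the connector
     * @param generatedTaskConfigs      task configs freshly generated by the connector
     * @param storedConnectorConfigHash hash stored with the existing tasks, or null if none
     * @param connectorConfigHash       hash of the latest connector config
     */
    static boolean taskConfigsChanged(
            List<Map<String, String>> storedTaskConfigs,
            List<Map<String, String>> generatedTaskConfigs,
            Integer storedConnectorConfigHash,
            int connectorConfigHash) {
        // A different task count always counts as a change.
        if (generatedTaskConfigs.size() != storedTaskConfigs.size()) {
            return true;
        }
        // Stop at the first task whose generated config differs from the stored one.
        for (int index = 0; index < storedTaskConfigs.size(); index++) {
            if (!generatedTaskConfigs.get(index).equals(storedTaskConfigs.get(index))) {
                return true;
            }
        }
        if (storedConnectorConfigHash == null) {
            // ASSUMPTION: treat a missing hash (for example, right after an upgrade)
            // as "no change" so that unaffected clusters see no task restarts.
            // If the hash is stored only together with task configs, it then stays
            // missing until something else causes task configs to be republished.
            return false;
        }
        // Final check: runtime-controlled properties that never show up in task configs.
        return storedConnectorConfigHash != connectorConfigHash;
    }
}
```

Either choice for the missing-hash branch has a cost in this sketch: returning true forces restarts on every upgraded cluster, while returning false leaves the hash unpublished, which is the coupling problem the comment describes.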
[PR] KAFKA-9228: Restart tasks on runtime-only connector config changes [kafka]
C0urante opened a new pull request, #16053: URL: https://github.com/apache/kafka/pull/16053

[Jira](https://issues.apache.org/jira/browse/KAFKA-9228)

This uses a much simpler approach than the one pursued in https://github.com/apache/kafka/pull/16001.

Once task configs are detected for a connector, the most recent config for that connector is tracked by the herder as the "applied" config. When new task configs are generated by the connector, the latest config for the connector is compared to the "applied" connector config. If a difference is detected, or there is no "applied" config, then task configs are automatically published to the config topic.

If this approach is agreeable, I can apply it to standalone mode as well, fix the failing unit test cases in `DistributedHerderTest`, and possibly flesh out some new unit tests.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
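The "applied" config idea in the description above can be sketched roughly as follows. This is a hedged, standalone illustration and not the code in PR #16053: the class `AppliedConnectorConfigSketch`, its method names, and the in-memory map are all assumptions, and the real herder logic involves the config topic and cluster state that are omitted here.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; not the herder implementation from the PR.
final class AppliedConnectorConfigSketch {

    // Connector name -> connector config that the current task configs were generated from.
    private final Map<String, Map<String, String>> appliedConfigs = new HashMap<>();

    /** Remembers the connector config in effect when task configs were last published. */
    void recordApplied(String connName, Map<String, String> connectorConfig) {
        // Defensive copy so later mutation by the caller cannot change the tracked config.
        appliedConfigs.put(connName, new HashMap<>(connectorConfig));
    }

    /**
     * Decides whether freshly generated task configs should be published.
     *
     * @param taskConfigsChanged result of comparing generated task configs with stored ones
     */
    boolean shouldPublishTaskConfigs(
            String connName,
            Map<String, String> latestConnectorConfig,
            boolean taskConfigsChanged) {
        if (taskConfigsChanged) {
            return true;
        }
        Map<String, String> applied = appliedConfigs.get(connName);
        // Publish if nothing has been tracked yet, or if the connector config itself
        // changed even though the generated task configs look identical.
        return applied == null || !applied.equals(latestConnectorConfig);
    }
}
```

In this sketch a runtime-only change, such as a different converter, alters the connector config without altering the generated task configs, so the comparison against the tracked config is what triggers republishing.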
Re: [PR] KAFKA-16795: Fix broken compatibility in kafka.tools.NoOpMessageFormatter, kafka.tools.DefaultMessageFormatter, and kafka.tools.LoggingMessageFormatter [kafka]
chia7712 commented on PR #16020: URL: https://github.com/apache/kafka/pull/16020#issuecomment-2127829943 @brandboat Thanks for your contribution. Support for those formatter names should be removed in 4.0.0. I have filed a ticket (https://issues.apache.org/jira/browse/KAFKA-16830) for you. Please feel free to unassign yourself if you have no free cycles.
[jira] [Created] (KAFKA-16830) Remove the scala version formatters support
Chia-Ping Tsai created KAFKA-16830:

Summary: Remove the scala version formatters support
Key: KAFKA-16830
URL: https://issues.apache.org/jira/browse/KAFKA-16830
Project: Kafka
Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: Kuan Po Tseng
Fix For: 4.0.0

[https://github.com/apache/kafka/blob/trunk/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java#L72]

Those deprecated formatter "strings" should be removed in 4.0.0.
[jira] [Resolved] (KAFKA-16795) Fix broken compatibility in kafka.tools.NoOpMessageFormatter, kafka.tools.DefaultMessageFormatter, and kafka.tools.LoggingMessageFormatter
[ https://issues.apache.org/jira/browse/KAFKA-16795?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chia-Ping Tsai resolved KAFKA-16795.
Resolution: Fixed

> Fix broken compatibility in kafka.tools.NoOpMessageFormatter, kafka.tools.DefaultMessageFormatter, and kafka.tools.LoggingMessageFormatter
>
> Key: KAFKA-16795
> URL: https://issues.apache.org/jira/browse/KAFKA-16795
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Kuan Po Tseng
> Assignee: Kuan Po Tseng
> Priority: Major
> Fix For: 3.8.0
>
> [{{0bf830f}}|https://github.com/apache/kafka/commit/0bf830fc9c3915bc99b6e487e6083dabd593c5d3] moved the NoOpMessageFormatter, DefaultMessageFormatter and LoggingMessageFormatter classes from package {{kafka.tools}} to {{org.apache.kafka.tools.consumer}}.
> These classes could be used via cmd kafka-console-consumer.sh. We should have a deprecation cycle before 3.8.0 comes out.
>
> {code:java}
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
> --topic streams-wordcount-output \
> --from-beginning \
> --formatter kafka.tools.DefaultMessageFormatter \
> --property print.key=true \
> --property print.value=true \
> --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
> --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer{code}
> The goal in this Jira is to allow users to keep using {{kafka.tools.NoOpMessageFormatter}}, {{kafka.tools.DefaultMessageFormatter}}, and {{kafka.tools.LoggingMessageFormatter}}, but we also display warning messages to say those "strings" will be removed in 4.0.
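One way to picture the compatibility goal stated in the ticket above is a lookup from each old `kafka.tools` class name to its counterpart in `org.apache.kafka.tools.consumer`, with a warning printed whenever an old name is used. The sketch below is illustrative only and is not the change merged in PR #16020; the class `FormatterNameCompatSketch`, the `resolve` method, and the warning text are assumptions.

```java
import java.util.Map;

// Hypothetical sketch of a deprecation shim for formatter class names.
final class FormatterNameCompatSketch {

    // Old "strings" accepted by --formatter, mapped to the relocated classes.
    private static final Map<String, String> DEPRECATED_TO_CURRENT = Map.of(
            "kafka.tools.NoOpMessageFormatter",
            "org.apache.kafka.tools.consumer.NoOpMessageFormatter",
            "kafka.tools.DefaultMessageFormatter",
            "org.apache.kafka.tools.consumer.DefaultMessageFormatter",
            "kafka.tools.LoggingMessageFormatter",
            "org.apache.kafka.tools.consumer.LoggingMessageFormatter");

    private FormatterNameCompatSketch() { }

    /** Returns the class name to load for the value passed to --formatter. */
    static String resolve(String requestedFormatter) {
        String replacement = DEPRECATED_TO_CURRENT.get(requestedFormatter);
        if (replacement == null) {
            // Not one of the relocated built-ins (for example, a custom formatter): use as given.
            return requestedFormatter;
        }
        // Illustrative warning text; the wording is an assumption.
        System.err.println("WARNING: " + requestedFormatter
                + " is deprecated and will be removed in 4.0. Use " + replacement + " instead.");
        return replacement;
    }
}
```

With a shim like this, the `--formatter kafka.tools.DefaultMessageFormatter` invocation quoted in the ticket would keep working while telling the user which name to switch to.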
Re: [PR] KAFKA-16795: Fix broken compatibility in kafka.tools.NoOpMessageFormatter, kafka.tools.DefaultMessageFormatter, and kafka.tools.LoggingMessageFormatter [kafka]
chia7712 merged PR #16020: URL: https://github.com/apache/kafka/pull/16020
Re: [PR] MINOR: Disable JDK 11 and 17 builds on PRs [kafka]
mumrah commented on PR #16051: URL: https://github.com/apache/kafka/pull/16051#issuecomment-2127820090 I think it would be worth running the compile steps for these older versions (i.e., "doValidation"). As far as I know, those steps only fail if there's actually a problem.