Re: [PR] [KAFKA-15749] Adding support for Kraft in test testClusterIdPresent [kafka]
github-actions[bot] commented on PR #15181: URL: https://github.com/apache/kafka/pull/15181#issuecomment-2065686746 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch) If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15712: Added KRaft support to MultipleListenersWithSameSecurityProtocolBaseTest [kafka]
github-actions[bot] commented on PR #15223: URL: https://github.com/apache/kafka/pull/15223#issuecomment-2065686726 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch) If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
Re: [PR] KAFKA-15727: Added KRaft support in AlterUserScramCredentialsRequestNotAuthorizedTest [kafka]
github-actions[bot] commented on PR #15224: URL: https://github.com/apache/kafka/pull/15224#issuecomment-2065686700 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch) If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]
FrankYang0529 commented on PR #15679: URL: https://github.com/apache/kafka/pull/15679#issuecomment-2065616140 > > @FrankYang0529 Could you reduce the partition count of the offsets topic? It seems the timeout is caused by the coordinator waiting for the offset partition, and our CI could be too busy to complete the assignments. > > Hi @chia7712, thanks for the suggestion. I have set `offsets.topic.num.partitions` to `1` on `ClusterTestDefaults`. Hope it works fine. Hi @chia7712, setting `offsets.topic.num.partitions` to `1` works! Do you think that we should revert the `unstable.api.versions.enable` change and try again? Thanks.
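For readers following along, the change described is likely a class-level annotation along these lines (a sketch only — the exact attribute names on `@ClusterTestDefaults`/`@ClusterConfigProperty` are an assumption based on Kafka's `ClusterTestExtensions` framework):

```java
// Hypothetical sketch: pin the internal offsets topic to one partition for
// every test in the class, so the group coordinator only has to wait for a
// single partition assignment on a busy CI machine.
@ClusterTestDefaults(serverProperties = {
    @ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1")
})
public class DeleteOffsetsConsumerGroupCommandIntegrationTest {
    // ... test methods using the injected ClusterInstance ...
}
```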
Re: [PR] MINOR: disable internal result emit throttling in TTD [kafka]
mjsax merged PR #15660: URL: https://github.com/apache/kafka/pull/15660
[jira] [Created] (KAFKA-16586) Test TaskAssignorConvergenceTest failing
Matthias J. Sax created KAFKA-16586: --- Summary: Test TaskAssignorConvergenceTest failing Key: KAFKA-16586 URL: https://issues.apache.org/jira/browse/KAFKA-16586 Project: Kafka Issue Type: Test Components: streams, unit tests Reporter: Matthias J. Sax {code:java} java.lang.AssertionError: Assertion failed in randomized test. Reproduce with: `runRandomizedScenario(-538095696758490522)`. at org.apache.kafka.streams.processor.internals.assignment.TaskAssignorConvergenceTest.runRandomizedScenario(TaskAssignorConvergenceTest.java:545) at org.apache.kafka.streams.processor.internals.assignment.TaskAssignorConvergenceTest.randomClusterPerturbationsShouldConverge(TaskAssignorConvergenceTest.java:479){code} This might expose an actual bug (or an incorrect test setup) and should be reproducible (did not try it myself yet). -- This message was sent by Atlassian Jira (v8.20.10#820010)
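The `Reproduce with: runRandomizedScenario(-538095696758490522)` hint works because the test derives all of its random choices from a logged seed. A generic, self-contained sketch of that replay pattern (not the actual test code):

```java
import java.util.Random;

class SeededReplay {
    // Derive every random decision from an explicit seed, so a failing run
    // can be replayed exactly by pasting the seed from the assertion message.
    static long scenario(long seed) {
        Random random = new Random(seed);
        long checksum = 0;
        for (int i = 0; i < 100; i++) {
            // Stand-in for "perturb the cluster randomly and re-run assignment".
            checksum = checksum * 31 + random.nextInt(1000);
        }
        return checksum;
    }
}
```

Because `java.util.Random` is deterministic for a fixed seed, two invocations with the same seed make identical choices, which is what makes the failure reproducible.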
[jira] [Commented] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.
[ https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838829#comment-17838829 ] Matthias J. Sax commented on KAFKA-16514: - Thanks for the input. I did not review/vote on the original KIP or the PR, so I just assumed there was some mention of static groups... As there is nothing about it in the KIP, as you pointed out, I did some digging, and the PR reveals why it's only implemented for static members: [https://github.com/apache/kafka/pull/12035#discussion_r858263213] We use the admin client's "removeMembersFromConsumerGroup", which only works for static members, as it takes the consumer's `group.instance.id` as input. It seems it was a pragmatic approach... Re-reading the KIP discussion, it seems that making it work for regular members would require a change in the consumer API, and thus would have been a larger-scope KIP (and the idea was to keep the KIP scope limited). Thus, while we might not need a KIP for Kafka Streams, we would need one for the consumer to allow KS to use this newly added API... In the meantime, we could still do a small PR to update the JavaDocs to call out the current limitation (which will not make it a public contract IMHO, so after we get a consumer KIP we can still address this limitation w/o another KIP). Thoughts? > Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup > flag. > --- > > Key: KAFKA-16514 > URL: https://issues.apache.org/jira/browse/KAFKA-16514 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.7.0 >Reporter: Sal Sorrentino >Priority: Minor > > Working with Kafka Streams 3.7.0, but may affect earlier versions as well. > When attempting to shutdown a streams application and leave the associated > consumer group, the supplied `leaveGroup` option seems to have no effect. 
> Sample code: > {code:java} > CloseOptions options = new CloseOptions().leaveGroup(true); > stream.close(options);{code} > The expected behavior here is that the group member would shutdown and leave > the group, immediately triggering a consumer group rebalance. In practice, > the rebalance happens after the appropriate timeout configuration has expired. > I understand the default behavior in that there is an assumption that any > associated StateStores would be persisted to disk and that in the case of a > rolling restart/deployment, the rebalance delay may be preferable. However, > in our application we are using in-memory state stores and standby replicas. > There is no benefit in delaying the rebalance in this setup and we are in > need of a way to force a member to leave the group when shutting down. > The workaround we found is to set an undocumented internal StreamConfig to > enforce this behavior: > {code:java} > props.put("internal.leave.group.on.close", true); > {code} > To state the obvious, this is less than ideal. > Additional configuration details: > {code:java} > Properties props = new Properties(); > props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someApplicationId"); > props.put( > StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, > "localhost:9092,localhost:9093,localhost:9094"); > props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3); > props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); > props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numProcessors); > props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, > StreamsConfig.EXACTLY_ONCE_V2);{code}
[jira] [Updated] (KAFKA-16573) Streams does not specify where a Serde is needed
[ https://issues.apache.org/jira/browse/KAFKA-16573?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-16573: Priority: Minor (was: Major) > Streams does not specify where a Serde is needed > > > Key: KAFKA-16573 > URL: https://issues.apache.org/jira/browse/KAFKA-16573 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 3.7.0 >Reporter: Ayoub Omari >Priority: Minor > > Example topology: > {code:java} > builder >.table("input", Consumed.`with`(Serdes.String(), Serdes.String())) >.groupBy((key, value) => new KeyValue(value, key)) >.count() >.toStream() >.to("output", Produced.`with`(Serdes.String(), Serdes.Long())) > {code} > At runtime, we get the following exception > {code:java} > Please specify a key serde or set one through > StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG > org.apache.kafka.common.config.ConfigException: Please specify a key serde or > set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG > at > org.apache.kafka.streams.StreamsConfig.defaultKeySerde(StreamsConfig.java:1857) > at > org.apache.kafka.streams.processor.internals.AbstractProcessorContext.keySerde(AbstractProcessorContext.java:92) > at > org.apache.kafka.streams.processor.internals.SerdeGetter.keySerde(SerdeGetter.java:47) > at > org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareSerde(WrappingNullableUtils.java:63) > at > org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeySerde(WrappingNullableUtils.java:90) > at > org.apache.kafka.streams.state.internals.MeteredKeyValueStore.initStoreSerde(MeteredKeyValueStore.java:188) > at > org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:143) > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:232) > at > 
org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:102) > at > org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:258){code} > The error does not give information about the line or the processor causing > the issue. > Here a Grouped was missing inside the groupBy, but because the groupBy api > doesn't force to define Grouped, this one can be missed, and it could be > difficult to spot on a more complex topology. > Also, for someone who needs control over serdes in the topology and doesn't > want to define default serdes.
[jira] [Commented] (KAFKA-16573) Streams does not specify where a Serde is needed
[ https://issues.apache.org/jira/browse/KAFKA-16573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838827#comment-17838827 ] Matthias J. Sax commented on KAFKA-16573: - Thanks for filing this ticket. I think your idea is good; it's for sure an improvement over the current state. > Streams does not specify where a Serde is needed > > > Key: KAFKA-16573 > URL: https://issues.apache.org/jira/browse/KAFKA-16573 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 3.7.0 >Reporter: Ayoub Omari >Priority: Major > > Example topology: > {code:java} > builder >.table("input", Consumed.`with`(Serdes.String(), Serdes.String())) >.groupBy((key, value) => new KeyValue(value, key)) >.count() >.toStream() >.to("output", Produced.`with`(Serdes.String(), Serdes.Long())) > {code} > At runtime, we get the following exception > {code:java} > Please specify a key serde or set one through > StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG > org.apache.kafka.common.config.ConfigException: Please specify a key serde or > set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG > at > org.apache.kafka.streams.StreamsConfig.defaultKeySerde(StreamsConfig.java:1857) > at > org.apache.kafka.streams.processor.internals.AbstractProcessorContext.keySerde(AbstractProcessorContext.java:92) > at > org.apache.kafka.streams.processor.internals.SerdeGetter.keySerde(SerdeGetter.java:47) > at > org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareSerde(WrappingNullableUtils.java:63) > at > org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeySerde(WrappingNullableUtils.java:90) > at > org.apache.kafka.streams.state.internals.MeteredKeyValueStore.initStoreSerde(MeteredKeyValueStore.java:188) > at > org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:143) > at > 
org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:232) > at > org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:102) > at > org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:258){code} > The error does not give information about the line or the processor causing > the issue. > Here a Grouped was missing inside the groupBy, but because the groupBy api > doesn't force to define Grouped, this one can be missed, and it could be > difficult to spot on a more complex topology. > Also, for someone who needs control over serdes in the topology and doesn't > want to define default serdes.
[jira] [Resolved] (KAFKA-16280) Expose method to determine Metric Measurability
[ https://issues.apache.org/jira/browse/KAFKA-16280?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-16280. - Resolution: Done > Expose method to determine Metric Measurability > --- > > Key: KAFKA-16280 > URL: https://issues.apache.org/jira/browse/KAFKA-16280 > Project: Kafka > Issue Type: Bug > Components: metrics >Affects Versions: 3.8.0 >Reporter: Apoorv Mittal >Assignee: Apoorv Mittal >Priority: Major > Labels: kip > Fix For: 3.8.0 > > > The Jira is to track the development of KIP-1019: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-1019%3A+Expose+method+to+determine+Metric+Measurability]
[jira] [Updated] (KAFKA-16280) Expose method to determine Metric Measurability
[ https://issues.apache.org/jira/browse/KAFKA-16280?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-16280: Issue Type: Improvement (was: Bug) > Expose method to determine Metric Measurability > --- > > Key: KAFKA-16280 > URL: https://issues.apache.org/jira/browse/KAFKA-16280 > Project: Kafka > Issue Type: Improvement > Components: metrics >Affects Versions: 3.8.0 >Reporter: Apoorv Mittal >Assignee: Apoorv Mittal >Priority: Major > Labels: kip > Fix For: 3.8.0 > > > The Jira is to track the development of KIP-1019: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-1019%3A+Expose+method+to+determine+Metric+Measurability]
Re: [PR] KAFKA-16280: Expose method to determine metric measurability (KIP-1019) [kafka]
mjsax merged PR #15681: URL: https://github.com/apache/kafka/pull/15681
Re: [PR] KAFKA-16554: Online downgrade triggering and group type conversion [kafka]
dongnuo123 commented on code in PR #15721: URL: https://github.com/apache/kafka/pull/15721#discussion_r1571520271 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/Assertions.java: ## @@ -198,6 +204,62 @@ private static void assertApiMessageAndVersionEquals( } } } +} else if (actual.message() instanceof GroupMetadataValue) { Review Comment: Yeah it's more convenient. Let me change this.
Re: [PR] KAFKA-16557: Fix toString of OffsetFetchRequestState [kafka]
phooq commented on PR #15750: URL: https://github.com/apache/kafka/pull/15750#issuecomment-2065444919 Hey @lianetm, @kirktrue, thanks for the feedback and suggestions! I like @kirktrue's advice on having a `toString()` override in the base class while each subclass customizes `toStringBase()`, so I made the change accordingly. This seems much cleaner and less confusing. Please take a look and let me know your thoughts. Thanks
Re: [PR] KAFKA-16554: Online downgrade triggering and group type conversion [kafka]
dongnuo123 commented on code in PR #15721: URL: https://github.com/apache/kafka/pull/15721#discussion_r1571469219 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -777,11 +778,78 @@ public ClassicGroup classicGroup( } } +/** + * Validates the online downgrade if a consumer member is fenced from the consumer group. + * + * @param consumerGroup The ConsumerGroup. + * @param memberId The fenced member id. + * @return A boolean indicating whether it's valid to online downgrade the consumer group. + */ +private boolean validateOnlineDowngrade(ConsumerGroup consumerGroup, String memberId) { +if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) { +log.info("Cannot downgrade consumer group {} to classic group because the online downgrade is disabled.", +consumerGroup.groupId()); +return false; +} else if (!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) { Review Comment: For explicitly leaving group we know the remaining one is using the consumer group protocol. I'm not sure about the timeout cases. It could be an expired classic protocol member.
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on PR #15685: URL: https://github.com/apache/kafka/pull/15685#issuecomment-2065435357 I may need to figure out how to deal with this... when setting the version to 0. ``` Replayed a FeatureLevelRecord removing feature test.feature.version ``` ``` Broker 1 registered with feature test.feature.version that is unknown to the controller (org.apache.kafka.controller.ClusterControlManager) [2024-04-18 15:32:25,440] INFO [QuorumController id=1] Replayed RegisterBrokerRecord modifying the registration for broker 1: RegisterBrokerRecord(brokerId=1, isMigratingZkBroker=false, incarnationId=pihYtx_qSSKlJ2K31eGFOw, brokerEpoch=8, endPoints=[BrokerEndpoint(name='PLAINTEXT', host='localhost', port=9092, securityProtocol=0)], features=[BrokerFeature(name='metadata.version', minSupportedVersion=1, maxSupportedVersion=19), BrokerFeature(name='test.feature.version', minSupportedVersion=0, maxSupportedVersion=1)], rack=null, fenced=true, inControlledShutdown=false, logDirs=[T1fHZ5DjOC1Dk5CviOHPbg]) (org.apache.kafka.controller.ClusterControlManager) ```
Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]
rreddy-22 commented on code in PR #15717: URL: https://github.com/apache/kafka/pull/15717#discussion_r1571435096 ## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/ServerSideAssignorBenchmark.java: ## @@ -0,0 +1,185 @@ +package org.apache.kafka.jmh.group_coordinator; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.RangeAssignor; +import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber; +import org.apache.kafka.coordinator.group.assignor.UniformAssignor; +import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; + +import static java.lang.Integer.max; + +@State(Scope.Benchmark) 
+@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class ServerSideAssignorBenchmark { + +@Param({"10", "50", "100"}) +private int partitionsPerTopicCount; + +@Param({"100"}) +private int topicCount; + +@Param({"500", "1000"}) +private int memberCount; + +@Param({"true", "false"}) +private boolean isRackAware; + +@Param({"true", "false"}) +private boolean isSubscriptionUniform; + +@Param({"true", "false"}) +private boolean isRangeAssignor; + +@Param({"true", "false"}) +private boolean isReassignment; + +private PartitionAssignor partitionAssignor; + +private final int numberOfRacks = 3; + +private AssignmentSpec assignmentSpec; + +private SubscribedTopicDescriber subscribedTopicDescriber; + +@Setup(Level.Trial) +public void setup() { +Map topicMetadata = new HashMap<>(); +Map> partitionRacks = isRackAware ? +mkMapOfPartitionRacks(partitionsPerTopicCount) : +Collections.emptyMap(); + +for (int i = 1; i <= topicCount; i++) { +Uuid topicUuid = Uuid.randomUuid(); +String topicName = "topic" + i; +topicMetadata.put(topicUuid, new TopicMetadata( +topicUuid, topicName, partitionsPerTopicCount, partitionRacks)); +} + +addTopicSubscriptions(topicMetadata); +this.subscribedTopicDescriber = new SubscribedTopicMetadata(topicMetadata); + +if (isRangeAssignor) { +this.partitionAssignor = new RangeAssignor(); +} else { +this.partitionAssignor = new UniformAssignor(); +} + +if (isReassignment) { +GroupAssignment initialAssignment = partitionAssignor.assign(assignmentSpec, subscribedTopicDescriber); +Map members; + +members = initialAssignment.members(); + +// Update the AssignmentSpec with the results from the initial assignment. 
+Map updatedMembers = new HashMap<>(); + +members.forEach((memberId, memberAssignment) -> { +AssignmentMemberSpec memberSpec = assignmentSpec.members().get(memberId); +updatedMembers.put(memberId, new AssignmentMemberSpec( +memberSpec.instanceId(), +memberSpec.rackId(), +memberSpec.subscribedTopicIds(), +memberAssignment.targetPartitions() +)); +}); + +// Add new member to trigger a reassignment. +Optional rackId = isRackAware ? Optional.of("rack" + (memberCount + 1) % numberOfRacks) : Optional.empty(); + +updatedMembers.put("newMember", new AssignmentMemberSpec( +Optional.empty(), +rackId, +topicMetadata.keySet(), +Collections.emptyMap() +
Re: [PR] KAFKA-16557: Fix toString of OffsetFetchRequestState [kafka]
kirktrue commented on PR #15750: URL: https://github.com/apache/kafka/pull/15750#issuecomment-2065387420 We've been encouraged to follow the pattern of having the super class implement `toString()` and add a `toStringBase()`, like this: ```java public class Parent { private final String foo = "Hello, world!"; protected String toStringBase() { return "foo=" + foo; } @Override public String toString() { return getClass().getSimpleName() + "{" + toStringBase() + "}"; } } ``` Then the subclasses would implement `toStringBase()` to add their values: ```java public class Child extends Parent { private final String bar = "Many"; private final String baz = "Few"; @Override protected String toStringBase() { return super.toStringBase() + ", bar=" + bar + ", baz=" + baz; } } ``` The rationale I was given was: 1. It allows the parent to keep its state private, but still "expose" it via `toString()` 2. It helps to ensure the correct class name appears in the output 3. It keeps the output uniform and reminds the developer to include the parent state I'm not crazy about the design idiom, but that's what we were asked to use. It works OK as long as it's applied uniformly.
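Joined into a single compilable sketch, the idiom produces output of the shape `Child{foo=..., bar=..., baz=...}`:

```java
class Parent {
    private final String foo = "Hello, world!";

    protected String toStringBase() {
        return "foo=" + foo;
    }

    @Override
    public String toString() {
        // getSimpleName() ensures the subclass name appears in the output even
        // though toString() itself lives in the parent.
        return getClass().getSimpleName() + "{" + toStringBase() + "}";
    }
}

class Child extends Parent {
    private final String bar = "Many";
    private final String baz = "Few";

    @Override
    protected String toStringBase() {
        return super.toStringBase() + ", bar=" + bar + ", baz=" + baz;
    }
}
```

Calling `new Child().toString()` yields `Child{foo=Hello, world!, bar=Many, baz=Few}`.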
[PR] cherrypick KAFKA-16386: Convert NETWORK_EXCEPTIONs from KIP-890 transaction verification [kafka]
jolshan opened a new pull request, #15755: URL: https://github.com/apache/kafka/pull/15755 KIP-890 Part 1 introduced verification of transactions with the transaction coordinator on the `Produce` and `TxnOffsetCommit` paths. This introduced the possibility of new errors when responding to those requests. For backwards compatibility with older clients, a choice was made to convert some of the new retriable errors to existing errors that are expected and retried correctly by older clients. `NETWORK_EXCEPTION` was forgotten about and not converted, but can occur if, for example, the transaction coordinator is temporarily refusing connections. Now, we convert it to: * `NOT_ENOUGH_REPLICAS` on the `Produce` path, just like the other retriable errors that can arise from transaction verification. * `COORDINATOR_LOAD_IN_PROGRESS` on the `TxnOffsetCommit` path. This error does not force coordinator lookup on clients, unlike `COORDINATOR_NOT_AVAILABLE`. Note that this deviates from KIP-890, which says that retriable errors should be converted to `COORDINATOR_NOT_AVAILABLE`. Conflicts: core/src/main/scala/kafka/server/ReplicaManager.scala group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java There were some conflicts in how the code paths changed. We have three paths. 1. Produce -- In appendEntries we have the callback just for produce requests. I've included the error and the comment there. 2. Old Group Coordinator -- In GroupMetadataManager, we handle the conversion in `maybeConvertOffsetCommitError`. This path is separate from the produce path. 3. New Group Coordinator -- Not supported in 3.7
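The two conversions described in the PR text reduce to a small mapping, sketched below with a stand-in enum (simplified stand-ins for members of Kafka's `Errors`, not the actual `ReplicaManager`/`GroupMetadataManager` code):

```java
class VerificationErrorConversion {
    enum Error { NONE, NETWORK_EXCEPTION, NOT_ENOUGH_REPLICAS, COORDINATOR_LOAD_IN_PROGRESS }

    // Produce path: map NETWORK_EXCEPTION to NOT_ENOUGH_REPLICAS, the same
    // error already used for other retriable verification failures, so that
    // older clients retry correctly.
    static Error convertProduceError(Error error) {
        return error == Error.NETWORK_EXCEPTION ? Error.NOT_ENOUGH_REPLICAS : error;
    }

    // TxnOffsetCommit path: map NETWORK_EXCEPTION to COORDINATOR_LOAD_IN_PROGRESS,
    // which is retriable but does not force a coordinator lookup on the client.
    static Error convertTxnOffsetCommitError(Error error) {
        return error == Error.NETWORK_EXCEPTION ? Error.COORDINATOR_LOAD_IN_PROGRESS : error;
    }
}
```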
[jira] [Commented] (KAFKA-16570) FenceProducers API returns "unexpected error" when successful
[ https://issues.apache.org/jira/browse/KAFKA-16570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838796#comment-17838796 ] Justine Olshan commented on KAFKA-16570: Hmmm – retries here are a bit strange since the command was successful; we will just return this error until we can allocate the new producer ID. I suppose this "guarantees" the markers were written, but it is not consistent with how end txn usually works. See how the error is returned on success of the EndTxn API call: [https://github.com/apache/kafka/blob/53ff1a5a589eb0e30a302724fcf5d0c72c823682/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L175] > FenceProducers API returns "unexpected error" when successful > - > > Key: KAFKA-16570 > URL: https://issues.apache.org/jira/browse/KAFKA-16570 > Project: Kafka > Issue Type: Bug >Reporter: Justine Olshan >Assignee: Justine Olshan >Priority: Major > > When we want to fence a producer using the admin client, we send an > InitProducerId request. > There is logic in that API to fence (and abort) any ongoing transactions and > that is what the API relies on to fence the producer. However, this handling > also returns CONCURRENT_TRANSACTIONS. In normal usage, this is good because > we want to actually get a new producer ID and want to retry until the ID > is supplied or we time out. > [https://github.com/apache/kafka/blob/5193eb93237ba9093ae444d73a1eaa2d6abcc9c1/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L170] > > [https://github.com/apache/kafka/blob/a3dcbd4e28a35f79f75ec1bf316ef0b39c0df164/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L1322] > > In the case of fence producer, we don't retry and instead we have no handling > for concurrent transactions and log a message about an unexpected error. 
> [https://github.com/apache/kafka/blob/a3dcbd4e28a35f79f75ec1bf316ef0b39c0df164/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java#L112] > > This is not unexpected though and the operation was successful. We should > just swallow this error and treat this as a successful run of the command.
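The fix suggested in the ticket boils down to treating that one error code as success in the fencing handler; a simplified stand-in sketch (not the actual `FenceProducersHandler` code):

```java
class FenceResultHandling {
    enum Error { NONE, CONCURRENT_TRANSACTIONS, UNKNOWN_SERVER_ERROR }

    // By the time InitProducerId returns CONCURRENT_TRANSACTIONS to a fencing
    // request, the fencing has already taken effect (the ticket notes the
    // operation was successful), so swallow the error instead of reporting it
    // as "unexpected".
    static boolean fenceSucceeded(Error error) {
        return error == Error.NONE || error == Error.CONCURRENT_TRANSACTIONS;
    }
}
```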
Re: [PR] KAFKA-16555: Consumer's RequestState has incorrect logic to determine if inflight [kafka]
philipnee commented on code in PR #15723: URL: https://github.com/apache/kafka/pull/15723#discussion_r1571359996 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestState.java: ## @@ -98,12 +93,11 @@ public boolean canSendRequest(final long currentTimeMs) { * is a request in-flight. */ public boolean requestInFlight() { -return this.lastSentMs > -1 && this.lastReceivedMs < this.lastSentMs; +return requestInFlight; Review Comment: sorry - I meant to say `hasInflightRequest`. Either way is fine.
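The diff above replaces a timestamp comparison with an explicit flag. A standalone sketch of the flag-based approach (names loosely follow the snippet; the surrounding methods are an assumption):

```java
class RequestStateSketch {
    private boolean requestInFlight = false;

    // Called when a request is sent; it remains in flight until a response
    // (success or failure) arrives.
    void onSendAttempt() {
        requestInFlight = true;
    }

    // Called on success or failure. Unlike comparing lastSentMs with
    // lastReceivedMs, a boolean cannot misreport state when send and receive
    // land on the same millisecond timestamp.
    void onResponse() {
        requestInFlight = false;
    }

    boolean requestInFlight() {
        return requestInFlight;
    }
}
```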
Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]
rreddy-22 commented on code in PR #15717: URL: https://github.com/apache/kafka/pull/15717#discussion_r1571348485 ## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ClientSideAssignorBenchmark.java: ## @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.jmh.assignor; + +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor; +import org.apache.kafka.clients.consumer.CooperativeStickyAssignor; +import org.apache.kafka.clients.consumer.RangeAssignor; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeUnit; + +import static java.lang.Integer.max; +import static org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.DEFAULT_GENERATION; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class ClientSideAssignorBenchmark { + +public enum AssignorType { +RANGE(new RangeAssignor()), +COOPERATIVE_STICKY(new CooperativeStickyAssignor()); + +private final ConsumerPartitionAssignor assignor; + +AssignorType(ConsumerPartitionAssignor assignor) { +this.assignor = assignor; +} + +public ConsumerPartitionAssignor assignor() { +return assignor; +} +} + +/** + * The subscription pattern followed by the members of the group. 
+ * + * A subscription model is considered homogenous if all the members of the group + * are subscribed to the same set of topics, it is heterogeneous otherwise. + */ +public enum SubscriptionModel { +HOMOGENEOUS, HETEROGENEOUS +} + +@Param({"100", "500", "1000", "5000", "1"}) +private int memberCount; + +@Param({"5", "10", "50"}) +private int partitionsToMemberRatio; + +@Param({"10", "100", "1000"}) +private int topicCount; + +@Param({"true", "false"}) +private boolean isRackAware; + +@Param({"HOMOGENEOUS", "HETEROGENEOUS"}) +private SubscriptionModel subscriptionModel; + +@Param({"RANGE", "COOPERATIVE_STICKY"}) +private AssignorType assignorType; + +@Param({"true", "false"}) +private boolean simulateRebalanceTrigger; + +private Map subscriptions = new HashMap<>(); + +private ConsumerPartitionAssignor.GroupSubscription groupSubscription; + +private static final int NUMBER_OF_RACKS = 3; + +private ConsumerPartitionAssignor assignor; + +private Cluster metadata; + +private final List allTopicNames = new ArrayList<>(topicCount); + +@Setup(Level.Trial) +public void setup() { +// Ensure there are enough racks and brokers for the replication factor. +if (NUMBER_OF_RACKS < 2) { +throw new IllegalArgumentException("Number of broker racks must be at least equal to 2."); +} + +populateTopicMetadata(); + +addMemberSubscriptions(); + +assignor = assignorType.assignor(); + +if (simulateRebalanceTrigger) simulateRebalance(); +} + +private void populateTopicMetadata() { +List partitions = new ArrayList<>(); +int partitionsPerTopicCount = (memberCount * partitionsToMemberRatio) / topicCount; + +// Create nodes
Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]
rreddy-22 commented on code in PR #15717: URL: https://github.com/apache/kafka/pull/15717#discussion_r1571339288 ## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java: ## @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.jmh.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.RangeAssignor; +import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber; +import org.apache.kafka.coordinator.group.assignor.UniformAssignor; +import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static java.lang.Integer.max; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 1) +@Measurement(iterations = 0) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class ServerSideAssignorBenchmark { + +public enum AssignorType { +RANGE(new RangeAssignor()), +UNIFORM(new 
UniformAssignor()); + +private final PartitionAssignor assignor; + +AssignorType(PartitionAssignor assignor) { +this.assignor = assignor; +} + +public PartitionAssignor assignor() { +return assignor; +} +} + +/** + * The subscription pattern followed by the members of the group. + * + * A subscription model is considered homogenous if all the members of the group + * are subscribed to the same set of topics, it is heterogeneous otherwise. + */ +public enum SubscriptionModel { +HOMOGENEOUS, HETEROGENEOUS +} + +@Param({"100", "500", "1000", "5000", "1"}) +private int memberCount; + +@Param({"5", "10", "50"}) +private int partitionsToMemberRatio; + +@Param({"10", "100", "1000"}) +private int topicCount; + +@Param({"true", "false"}) +private boolean isRackAware; + +@Param({"HOMOGENEOUS", "HETEROGENEOUS"}) +private SubscriptionModel subscriptionModel; + +@Param({"RANGE", "UNIFORM"}) +private AssignorType assignorType; + +@Param({"true", "false"}) +private boolean simulateRebalanceTrigger; + +private PartitionAssignor partitionAssignor; + +private static final int NUMBER_OF_RACKS = 3; + +private AssignmentSpec assignmentSpec; + +private SubscribedTopicDescriber subscribedTopicDescriber; + +private final List allTopicIds = new ArrayList<>(topicCount); + +@Setup(Level.Trial) +public void setup() { +Map topicMetadata = createTopicMetadata(); +subscribedTopicDescriber = new SubscribedTopicMetadata(topicMetadata); + +createAssignmentSpec(); + +partitionAssignor = assignorType.assignor(); + +if (simulateRebalanceTrigger) { +simulateIncrementalRebalance(topicMetadata); +} +} + +private Map createTopicMetadata() { +Map topicMetadata = new
Re: [PR] KAFKA-16555: Consumer's RequestState has incorrect logic to determine if inflight [kafka]
lucasbru commented on code in PR #15723: URL: https://github.com/apache/kafka/pull/15723#discussion_r1571335784 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestState.java: ## @@ -98,12 +93,11 @@ public boolean canSendRequest(final long currentTimeMs) { * is a request in-flight. */ public boolean requestInFlight() { -return this.lastSentMs > -1 && this.lastReceivedMs < this.lastSentMs; +return requestInFlight; Review Comment: `inflightRequested` seems to change the meaning. Who requested an inflight? In my head it's clearly the noun "request", but if you are bothered, may I suggest "isRequestInFlight"? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
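Both review threads above debate only the field's name; the substantive change in the quoted diff replaces a timestamp-derived in-flight check with an explicit boolean field. The following is a minimal, self-contained sketch (not the actual Kafka `RequestState` class) of the two approaches, illustrating one way a derived check can disagree with reality when a send and a response land in the same millisecond:

```java
// Illustrative sketch only: contrasts a timestamp-derived in-flight check
// with an explicit boolean flag, as in the diff quoted above.
public class RequestStateSketch {
    private long lastSentMs = -1;
    private long lastReceivedMs = -1;
    private boolean requestInFlight = false;

    public void onSendAttempt(long currentTimeMs) {
        lastSentMs = currentTimeMs;
        requestInFlight = true;
    }

    public void onSuccessfulAttempt(long currentTimeMs) {
        lastReceivedMs = currentTimeMs;
        requestInFlight = false;
    }

    // Old style: derive "in flight" from the two timestamps.
    public boolean requestInFlightDerived() {
        return lastSentMs > -1 && lastReceivedMs < lastSentMs;
    }

    // New style: track the state explicitly at the two places it can change.
    public boolean requestInFlight() {
        return requestInFlight;
    }

    public static void main(String[] args) {
        RequestStateSketch s = new RequestStateSketch();
        s.onSendAttempt(100L);        // first request goes out
        s.onSuccessfulAttempt(100L);  // response arrives in the same millisecond
        s.onSendAttempt(100L);        // a second request goes out, still at t=100
        // The derived check now reports "no request in flight" (100 < 100 is
        // false) even though one was just sent; the explicit flag is correct.
        System.out.println(s.requestInFlightDerived() + " " + s.requestInFlight()); // prints "false true"
    }
}
```

Whatever name the reviewers settle on, the explicit flag sidesteps clock-granularity edge cases entirely, since it is updated only where the state actually changes.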
Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]
rreddy-22 commented on code in PR #15717: URL: https://github.com/apache/kafka/pull/15717#discussion_r1571327160 ## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java: ## @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.jmh.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.RangeAssignor; +import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber; +import org.apache.kafka.coordinator.group.assignor.UniformAssignor; +import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static java.lang.Integer.max; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 1) +@Measurement(iterations = 0) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class ServerSideAssignorBenchmark { + +public enum AssignorType { +RANGE(new RangeAssignor()), +UNIFORM(new 
UniformAssignor()); + +private final PartitionAssignor assignor; + +AssignorType(PartitionAssignor assignor) { +this.assignor = assignor; +} + +public PartitionAssignor assignor() { +return assignor; +} +} + +/** + * The subscription pattern followed by the members of the group. + * + * A subscription model is considered homogenous if all the members of the group + * are subscribed to the same set of topics, it is heterogeneous otherwise. + */ +public enum SubscriptionModel { +HOMOGENEOUS, HETEROGENEOUS +} + +@Param({"100", "500", "1000", "5000", "1"}) +private int memberCount; + +@Param({"5", "10", "50"}) +private int partitionsToMemberRatio; + +@Param({"10", "100", "1000"}) +private int topicCount; + +@Param({"true", "false"}) +private boolean isRackAware; + +@Param({"HOMOGENEOUS", "HETEROGENEOUS"}) +private SubscriptionModel subscriptionModel; + +@Param({"RANGE", "UNIFORM"}) +private AssignorType assignorType; + +@Param({"true", "false"}) +private boolean simulateRebalanceTrigger; Review Comment: Okay, I'll add that -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16570) FenceProducers API returns "unexpected error" when successful
[ https://issues.apache.org/jira/browse/KAFKA-16570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838786#comment-17838786 ] Artem Livshits commented on KAFKA-16570: > We should just swallow this error and treat this as a successful run of the > command. I think we should instead implement retries and fail if we cannot succeed within the command timeout. > FenceProducers API returns "unexpected error" when successful -- This message was sent by Atlassian Jira (v8.20.10#820010)
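The retry policy Artem suggests — keep retrying the retriable CONCURRENT_TRANSACTIONS result and fail only once the command's deadline passes — can be sketched independently of the admin-client internals. Everything below (the `Outcome` enum, the `run` helper, the backoff scheme) is a hypothetical illustration, not Kafka API:

```java
import java.util.function.Supplier;

public class RetryUntilDeadline {
    // Hypothetical stand-ins for the three kinds of results discussed above.
    public enum Outcome { SUCCESS, CONCURRENT_TRANSACTIONS, FATAL }

    // Retry while the outcome is retriable; give up once the next backoff
    // would cross the deadline and surface the retriable error to the caller.
    public static Outcome run(Supplier<Outcome> attempt, long deadlineMs, long backoffMs)
            throws InterruptedException {
        while (true) {
            Outcome o = attempt.get();
            if (o != Outcome.CONCURRENT_TRANSACTIONS) {
                return o; // success or a genuinely fatal error
            }
            if (System.currentTimeMillis() + backoffMs >= deadlineMs) {
                return Outcome.CONCURRENT_TRANSACTIONS; // timed out while still retriable
            }
            Thread.sleep(backoffMs);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        final int[] calls = {0};
        // First attempt is "busy", second succeeds: the helper retries once.
        Outcome r = run(
            () -> calls[0]++ == 0 ? Outcome.CONCURRENT_TRANSACTIONS : Outcome.SUCCESS,
            System.currentTimeMillis() + 5_000L, 1L);
        System.out.println(r + " after " + calls[0] + " attempts"); // prints "SUCCESS after 2 attempts"
    }
}
```

The trade-off versus simply swallowing the error is that the caller only reports success once a fresh producer ID has actually been allocated, at the cost of possibly blocking until the deadline.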
[jira] [Updated] (KAFKA-16386) NETWORK_EXCEPTIONs from transaction verification are not translated
[ https://issues.apache.org/jira/browse/KAFKA-16386?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Justine Olshan updated KAFKA-16386: --- Affects Version/s: 3.7.0 > NETWORK_EXCEPTIONs from transaction verification are not translated > --- > > Key: KAFKA-16386 > URL: https://issues.apache.org/jira/browse/KAFKA-16386 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.6.0, 3.7.0 >Reporter: Sean Quah >Priority: Minor > Fix For: 3.8.0 > > > KAFKA-14402 > ([KIP-890|https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense]) > adds verification with the transaction coordinator on the Produce and > TxnOffsetCommit paths as a defense against hanging transactions. For > compatibility with older clients, retriable errors from the verification step > are translated to ones already expected and handled by existing clients. When > verification was added, we forgot to translate {{NETWORK_EXCEPTION}}s. > [~dajac] noticed this manifesting as a test failure when > tests/kafkatest/tests/core/transactions_test.py was run with an older client > (prior to the fix for KAFKA-16122): > {quote} > {{NETWORK_EXCEPTION}} is indeed returned as a partition error. The > {{TransactionManager.TxnOffsetCommitHandler}} considers it a fatal error, > so it transitions to the fatal state. > It seems that there are two cases where the server could return it: (1) when > the verification request times out or its connection is cut; or (2) in > {{AddPartitionsToTxnManager.addTxnData}}, where we say that we use it because > we want a retriable error. > {quote} > The first case was triggered as part of the test. The second case happens > when there is already a verification request ({{AddPartitionsToTxn}}) in > flight with the same epoch and we want clients to try again when we're not > busy. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16386) NETWORK_EXCEPTIONs from transaction verification are not translated
[ https://issues.apache.org/jira/browse/KAFKA-16386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838768#comment-17838768 ] Justine Olshan commented on KAFKA-16386: Note: For 3.6, this is only returned in the produce path which treats the exception as retriable. The TxnOffsetCommit path is the one that experiences the fatal error and is only in 3.7. > NETWORK_EXCEPTIONs from transaction verification are not translated -- This message was sent by Atlassian Jira (v8.20.10#820010)
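The fix implied by this ticket is to include NETWORK_EXCEPTION in the set of verification errors that get translated for older clients. The sketch below shows the general shape of such a translation step; the error names mirror Kafka's, but the enum, the method, and especially the chosen mapping target are illustrative stand-ins, not the actual server code:

```java
public class VerificationErrorTranslation {
    // Illustrative subset of error codes; not Kafka's Errors enum.
    public enum ApiError {
        NONE,
        NOT_COORDINATOR,
        CONCURRENT_TRANSACTIONS,
        COORDINATOR_LOAD_IN_PROGRESS,
        NETWORK_EXCEPTION,
        NOT_ENOUGH_REPLICAS
    }

    // Map retriable verification errors that older clients do not expect on
    // this path to one they already treat as retriable. The target error
    // (NOT_ENOUGH_REPLICAS) is an assumption chosen for illustration.
    public static ApiError translateForOldClients(ApiError e) {
        switch (e) {
            case NOT_COORDINATOR:
            case CONCURRENT_TRANSACTIONS:
            case COORDINATOR_LOAD_IN_PROGRESS:
            case NETWORK_EXCEPTION: // the case the bug report says was missed
                return ApiError.NOT_ENOUGH_REPLICAS;
            default:
                return e; // NONE and fatal errors pass through unchanged
        }
    }
}
```

With the missing case added, both of the scenarios quoted above (a cut connection and a concurrent in-flight AddPartitionsToTxn) surface to old clients as an error they already retry instead of a fatal one.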
Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]
vamossagar12 commented on code in PR #15305: URL: https://github.com/apache/kafka/pull/15305#discussion_r1571150850 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java: ## @@ -533,6 +536,57 @@ public void testSkippingAssignmentFails() { verify(configStorage).snapshot(); } +@Test +public void testPollTimeoutExpiry() throws InterruptedException { +// We will create a new WorkerCoordinator object with a rebalance timeout smaller +// than session timeout. This might not happen in the real world but it makes testing +// easier and the test not flaky. +int smallRebalanceTimeout = 20; +this.rebalanceConfig = new GroupRebalanceConfig(sessionTimeoutMs, +smallRebalanceTimeout, +heartbeatIntervalMs, +groupId, +Optional.empty(), +retryBackoffMs, +retryBackoffMaxMs, +true); +this.coordinator = new WorkerCoordinator(rebalanceConfig, +logContext, +consumerClient, +new Metrics(time), +"consumer" + groupId, +time, +LEADER_URL, +configStorage, +rebalanceListener, +compatibility, +0); + +when(configStorage.snapshot()).thenReturn(configState1); + + client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node)); +coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); + +client.prepareResponse(joinGroupFollowerResponse(1, "member", "leader", Errors.NONE)); + client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", configState1.offset(), Collections.emptyList(), +Collections.singletonList(taskId1x0), Errors.NONE)); + +try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(WorkerCoordinator.class)) { +coordinator.ensureActiveGroup(); +coordinator.poll(0, () -> null); + +// The heartbeat thread is running and keeps sending heartbeat requests. 
+TestUtils.waitForCondition(() -> { +// Rebalance timeout elapses while poll is never invoked causing a poll timeout expiry +coordinator.sendHeartbeatRequest(); +client.prepareResponse(new HeartbeatResponse(new HeartbeatResponseData())); +time.sleep(1); Review Comment: This sleep value, though small, was the one that eventually worked after running the test continuously for 30 times. Sleeping for 10s still kept failing because sometimes the coordinator's session timeout would happen even though it doesn't look like it should. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Fix io-[wait-]ratio metrics description [kafka]
emitskevich-blp commented on PR #15722: URL: https://github.com/apache/kafka/pull/15722#issuecomment-2064749720

> Is it possible to add unit test?

I'm not sure whether it's needed here. Effectively, such a test would verify the behavior of a deprecated method. What do you think?

> We should verify that docs should not include deprecated

The docs are correct: [io-wait-ratio](https://github.com/emitskevich-blp/kafka/blob/53ff1a5a589eb0e30a302724fcf5d0c72c823682/docs/ops.html#L2372), [io-ratio](https://github.com/emitskevich-blp/kafka/blob/53ff1a5a589eb0e30a302724fcf5d0c72c823682/docs/ops.html#L2392)

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]
vamossagar12 commented on code in PR #15305: URL: https://github.com/apache/kafka/pull/15305#discussion_r1571147929 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java: ## @@ -533,6 +536,57 @@ public void testSkippingAssignmentFails() { verify(configStorage).snapshot(); } +@Test +public void testPollTimeoutExpiry() throws InterruptedException { +// We will create a new WorkerCoordinator object with a rebalance timeout smaller +// than session timeout. This might not happen in the real world but it makes testing +// easier and the test not flaky. +int smallRebalanceTimeout = 20; +this.rebalanceConfig = new GroupRebalanceConfig(sessionTimeoutMs, +smallRebalanceTimeout, +heartbeatIntervalMs, +groupId, +Optional.empty(), +retryBackoffMs, +retryBackoffMaxMs, +true); +this.coordinator = new WorkerCoordinator(rebalanceConfig, +logContext, +consumerClient, +new Metrics(time), +"consumer" + groupId, +time, +LEADER_URL, +configStorage, +rebalanceListener, +compatibility, +0); + Review Comment: For the purpose of this test, I create a new coordinator with a rebalance timeout smaller than the session timeout. As mentioned in the comment, it may not happen in the real world, but it makes testing a lot easier. Let me know what you think -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-15089) Consolidate all the group coordinator configs
[ https://issues.apache.org/jira/browse/KAFKA-15089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838754#comment-17838754 ] David Jacot commented on KAFKA-15089: - The goal was to also define the “AbstractConfig” part here like we did for the storage and raft modules. Are you interested in giving it a try? > Consolidate all the group coordinator configs > - > > Key: KAFKA-15089 > URL: https://issues.apache.org/jira/browse/KAFKA-15089 > Project: Kafka > Issue Type: Sub-task >Reporter: David Jacot >Priority: Major > > The group coordinator configurations are defined in KafkaConfig at the > moment. As KafkaConfig is defined in the core module, we can't pass it to the > new java modules to pass the configurations along. > A suggestion here is to centralize all the configurations of a module in the > module itself, similarly to what we have done for RemoteLogManagerConfig and > RaftConfig. We also need a mechanism to add all the properties defined in the > module to the KafkaConfig's ConfigDef. -- This message was sent by Atlassian Jira (v8.20.10#820010)
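The consolidation the ticket asks for — each module owning its configuration keys, defaults, and parsing, while still contributing its definitions to the server-wide KafkaConfig — can be sketched without Kafka's ConfigDef machinery. All names below (the class, the config key, the default) are invented for illustration and are not the actual group coordinator configs:

```java
import java.util.HashMap;
import java.util.Map;

// Dependency-free sketch of the idea above: the module owns its keys and
// defaults, exposes them for merging into a server-wide definition, and
// parses its own values from the properties the core hands it.
public class GroupCoordinatorConfigSketch {
    public static final String NUM_THREADS_CONFIG = "group.coordinator.threads";
    public static final int NUM_THREADS_DEFAULT = 1;

    // What this module contributes to the server-wide config definition.
    public static Map<String, Object> configDefaults() {
        Map<String, Object> defs = new HashMap<>();
        defs.put(NUM_THREADS_CONFIG, NUM_THREADS_DEFAULT);
        return defs;
    }

    private final int numThreads;

    // The module, not the core, knows how to interpret its own values.
    public GroupCoordinatorConfigSketch(Map<String, ?> props) {
        Object v = props.get(NUM_THREADS_CONFIG);
        this.numThreads = v == null ? NUM_THREADS_DEFAULT : Integer.parseInt(v.toString());
    }

    public int numThreads() { return numThreads; }
}
```

In the real codebase this would be built on AbstractConfig/ConfigDef as the comment suggests, but the ownership split is the same: the core only merges definitions and forwards raw properties, so the module can live outside the core Scala module.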
Re: [PR] MINOR: Fix io-[wait-]ratio metrics description [kafka]
emitskevich-blp commented on code in PR #15722: URL: https://github.com/apache/kafka/pull/15722#discussion_r1571063662 ## clients/src/main/java/org/apache/kafka/common/network/Selector.java: ## @@ -1281,14 +1281,14 @@ private Meter createMeter(Metrics metrics, String groupName, Map metricTags, String baseName, String action) { MetricName rateMetricName = metrics.metricName(baseName + "-ratio", groupName, -String.format("*Deprecated* The fraction of time the I/O thread spent %s", action), metricTags); +String.format("The fraction of time the I/O thread spent %s", action), metricTags); Review Comment: Added to method javadoc
[jira] [Resolved] (KAFKA-16389) consumer_test.py’s test_valid_assignment fails with new consumer
[ https://issues.apache.org/jira/browse/KAFKA-16389?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee resolved KAFKA-16389. Resolution: Fixed > consumer_test.py’s test_valid_assignment fails with new consumer > > > Key: KAFKA-16389 > URL: https://issues.apache.org/jira/browse/KAFKA-16389 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Philip Nee >Priority: Blocker > Labels: kip-848-client-support, system-tests > Fix For: 3.8.0 > > Attachments: KAFKA-16389.patch, consumer.log > > > The following error is reported when running the {{test_valid_assignment}} > test from {{consumer_test.py}}: > {code} > Traceback (most recent call last): > File > "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", > line 186, in _do_run > data = self.run_test() > File > "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", > line 246, in run_test > return self.test_context.function(self.test) > File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line > 433, in wrapper > return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) > File "/opt/kafka-dev/tests/kafkatest/tests/client/consumer_test.py", line > 584, in test_valid_assignment > wait_until(lambda: self.valid_assignment(self.TOPIC, self.NUM_PARTITIONS, > consumer.current_assignment()), > File "/usr/local/lib/python3.9/dist-packages/ducktape/utils/util.py", line > 58, in wait_until > raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from > last_exception > ducktape.errors.TimeoutError: expected valid assignments of 6 partitions when > num_started 2: [('ducker@ducker05', []), ('ducker@ducker06', [])] > {code} > To reproduce, create a system test suite file named > {{test_valid_assignment.yml}} with these contents: > {code:yaml} > failures: > - > 
'kafkatest/tests/client/consumer_test.py::AssignmentValidationTest.test_valid_assignment@{"metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer","group_remote_assignor":"range"}' > {code} > Then set the {{TC_PATHS}} environment variable to include that test suite > file. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]
vamossagar12 commented on code in PR #15305: URL: https://github.com/apache/kafka/pull/15305#discussion_r1571146638 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java: ## @@ -533,6 +537,45 @@ public void testSkippingAssignmentFails() { verify(configStorage).snapshot(); } +@Test +public void testPollTimeoutExpiry() throws InterruptedException { + +when(configStorage.snapshot()).thenReturn(configState1); + + client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node)); +coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); + +client.prepareResponse(joinGroupFollowerResponse(1, "member", "leader", Errors.NONE)); + client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", configState1.offset(), Collections.emptyList(), +Collections.singletonList(taskId1x0), Errors.NONE)); + + client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node)); + client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node)); + Review Comment: Yeah, I got that. I added heartbeat request/response pairs until the rebalance timeout happens.
Re: [PR] KAFKA-15853: Move KafkaConfig log properties and docs out of core [kafka]
chia7712 commented on code in PR #15569: URL: https://github.com/apache/kafka/pull/15569#discussion_r1571142879 ## server-common/src/main/java/org/apache/kafka/server/config/KafkaLogConfigs.java: ## @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.config; + +import org.apache.kafka.common.config.TopicConfig; + +import static org.apache.kafka.server.common.MetadataVersion.IBP_3_0_IV1; +import static org.apache.kafka.server.config.ServerTopicConfigSynonyms.LOG_PREFIX; + +/** + * Common home for broker-side log configs which need to be accessible from the libraries shared + * between the broker and the multiple modules in Kafka. + * + * Note this is an internal API and subject to change without notice. + */ +public class KafkaLogConfigs { Review Comment: > I think at some point we need to revisit some of the existing Config files, as some of them break a few rules, like using PROP instead of a CONFIG suffix, using DEFAULT as a prefix instead of a suffix, and naming some of the classes Config instead of Configs. agree!
Re: [PR] KAFKA-16386: Convert NETWORK_EXCEPTIONs from KIP-890 transaction verification [kafka]
jolshan commented on PR #15559: URL: https://github.com/apache/kafka/pull/15559#issuecomment-2064671000 I wonder if I should backport this to 3.7 as well.
Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]
kamalcph commented on code in PR #15634: URL: https://github.com/apache/kafka/pull/15634#discussion_r1571126880 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -282,15 +282,15 @@ class UnifiedLog(@volatile var logStartOffset: Long, /** * Update high watermark with offset metadata. The new high watermark will be lower - * bounded by the log start offset and upper bounded by the log end offset. + * bounded by the local-log-start-offset and upper bounded by the log-end-offset. * * @param highWatermarkMetadata the suggested high watermark with offset metadata * @return the updated high watermark offset */ def updateHighWatermark(highWatermarkMetadata: LogOffsetMetadata): Long = { val endOffsetMetadata = localLog.logEndOffsetMetadata -val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < logStartOffset) { - new LogOffsetMetadata(logStartOffset) +val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < _localLogStartOffset) { Review Comment: Thanks for suggesting the alternative approach. I'll check and come back on this.
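The revised javadoc above describes a clamp: the new high watermark is lower-bounded by the local-log-start-offset and upper-bounded by the log-end-offset. A minimal sketch of that bounding rule, with hypothetical names rather than Kafka's actual `UnifiedLog` API:

```java
// Illustrative sketch of the bounding rule under discussion: the proposed
// high watermark is clamped into [localLogStartOffset, logEndOffset].
final class HighWatermarkBound {
    static long bound(long proposedHwm, long localLogStartOffset, long logEndOffset) {
        // lower bound first, then upper bound
        return Math.min(Math.max(proposedHwm, localLogStartOffset), logEndOffset);
    }
}
```

Using the local-log-start-offset (rather than the log-start-offset) as the lower bound is the point of the PR: for a remote-storage topic, offsets below the local-log-start-offset live only in tiered storage, so a high watermark pointing there cannot be served from local segments.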
Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]
kamalcph commented on PR #15634: URL: https://github.com/apache/kafka/pull/15634#issuecomment-2064647379 Thanks @chia7712 for the review! > log-start-offset-checkpoint is missing and remote storage is enabled. The logStartOffset will be set to zero, and it seems to be a potential issue since the ListOffsetRequest could get an incorrect result Most of the time when the follower joins the ISR, it updates the log-start-offset and high-watermark from the leader FETCH response. The issue can happen only when the follower gets elected as leader before updating its state, as mentioned in the summary/comments. When the `log-start-offset-checkpoint` file is missing: 1. For a normal topic, the log-start-offset will be set to the base offset of the first log segment, so there is no issue. Since the data is there, reads won't fail. 2. For a remote topic, the log-start-offset will be stale for some time until the RemoteLogManager [updates](https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L671) it, so the issue is intermittent and self-recovers. > replication-offset-checkpoint is missing and remote storage is enabled. This is what you described. The HWM points to the middle of tiered storage and so causes errors when fetching records from local segments. This is not an issue for a normal topic. But for a cluster with remote storage enabled, if the issue happens on even one partition, then it starts to affect a *subset* of topics. The controller batches the partitions in the LeaderAndIsr request. If the broker fails to process the LISR for one partition, then the remaining partitions in that batch won't be processed. The producers producing to those topics will start receiving NOT_LEADER_FOR_PARTITION errors.
Re: [PR] KAFKA-7556: KafkaConsumer.beginningOffsets does not return actual first offsets [kafka]
jolshan closed pull request #9590: KAFKA-7556: KafkaConsumer.beginningOffsets does not return actual first offsets URL: https://github.com/apache/kafka/pull/9590
Re: [PR] KAFKA-15853: Move KafkaConfig log properties and docs out of core [kafka]
OmniaGM commented on code in PR #15569: URL: https://github.com/apache/kafka/pull/15569#discussion_r1571119979 ## server-common/src/main/java/org/apache/kafka/server/config/KafkaLogConfigs.java: ## @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.config; + +import org.apache.kafka.common.config.TopicConfig; + +import static org.apache.kafka.server.common.MetadataVersion.IBP_3_0_IV1; +import static org.apache.kafka.server.config.ServerTopicConfigSynonyms.LOG_PREFIX; + +/** + * Common home for broker-side log configs which need to be accessible from the libraries shared + * between the broker and the multiple modules in Kafka. + * + * Note this is an internal API and subject to change without notice. + */ +public class KafkaLogConfigs { Review Comment: Just pushed an update. The problem right now is that the convention of naming the config type without unnecessary prefixes has been preserved in other modules, and I am trying to avoid making this PR bigger than it should be. I think at some point we need to revisit some of the existing Config files, as some of them break a few rules, like using `PROP` instead of a `CONFIG` suffix, using `DEFAULT` as a prefix instead of a suffix, and naming some of the classes `Config` instead of `Configs`.
Re: [PR] KAFKA-15853: Move KafkaConfig log properties and docs out of core [kafka]
OmniaGM commented on code in PR #15569: URL: https://github.com/apache/kafka/pull/15569#discussion_r1571099798 ## server-common/src/main/java/org/apache/kafka/server/config/KafkaLogConfigs.java: ## @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.config; + +import org.apache.kafka.common.config.TopicConfig; + +import static org.apache.kafka.server.common.MetadataVersion.IBP_3_0_IV1; +import static org.apache.kafka.server.config.ServerTopicConfigSynonyms.LOG_PREFIX; + +/** + * Common home for broker-side log configs which need to be accessible from the libraries shared + * between the broker and the multiple modules in Kafka. + * + * Note this is an internal API and subject to change without notice. + */ +public class KafkaLogConfigs { Review Comment: I can rename it to `ServerLogConfigs` or `BrokerLogConfigs` but we already have `LogConfig`
Re: [PR] KAFKA-15853: Move KafkaConfig log properties and docs out of core [kafka]
chia7712 commented on code in PR #15569: URL: https://github.com/apache/kafka/pull/15569#discussion_r1571093805 ## server-common/src/main/java/org/apache/kafka/server/config/KafkaLogConfigs.java: ## @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.config; + +import org.apache.kafka.common.config.TopicConfig; + +import static org.apache.kafka.server.common.MetadataVersion.IBP_3_0_IV1; +import static org.apache.kafka.server.config.ServerTopicConfigSynonyms.LOG_PREFIX; + +/** + * Common home for broker-side log configs which need to be accessible from the libraries shared + * between the broker and the multiple modules in Kafka. + * + * Note this is an internal API and subject to change without notice. + */ +public class KafkaLogConfigs { Review Comment: Do we need to add the prefix "kafka" here? Sorry, I'm trying to figure out the naming rule. It seems to me the prefix "kafka" means the class is used to collect other "xxxConfigs" constants and to be the central place for kafka xxx-related configs. For example, `KafkaSecurityConfigs` collects all security-related configs from `BrokerSecurityConfigs`, `SaslConfigs`, `SslConfigs`, etc.
Re: [PR] KAFKA-16554: Online downgrade triggering and group type conversion [kafka]
dajac commented on code in PR #15721: URL: https://github.com/apache/kafka/pull/15721#discussion_r1571079283 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -777,11 +778,78 @@ public ClassicGroup classicGroup( } } +/** + * Validates the online downgrade if a consumer member is fenced from the consumer group. + * + * @param consumerGroup The ConsumerGroup. + * @param memberId The fenced member id. + * @return A boolean indicating whether it's valid to online downgrade the consumer group. + */ +private boolean validateOnlineDowngrade(ConsumerGroup consumerGroup, String memberId) { +if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) { +log.info("Cannot downgrade consumer group {} to classic group because the online downgrade is disabled.", +consumerGroup.groupId()); +return false; +} else if (!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) { +log.debug("Cannot downgrade consumer group {} to classic group because not all its members use the classic protocol.", +consumerGroup.groupId()); +return false; +} else if (consumerGroup.numMembers() <= 1) { +log.info("Skip downgrading the consumer group {} to classic group because it's empty.", +consumerGroup.groupId()); +return false; +} else if (consumerGroup.numMembers() - 1 > classicGroupMaxSize) { +log.info("Cannot downgrade consumer group {} to classic group because its group size is greater than classic group max size.", +consumerGroup.groupId()); +} +return true; +} + +public CompletableFuture convertToClassicGroup(ConsumerGroup consumerGroup, String leavingMemberId, List records) { +consumerGroup.createGroupTombstoneRecords(records); + +ClassicGroup classicGroup; +try { +classicGroup = ClassicGroup.fromConsumerGroup( +consumerGroup, +leavingMemberId, +logContext, +time, +metrics, +consumerGroupSessionTimeoutMs, +metadataImage +); +} catch (SchemaException e) { +log.warn("Cannot downgrade the consumer group " + consumerGroup.groupId() + ": fail to 
parse " + +"the Consumer Protocol " + ConsumerProtocol.PROTOCOL_TYPE + ".", e); + +throw new GroupIdNotFoundException(String.format("Cannot downgrade the classic group %s: %s.", +consumerGroup.groupId(), e.getMessage())); +} + classicGroup.createConsumerGroupRecords(metadataImage.features().metadataVersion(), records); + +removeGroup(consumerGroup.groupId()); + +groups.put(consumerGroup.groupId(), classicGroup); +metrics.onClassicGroupStateTransition(null, classicGroup.currentState()); + +classicGroup.allMembers().forEach(member -> rescheduleClassicGroupMemberHeartbeat(classicGroup, member)); +prepareRebalance(classicGroup, String.format("Downgrade group %s.", classicGroup.groupId())); + +CompletableFuture appendFuture = new CompletableFuture<>(); +appendFuture.whenComplete((__, t) -> { Review Comment: I wonder if we could use `exceptionally` here. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -777,11 +778,78 @@ public ClassicGroup classicGroup( } } +/** + * Validates the online downgrade if a consumer member is fenced from the consumer group. + * + * @param consumerGroup The ConsumerGroup. + * @param memberId The fenced member id. + * @return A boolean indicating whether it's valid to online downgrade the consumer group. 
+ */ +private boolean validateOnlineDowngrade(ConsumerGroup consumerGroup, String memberId) { +if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) { +log.info("Cannot downgrade consumer group {} to classic group because the online downgrade is disabled.", +consumerGroup.groupId()); +return false; +} else if (!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) { +log.debug("Cannot downgrade consumer group {} to classic group because not all its members use the classic protocol.", +consumerGroup.groupId()); +return false; +} else if (consumerGroup.numMembers() <= 1) { +log.info("Skip downgrading the consumer group {} to classic group because it's empty.", +consumerGroup.groupId()); +return false; +} else if (consumerGroup.numMembers() - 1 > classicGroupMaxSize) { +log.info("Cannot downgrade consumer group {} to classic group because its
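The `exceptionally` suggestion in the review above can be illustrated with plain `CompletableFuture`: `whenComplete` runs on both success and failure (so the callback must null-check the throwable), while `exceptionally` runs only on failure. A minimal sketch, with the revert logic as a hypothetical stand-in for what the real handler would do:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch: register a failure-only handler with `exceptionally`, then fail
// the future, and report whether the handler ran. With `whenComplete` the
// callback would also have fired on success, with a null throwable to check.
final class ExceptionallyDemo {
    static boolean revertRanOnFailure() {
        AtomicBoolean reverted = new AtomicBoolean(false);
        CompletableFuture<Void> appendFuture = new CompletableFuture<>();
        appendFuture.exceptionally(t -> {
            reverted.set(true); // e.g. revert the in-memory group conversion
            return null;        // recovery value for the dependent stage
        });
        appendFuture.completeExceptionally(new RuntimeException("append failed"));
        return reverted.get();
    }
}
```

The trade-off is small but real: `exceptionally` keeps the success path out of the handler entirely, whereas `whenComplete` forces an `if (t != null)` branch.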
[jira] [Updated] (KAFKA-16566) Update consumer static membership fencing system test to support new protocol
[ https://issues.apache.org/jira/browse/KAFKA-16566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-16566: --- Labels: kip-848-client-support system-tests (was: kip-848-client-support) > Update consumer static membership fencing system test to support new protocol > - > > Key: KAFKA-16566 > URL: https://issues.apache.org/jira/browse/KAFKA-16566 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: kip-848-client-support, system-tests > Fix For: 3.8.0 > > > consumer_test.py contains OffsetValidationTest.test_fencing_static_consumer > that verifies the sequence in which static members join a group when using > conflicting instance id. This behaviour is different in the classic and > consumer protocol, so the tests should be updated to set the right > expectations when running with the new consumer protocol. Note that what the > tests cover (params, setup) applies to both protocols. It is the expected > results that are not the same. > When static members conflict while joining a group: > Classic protocol: all members join the group with the same group instance id, > and then the first one will eventually receive a HB error with > FencedInstanceIdException > Consumer protocol: new member with an instance Id already in use is not able > to join, receiving an UnreleasedInstanceIdException in the response to the HB > to join the group.
[jira] [Updated] (KAFKA-16566) Update consumer static membership fencing system test to support new protocol
[ https://issues.apache.org/jira/browse/KAFKA-16566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-16566: --- Component/s: clients consumer system tests > Update consumer static membership fencing system test to support new protocol > - > > Key: KAFKA-16566 > URL: https://issues.apache.org/jira/browse/KAFKA-16566 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: kip-848-client-support > Fix For: 3.8.0 > > > consumer_test.py contains OffsetValidationTest.test_fencing_static_consumer > that verifies the sequence in which static members join a group when using > conflicting instance id. This behaviour is different in the classic and > consumer protocol, so the tests should be updated to set the right > expectations when running with the new consumer protocol. Note that what the > tests cover (params, setup) applies to both protocols. It is the expected > results that are not the same. > When static members conflict while joining a group: > Classic protocol: all members join the group with the same group instance id, > and then the first one will eventually receive a HB error with > FencedInstanceIdException > Consumer protocol: new member with an instance Id already in use is not able > to join, receiving an UnreleasedInstanceIdException in the response to the HB > to join the group.
[jira] [Commented] (KAFKA-16585) No way to forward message from punctuation method in the FixedKeyProcessor
[ https://issues.apache.org/jira/browse/KAFKA-16585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838730#comment-17838730 ] Matthias J. Sax commented on KAFKA-16585: - Thanks for raising this ticket. Wondering how we could address it though... Given that the "contract" is that the record key is not modified, but there is no input record, how could the key be set in a meaningful way? – The only thing I can think of right now would be to set `key = null`, but it's still semantically questionable... Can you provide more details about what you are trying to do? > No way to forward message from punctuation method in the FixedKeyProcessor > -- > > Key: KAFKA-16585 > URL: https://issues.apache.org/jira/browse/KAFKA-16585 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.6.2 >Reporter: Stanislav Spiridonov >Priority: Major > > The FixedKeyProcessorContext can forward only FixedKeyRecord. This class > doesn't have a public constructor and can be created based on existing > records. But such a record is usually absent in the punctuation method.
[jira] [Commented] (KAFKA-16567) Add New Stream Metrics based on KIP-869
[ https://issues.apache.org/jira/browse/KAFKA-16567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838728#comment-17838728 ] Matthias J. Sax commented on KAFKA-16567: - Why is this ticket marked as "blocker" for 4.0 release? > Add New Stream Metrics based on KIP-869 > --- > > Key: KAFKA-16567 > URL: https://issues.apache.org/jira/browse/KAFKA-16567 > Project: Kafka > Issue Type: Task > Components: streams >Reporter: Walter Hernandez >Assignee: Walter Hernandez >Priority: Blocker > Labels: kip > Fix For: 4.0.0 > > > Add the following metrics to the state updater: > * restoring-active-tasks: count > * restoring-standby-tasks: count > * paused-active-tasks: count > * paused-standby-tasks: count > * idle-ratio: percentage > * restore-ratio: percentage > * checkpoint-ratio: percentage > * restore-records-total: count > * restore-records-rate: rate > * restore-call-rate: rate
[jira] [Updated] (KAFKA-16567) Add New Stream Metrics based on KIP-869
[ https://issues.apache.org/jira/browse/KAFKA-16567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-16567: Component/s: streams > Add New Stream Metrics based on KIP-869 > --- > > Key: KAFKA-16567 > URL: https://issues.apache.org/jira/browse/KAFKA-16567 > Project: Kafka > Issue Type: Task > Components: streams >Reporter: Walter Hernandez >Assignee: Walter Hernandez >Priority: Blocker > Fix For: 4.0.0 > > > Add the following metrics to the state updater: > * restoring-active-tasks: count > * restoring-standby-tasks: count > * paused-active-tasks: count > * paused-standby-tasks: count > * idle-ratio: percentage > * restore-ratio: percentage > * checkpoint-ratio: percentage > * restore-records-total: count > * restore-records-rate: rate > * restore-call-rate: rate
[jira] [Updated] (KAFKA-16567) Add New Stream Metrics based on KIP-869
[ https://issues.apache.org/jira/browse/KAFKA-16567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-16567: Labels: kip (was: ) > Add New Stream Metrics based on KIP-869 > --- > > Key: KAFKA-16567 > URL: https://issues.apache.org/jira/browse/KAFKA-16567 > Project: Kafka > Issue Type: Task > Components: streams >Reporter: Walter Hernandez >Assignee: Walter Hernandez >Priority: Blocker > Labels: kip > Fix For: 4.0.0 > > > Add the following metrics to the state updater: > * restoring-active-tasks: count > * restoring-standby-tasks: count > * paused-active-tasks: count > * paused-standby-tasks: count > * idle-ratio: percentage > * restore-ratio: percentage > * checkpoint-ratio: percentage > * restore-records-total: count > * restore-records-rate: rate > * restore-call-rate: rate
[jira] [Commented] (KAFKA-16263) Add Kafka Streams docs about available listeners/callback
[ https://issues.apache.org/jira/browse/KAFKA-16263?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838727#comment-17838727 ] Matthias J. Sax commented on KAFKA-16263: - Yes, these are the handlers this ticket refers to. > Add Kafka Streams docs about available listeners/callback > - > > Key: KAFKA-16263 > URL: https://issues.apache.org/jira/browse/KAFKA-16263 > Project: Kafka > Issue Type: Task > Components: docs, streams >Reporter: Matthias J. Sax >Assignee: Kuan Po Tseng >Priority: Minor > Labels: beginner, newbie > > Kafka Streams allows registering all kinds of listeners and callbacks (e.g., > uncaught-exception-handler, restore-listeners, etc.) but those are not in the > documentation. > A good place might be > [https://kafka.apache.org/documentation/streams/developer-guide/running-app.html] >
Re: [PR] KAFKA-16356 RemoteLogMetadataSerde: Serializer via class-name dispatch removed and replaced with if-elseif-else conditions [kafka]
linu-shibu commented on PR #15620: URL: https://github.com/apache/kafka/pull/15620#issuecomment-2064470343 > Hi @linu-shibu @showuon this still uses raw types, and so is still type-unsafe. Fixing that was my motivation for creating the ticket, sorry for not emphasizing it more. > > I think it should be addressed in this PR rather than a follow-up, because it involves changes to the exact same code. > > One other thought that occurs to me now is that we should store the metadata transform instances, so that we don't incur the cost of constructing them for each call. > > Thanks! Sure @gharris1727, I will address the above-mentioned comments and update the PR. Thanks for pointing it out!
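The two review points above — replacing class-name dispatch with explicit, type-safe conditionals, and storing transform instances so they are not constructed on every call — can be sketched generically. All names here are illustrative, not the actual `RemoteLogMetadataSerde` API:

```java
// Illustrative sketch: transforms are created once and held in final fields,
// and dispatch is an explicit instanceof chain rather than a class-name map,
// so each branch applies the transform at its concrete type.
final class Serde {
    interface Transform<T> {
        byte[] toBytes(T value);
    }

    // Constructed once per Serde instance, not once per serialize() call.
    private final Transform<String> stringTransform = v -> v.getBytes();
    private final Transform<Integer> intTransform = v -> new byte[]{v.byteValue()};

    byte[] serialize(Object value) {
        if (value instanceof String) {
            return stringTransform.toBytes((String) value);
        } else if (value instanceof Integer) {
            return intTransform.toBytes((Integer) value);
        } else {
            throw new IllegalArgumentException("Unsupported type: " + value.getClass());
        }
    }
}
```

The `instanceof` chain moves the unchecked cast to a single, visible place per branch, so the compiler checks that each transform is invoked at the type its branch matched.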
Re: [PR] MINOR: Fix io-[wait-]ratio metrics description [kafka]
emitskevich-blp commented on code in PR #15722: URL: https://github.com/apache/kafka/pull/15722#discussion_r1571063662 ## clients/src/main/java/org/apache/kafka/common/network/Selector.java: ## @@ -1281,14 +1281,14 @@ private Meter createMeter(Metrics metrics, String groupName, Map<String, String> metricTags, String baseName, String action) { MetricName rateMetricName = metrics.metricName(baseName + "-ratio", groupName, -String.format("*Deprecated* The fraction of time the I/O thread spent %s", action), metricTags); +String.format("The fraction of time the I/O thread spent %s", action), metricTags); Review Comment: Added in method javadoc
[jira] [Commented] (KAFKA-16336) Remove Deprecated metric standby-process-ratio
[ https://issues.apache.org/jira/browse/KAFKA-16336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838724#comment-17838724 ] Matthias J. Sax commented on KAFKA-16336: - The next planned release is 3.8, but we can work on this ticket only for 4.0, as it's a breaking change that's only allowed for a major release. So this ticket cannot be picked up yet. > Remove Deprecated metric standby-process-ratio > -- > > Key: KAFKA-16336 > URL: https://issues.apache.org/jira/browse/KAFKA-16336 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Matthias J. Sax >Assignee: Walter Hernandez >Priority: Blocker > Fix For: 4.0.0 > > > Metric "standby-process-ratio" was deprecated in the 3.5 release via > https://cwiki.apache.org/confluence/display/KAFKA/KIP-869%3A+Improve+Streams+State+Restoration+Visibility
Re: [PR] KAFKA-15853: Move KafkaConfig log properties and docs out of core [kafka]
OmniaGM commented on PR #15569: URL: https://github.com/apache/kafka/pull/15569#issuecomment-2064450248 > > However, I'm a bit concerned that LogConfig is already huge. What do others prefer? Keep it in KafkaLogConfigs, or move it to LogConfig.ServerLogConfig? > > Most default values of `KafkaLogConfigs` are in `LogConfig`, and they are in a different module. That pattern is not similar to `ReplicationConfigs` or `KafkaSecurityConfigs`. Is the server-common module more suitable for collecting those server-side configs, since both `storage` and `server` depend on `server-common`? Also, `server-common` has the `org.apache.kafka.server.config` package too. I think it might be better and simpler to move `KafkaLogConfigs` into `server-common` and make `LogConfig` use them. I'll push an update with this soon.
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]
mumrah merged PR #15470: URL: https://github.com/apache/kafka/pull/15470
Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]
gaurav-narula commented on code in PR #15136: URL: https://github.com/apache/kafka/pull/15136#discussion_r1571030504 ## core/src/main/scala/kafka/log/LogManager.scala: ## @@ -1178,6 +1178,33 @@ class LogManager(logDirs: Seq[File], } } + def recoverAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Unit = { +val abandonedFutureLogs = findAbandonedFutureLogs(brokerId, newTopicsImage) +abandonedFutureLogs.foreach { case (futureLog, currentLog) => + val tp = futureLog.topicPartition + if (cleaner != null) { Review Comment: Added a comment in [3d78e55](https://github.com/apache/kafka/pull/15136/commits/3d78e5567fd16aa77f0b9f8c510ad26f0e6443c6)
Re: [PR] Fix typos [kafka]
birdoplank closed pull request #15752: Fix typos URL: https://github.com/apache/kafka/pull/15752
Re: [PR] KAFKA-14226: feat(connect:transform): Introduce FieldPath abstraction [kafka]
C0urante commented on code in PR #15379: URL: https://github.com/apache/kafka/pull/15379#discussion_r1571018390 ## connect/transforms/src/test/java/org/apache/kafka/connect/transforms/field/SingleFieldPathTest.java: ## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.transforms.field; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.junit.jupiter.api.Test; + +class SingleFieldPathTest { + +@Test +void shouldIncludeEmptyFieldNames() { +assertArrayEquals( +new String[] {"", "", ""}, +new SingleFieldPath("..", FieldSyntaxVersion.V2).path() +); +assertArrayEquals( +new String[] {"foo", "", ""}, +new SingleFieldPath("foo..", FieldSyntaxVersion.V2).path() +); +assertArrayEquals( +new String[] {"", "bar", ""}, +new SingleFieldPath(".bar.", FieldSyntaxVersion.V2).path() +); +assertArrayEquals( +new String[] {"", "", "baz"}, +new SingleFieldPath("..baz", FieldSyntaxVersion.V2).path() +); +} Review Comment: This case can probably be moved to `FieldPathNotationTest`, right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
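The expectations encoded in these tests (dots separate steps, so empty steps are possible; a backtick pair wrapping a whole step groups the dots inside it) can be sketched as a small splitter. This is a hedged re-implementation for illustration only: `FieldPathV2.split` is a hypothetical name, and the sketch deliberately omits the backslash escaping and the `ConfigException` on unclosed backticks that the real `SingleFieldPath` parser handles (unclosed backticks are simply kept literally here).

```java
import java.util.ArrayList;
import java.util.List;

public class FieldPathV2 {
    // Split a V2 dotted path into steps. A backtick pair "wraps" a step
    // when it opens at the step start and closes right before a dot or
    // the end of the path; its content, dots included, is one step.
    static List<String> split(String path) {
        List<String> steps = new ArrayList<>();
        if (path.isEmpty())
            return steps;                          // "" -> no steps
        int i = 0;                                 // start of current step
        while (i <= path.length()) {
            if (i == path.length()) {              // path ended on a dot
                steps.add("");
                break;
            }
            int stepStart = i;
            int end;                               // exclusive end of step text
            int close;
            if (path.charAt(i) == '`' && (close = closingBacktick(path, i)) >= 0) {
                stepStart = i + 1;                 // strip the backtick pair
                end = close;
                i = close + 1;
            } else {
                end = nextDot(path, i);
                i = end;
            }
            steps.add(path.substring(stepStart, end));
            if (i == path.length())
                break;                             // no trailing dot
            i++;                                   // skip the separating dot
        }
        return steps;
    }

    // Index of the backtick closing the pair opened at `open`, i.e. one
    // followed by a dot or the end of the path; -1 if none exists.
    private static int closingBacktick(String path, int open) {
        for (int j = open + 1; j < path.length(); j++) {
            if (path.charAt(j) == '`' && (j == path.length() - 1 || path.charAt(j + 1) == '.'))
                return j;
        }
        return -1;
    }

    private static int nextDot(String path, int from) {
        int dot = path.indexOf('.', from);
        return dot < 0 ? path.length() : dot;
    }

    public static void main(String[] args) {
        System.out.println(split("foo.`bar.baz`"));   // [foo, bar.baz]
        System.out.println(split(".."));              // [, , ]
    }
}
```

Against the cases in the tests above, this reproduces e.g. `".."` → three empty steps, `` "`foo.bar`.baz" `` → `["foo.bar", "baz"]`, and `` "foo`bar`baz" `` staying a single step because its backticks do not open at a step boundary.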
Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]
rreddy-22 commented on code in PR #15717: URL: https://github.com/apache/kafka/pull/15717#discussion_r1571012712 ## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java: ## @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.jmh.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.RangeAssignor; +import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber; +import org.apache.kafka.coordinator.group.assignor.UniformAssignor; +import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static java.lang.Integer.max; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 1) +@Measurement(iterations = 0) Review Comment: sry I missed that, I changed it in the next commit -- This is an automated message from the Apache Git Service. 
Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]
chia7712 commented on code in PR #15136: URL: https://github.com/apache/kafka/pull/15136#discussion_r1571012160 ## core/src/main/scala/kafka/log/LogManager.scala: ## @@ -1189,49 +1217,62 @@ class LogManager(logDirs: Seq[File], val sourceLog = currentLogs.get(topicPartition) val destLog = futureLogs.get(topicPartition) - info(s"Attempting to replace current log $sourceLog with $destLog for $topicPartition") if (sourceLog == null) throw new KafkaStorageException(s"The current replica for $topicPartition is offline") if (destLog == null) throw new KafkaStorageException(s"The future replica for $topicPartition is offline") - destLog.renameDir(UnifiedLog.logDirName(topicPartition), shouldReinitialize = true) - // the metrics tags still contain "future", so we have to remove it. - // we will add metrics back after sourceLog remove the metrics - destLog.removeLogMetrics() - destLog.updateHighWatermark(sourceLog.highWatermark) + info(s"Attempting to replace current log $sourceLog with $destLog for $topicPartition") + replaceCurrentWithFutureLog(Option(sourceLog), destLog, updateHighWatermark = true) + info(s"The current replica is successfully replaced with the future replica for $topicPartition") +} + } + + def replaceCurrentWithFutureLog(sourceLog: Option[UnifiedLog], destLog: UnifiedLog, updateHighWatermark: Boolean = false): Unit = { Review Comment: > I think that's not necessarily true. findAbandonedFutureLogs may find a sourceLog if the log directory is online but I don't think it's safe to update the high watermark even then because sourceLog may be ahead of futureLog. you are right! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14226: feat(connect:transform): Introduce FieldPath abstraction [kafka]
C0urante commented on code in PR #15379: URL: https://github.com/apache/kafka/pull/15379#discussion_r1571007787 ## connect/transforms/src/test/java/org/apache/kafka/connect/transforms/field/FieldPathNotationTest.java: ## @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.transforms.field; + +import org.apache.kafka.common.config.ConfigException; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +class FieldPathNotationTest { +final static String[] EMPTY_PATH = new String[] {}; + +@Test +void shouldBuildV1WithDotsAndBacktickPair() { +// Given v1 +// When path contains dots, then single step path +assertArrayEquals( +new String[] {"foo.bar.baz"}, +new SingleFieldPath("foo.bar.baz", FieldSyntaxVersion.V1).path()); +// When path contains backticks, then single step path +assertArrayEquals( +new String[] {"foo`bar`"}, +new SingleFieldPath("foo`bar`", FieldSyntaxVersion.V1).path()); +// When path contains dots and backticks, then single step path +assertArrayEquals( +new String[] {"foo.`bar.baz`"}, +new SingleFieldPath("foo.`bar.baz`", FieldSyntaxVersion.V1).path()); +} + +@Test +void shouldBuildV2WithEmptyPath() { +// Given v2 +// When path is empty +// Then build a path with no steps +assertArrayEquals(EMPTY_PATH, new SingleFieldPath("", FieldSyntaxVersion.V2).path()); +} + +@Test +void shouldBuildV2WithoutDots() { +// Given v2 +// When path without dots +// Then build a single step path +assertArrayEquals(new String[] {"foobarbaz"}, new SingleFieldPath("foobarbaz", FieldSyntaxVersion.V2).path()); +} + +@Test +void shouldBuildV2WhenIncludesDots() { +// Given v2 and fields without dots +// When path includes dots +// Then build a path with steps separated by dots +assertArrayEquals(new String[] {"foo", "bar", "baz"}, new SingleFieldPath("foo.bar.baz", FieldSyntaxVersion.V2).path()); +} + +@Test +void shouldBuildV2WithoutWrappingBackticks() { +// Given v2 and fields without dots +// When backticks are not wrapping a field name +// Then build a single step path including backticks 
+assertArrayEquals(new String[] {"foo`bar`baz"}, new SingleFieldPath("foo`bar`baz", FieldSyntaxVersion.V2).path()); +} + +@Test +void shouldBuildV2WhenIncludesDotsAndBacktickPair() { +// Given v2 and fields including dots +// When backticks are wrapping a field name (i.e. withing edges or between dots) +// Then build a path with steps separated by dots and not including backticks +assertArrayEquals(new String[] {"foo.bar.baz"}, new SingleFieldPath("`foo.bar.baz`", FieldSyntaxVersion.V2).path()); +assertArrayEquals(new String[] {"foo", "bar.baz"}, new SingleFieldPath("foo.`bar.baz`", FieldSyntaxVersion.V2).path()); +assertArrayEquals(new String[] {"foo.bar", "baz"}, new SingleFieldPath("`foo.bar`.baz", FieldSyntaxVersion.V2).path()); +assertArrayEquals(new String[] {"foo", "bar", "baz"}, new SingleFieldPath("foo.`bar`.baz", FieldSyntaxVersion.V2).path()); +} + +@Test +void shouldBuildV2AndIgnoreBackticksThatAreNotWrapping() { +// Given v2 and fields including dots and backticks +// When backticks are wrapping a field name (i.e. withing edges or between dots) +// Then build a path with steps separated by dots and including non-wrapping backticks +assertArrayEquals(new String[] {"foo", "`bar.baz"}, new SingleFieldPath("foo.``bar.baz`", FieldSyntaxVersion.V2).path()); +assertArrayEquals(new String[] {"foo", "bar.baz`"}, new SingleFieldPath("foo.`bar.baz``", FieldSyntaxVersion.V2).path()); +assertArrayEquals(new String[] {"foo", "ba`r.baz"},
Re: [PR] KAFKA-16579: Revert Consumer Rolling Upgrade [kafka]
philipnee commented on PR #15753: URL: https://github.com/apache/kafka/pull/15753#issuecomment-2064308513 @lucasbru - This is just to remove the consumer protocol from testing, as it is not suited for this test. It would be much appreciated if you could take a look.
[PR] KAFKA-16579: Revert Consumer Rolling Upgrade [kafka]
philipnee opened a new pull request, #15753: URL: https://github.com/apache/kafka/pull/15753 Consumer Rolling Upgrade is meant to test the protocol upgrade for the old protocol. Therefore, I am removing the old changes.
```
SESSION REPORT (ALL TESTS)
ducktape version: 0.11.4
session_id:       2024-04-18--004
run time:         1 minute 38.006 seconds
tests run:        3
passed:           3
flaky:            0
failed:           0
ignored:          0

test_id:   kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=False
status:    PASS
run time:  33.562 seconds

test_id:   kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True
status:    PASS
run time:  34.902 seconds

test_id:   kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=ZK.use_new_coordinator=False
status:    PASS
run time:  29.447 seconds
```
Re: [PR] KAFKA-14226: feat(connect:transform): Introduce FieldPath abstraction [kafka]
C0urante commented on code in PR #15379: URL: https://github.com/apache/kafka/pull/15379#discussion_r1570998105 ## connect/transforms/src/test/java/org/apache/kafka/connect/transforms/field/FieldPathNotationTest.java: ## @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.transforms.field; + +import org.apache.kafka.common.config.ConfigException; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +class FieldPathNotationTest { Review Comment: We can simplify these tests with a few utility methods: ```java private void assertParseV1(String path) { assertArrayEquals( new String[] {path}, new SingleFieldPath(path, FieldSyntaxVersion.V1).path()); } private void assertParseV2(String inputPath, String... 
expectedSteps) { assertArrayEquals( expectedSteps, new SingleFieldPath(inputPath, FieldSyntaxVersion.V2).path() ); } private void assertParseV2Error(String inputPath, String expectedMessage) { ConfigException exception = assertThrows( ConfigException.class, () -> new SingleFieldPath(inputPath, FieldSyntaxVersion.V2) ); assertEquals(expectedMessage, exception.getMessage()); } ``` Some simplified test cases would look like this: ```java @Test void shouldBuildV1WithDotsAndBacktickPair() { // Given v1 // When path contains dots, then single step path assertParseV1("foo.bar.baz"); // When path contains backticks, then single step path assertParseV1("foo`bar`"); // When path contains dots and backticks, then single step path assertParseV1("foo.`bar.baz`"); } @Test void shouldBuildV2WhenIncludesDotsAndBacktickPair() { // Given v2 and fields including dots // When backticks are wrapping a field name (i.e. withing edges or between dots) // Then build a path with steps separated by dots and not including backticks assertParseV2("`foo.bar.baz`", "foo.bar.baz"); assertParseV2("foo.`bar.baz`", "foo", "bar.baz"); assertParseV2("`foo.bar`.baz", "foo.bar", "baz"); assertParseV2("foo.`bar`.baz", "foo", "bar", "baz"); } @Test void shouldFailV2WhenIncompleteBackticks() { // Given v2 // When backticks are not closed and not escaped // Then it should fail assertParseV2Error( "`foo.bar.baz", "Incomplete backtick pair in path: [`foo.bar.baz], consider adding a backslash before backtick at position 0 to escape it" ); assertParseV2Error( "foo.`bar.baz", "Incomplete backtick pair in path: [foo.`bar.baz], consider adding a backslash before backtick at position 4 to escape it" ); assertParseV2Error( "foo.bar.`baz", "Incomplete backtick pair in path: [foo.bar.`baz], consider adding a backslash before backtick at position 8 to escape it" ); assertParseV2Error( "foo.bar.`baz\\`", "Incomplete backtick pair in path: [foo.bar.`baz\\`], consider adding a backslash before backtick at position 8 to 
escape it" ); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16555: Consumer's RequestState has incorrect logic to determine if inflight [kafka]
philipnee commented on code in PR #15723: URL: https://github.com/apache/kafka/pull/15723#discussion_r1570988398 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestState.java: ## @@ -98,12 +93,11 @@ public boolean canSendRequest(final long currentTimeMs) { * is a request in-flight. */ public boolean requestInFlight() { -return this.lastSentMs > -1 && this.lastReceivedMs < this.lastSentMs; +return requestInFlight; Review Comment: do you think it would make sense to rename this to: `inflightRequested`? `requestInflight` sounds like a verb/action to me.
Re: [PR] KAFKA-16555: Consumer's RequestState has incorrect logic to determine if inflight [kafka]
philipnee commented on code in PR #15723: URL: https://github.com/apache/kafka/pull/15723#discussion_r1570986831 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestStateTest.java: ## @@ -48,4 +50,40 @@ public void testRequestStateSimple() { state.reset(); assertTrue(state.canSendRequest(200)); } + +@Test +public void testTrackInflightOnSuccessfulAttempt() { +testTrackInflight(RequestState::onSuccessfulAttempt); +} + +@Test +public void testTrackInflightOnFailedAttempt() { +testTrackInflight(RequestState::onFailedAttempt); +} + +private void testTrackInflight(BiConsumer onCompletedAttempt) { +RequestState state = new RequestState( +new LogContext(), +this.getClass().getSimpleName(), +100, +2, +1000, +0); + +// This is just being paranoid... +assertFalse(state.requestInFlight()); + +// When we've sent a request, the flag should update from false to true. +state.onSendAttempt(); +assertTrue(state.requestInFlight()); + +// Now we've received the response. +onCompletedAttempt.accept(state, 236); + +// When we've sent a second request with THE SAME TIMESTAMP as the previous response, Review Comment: The requestInFlight doesn't update the timestamp internally. I wonder if we should just say "making consecutive requests" instead of mentioning the timestamp. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
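The test discussed here pins down why the timestamp comparison was replaced with an explicit flag: when a second request is sent in the same millisecond the previous response arrived, `lastReceivedMs < lastSentMs` is false even though a request is clearly in flight. A minimal sketch of the two checks side by side (illustrative only; the real `RequestState` also tracks backoff and attempt counts, and the timestamp parameters below are an assumption of this sketch, not the actual method signatures):

```java
public class RequestStateSketch {
    private boolean requestInFlight = false;
    private long lastSentMs = -1;
    private long lastReceivedMs = -1;

    public void onSendAttempt(long nowMs) {
        requestInFlight = true;
        lastSentMs = nowMs;
    }

    public void onSuccessfulAttempt(long nowMs) {
        requestInFlight = false;
        lastReceivedMs = nowMs;
    }

    // Explicit flag: correct even for back-to-back requests.
    public boolean requestInFlight() {
        return requestInFlight;
    }

    // The old timestamp-based check: wrongly false when the new request is
    // sent in the same millisecond the previous response was received.
    public boolean requestInFlightByTimestamps() {
        return lastSentMs > -1 && lastReceivedMs < lastSentMs;
    }

    public static void main(String[] args) {
        RequestStateSketch state = new RequestStateSketch();
        state.onSendAttempt(100);
        state.onSuccessfulAttempt(236);
        state.onSendAttempt(236);            // same millisecond as the response
        System.out.println(state.requestInFlight());             // true
        System.out.println(state.requestInFlightByTimestamps()); // false: the bug
    }
}
```

This is exactly the scenario the test above exercises by reusing timestamp 236 for both the completed attempt and the next send.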
Re: [PR] KAFKA-16528: Client HB timing fix [kafka]
lianetm commented on PR #15698: URL: https://github.com/apache/kafka/pull/15698#issuecomment-2064258399 Hey @cadonna, could you take a look when you have a chance? Thanks!
Re: [PR] KAFKA-16557: Fix toString of OffsetFetchRequestState [kafka]
lianetm commented on PR #15750: URL: https://github.com/apache/kafka/pull/15750#issuecomment-2064223926 Hey @phooq, thanks for taking on this one. High-level question about the motivation. The `toStringBase` defined in the base class `RequestState` includes vars that all states have, so that inheritors can include them in their toString. This, btw, is why it's used in the RequestState toString itself. So I would expect the usage to be that inheritors like this `OffsetFetchRequestState` would just override their `toString`, include the props that are specific to them, and also include the props that are common to all states using the toStringBase (exactly what was being done before). What do you think? That being said, this makes me notice that actually only the `OffsetFetchRequestState` is overriding the toString, so, if we align on what we want to achieve, we could repurpose this PR/Jira and make the toString consistent in all request states that inherit from the base RequestState, so they all override the toString in a similar way to the fetch request state. Thoughts? Let me know if I'm missing something about the jira itself @kirktrue.
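The convention described in this comment can be sketched as follows: the base class exposes `toStringBase()` carrying the common vars, and each inheritor overrides `toString()` to append its own props plus the base ones. The field names below are illustrative stand-ins, not the actual kafka-clients code.

```java
public class ToStringConvention {
    static class RequestState {
        protected final long exponentialBackoffMs;  // illustrative common field
        protected long lastSentMs = -1;

        RequestState(long exponentialBackoffMs) {
            this.exponentialBackoffMs = exponentialBackoffMs;
        }

        // Common vars all states have, reusable by inheritors.
        protected String toStringBase() {
            return "exponentialBackoffMs=" + exponentialBackoffMs
                + ", lastSentMs=" + lastSentMs;
        }

        @Override
        public String toString() {
            return "RequestState{" + toStringBase() + "}";
        }
    }

    static class OffsetFetchRequestState extends RequestState {
        private final String requestedPartitions;   // illustrative subclass field

        OffsetFetchRequestState(long exponentialBackoffMs, String requestedPartitions) {
            super(exponentialBackoffMs);
            this.requestedPartitions = requestedPartitions;
        }

        // Subclass-specific props first, then the common ones via toStringBase().
        @Override
        public String toString() {
            return "OffsetFetchRequestState{requestedPartitions=" + requestedPartitions
                + ", " + toStringBase() + "}";
        }
    }

    public static void main(String[] args) {
        System.out.println(new OffsetFetchRequestState(100, "tp-0"));
        // OffsetFetchRequestState{requestedPartitions=tp-0, exponentialBackoffMs=100, lastSentMs=-1}
    }
}
```

Applying the same override pattern to every `RequestState` inheritor is the consistency change the comment proposes.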
Re: [PR] KAFKA-15649: Handle directory failure timeout [kafka]
viktorsomogyi commented on PR #15697: URL: https://github.com/apache/kafka/pull/15697#issuecomment-2064216189 Rebased on latest trunk as there were some conflicts. Addressed some of the comments, but there are 2 things I need to investigate:
* `LogDirFailureTest` fails in `@AfterAll`, likely because of an incorrect shutdown; perhaps there's a timing issue
* Check if we can detect whether there are any leaders before shutdown

I'll update on both shortly, hopefully tomorrow.
Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]
cadonna commented on code in PR #15613: URL: https://github.com/apache/kafka/pull/15613#discussion_r1570932239 ## core/src/test/scala/integration/kafka/api/PlaintextConsumerCommitTest.scala: ## @@ -304,6 +304,64 @@ class PlaintextConsumerCommitTest extends AbstractConsumerTest { consumeAndVerifyRecords(consumer = otherConsumer, numRecords = 1, startingOffset = 5, startingTimestamp = startingTimestamp) } + // TODO: This only works in the new consumer, but should be fixed for the old consumer as well + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly")) + def testCommitAsyncCompletedBeforeConsumerCloses(quorum: String, groupProtocol: String): Unit = { +// This is testing the contract that asynchronous offset commit are completed before the consumer +// is closed, even when no commit sync is performed as part of the close (due to auto-commit +// disabled, or simply because there no consumed offsets). 
+val producer = createProducer() +sendRecords(producer, numRecords = 3, tp) +sendRecords(producer, numRecords = 3, tp2) + +val consumer = createConsumer() +consumer.assign(List(tp, tp2).asJava) + +// Try without looking up the coordinator first +val cb = new CountConsumerCommitCallback +consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(1L))).asJava, cb) +consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(1L))).asJava, cb) +consumer.close() +assertEquals(2, cb.successCount) + } + + // TODO: This only works in the new consumer, but should be fixed for the old consumer as well + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly")) + def testCommitAsyncCompletedBeforeCommitSyncReturns(quorum: String, groupProtocol: String): Unit = { +// This is testing the contract that asynchronous offset commits sent previously with the +// `commitAsync` are guaranteed to have their callbacks invoked prior to completion of +// `commitSync` (given that it does not time out). +val producer = createProducer() +sendRecords(producer, numRecords = 3, tp) +sendRecords(producer, numRecords = 3, tp2) + +val consumer = createConsumer() +consumer.assign(List(tp, tp2).asJava) + +// Try without looking up the coordinator first Review Comment: I do not understand those comments. How do I recognize in the code that the coordinator is not looked up first? And why is that so important? 
## core/src/test/scala/integration/kafka/api/PlaintextConsumerCommitTest.scala:
## @@ -304,6 +304,64 @@ class PlaintextConsumerCommitTest extends AbstractConsumerTest {
     consumeAndVerifyRecords(consumer = otherConsumer, numRecords = 1, startingOffset = 5, startingTimestamp = startingTimestamp)
   }

+  // TODO: This only works in the new consumer, but should be fixed for the old consumer as well
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly"))
+  def testCommitAsyncCompletedBeforeConsumerCloses(quorum: String, groupProtocol: String): Unit = {
+    // This is testing the contract that asynchronous offset commit are completed before the consumer
+    // is closed, even when no commit sync is performed as part of the close (due to auto-commit
+    // disabled, or simply because there no consumed offsets).
+    val producer = createProducer()
+    sendRecords(producer, numRecords = 3, tp)
+    sendRecords(producer, numRecords = 3, tp2)
+
+    val consumer = createConsumer()
+    consumer.assign(List(tp, tp2).asJava)
+
+    // Try without looking up the coordinator first
+    val cb = new CountConsumerCommitCallback
+    consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(1L))).asJava, cb)
+    consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(1L))).asJava, cb)
+    consumer.close()
+    assertEquals(2, cb.successCount)
+  }
+
+  // TODO: This only works in the new consumer, but should be fixed for the old consumer as well
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly"))
+  def testCommitAsyncCompletedBeforeCommitSyncReturns(quorum: String, groupProtocol: String): Unit = {
+    // This is testing the contract that asynchronous offset commits sent previously with the
+    // `commitAsync` are guaranteed to have their callbacks invoked prior to completion of
+    // `commitSync` (given that it does not time out).
+    val
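The contract these tests exercise — every callback passed to an asynchronous commit is invoked before `close()` returns — can be modeled with a small toy client, independent of Kafka. `ToyConsumer` and `CountCallback` below are illustrative names, not the actual consumer API:

```python
class ToyConsumer:
    """Toy model of the tested contract: close() drains every pending
    async-commit callback before it returns. Not the Kafka client."""
    def __init__(self):
        self._pending = []

    def commit_async(self, offsets, callback):
        # The real client sends a request; here we just queue the callback.
        self._pending.append((offsets, callback))

    def close(self):
        # Contract: all queued callbacks fire before close() returns.
        for offsets, cb in self._pending:
            cb(offsets, None)  # None = no error
        self._pending.clear()


class CountCallback:
    """Counts successful commits, like CountConsumerCommitCallback."""
    def __init__(self):
        self.success_count = 0

    def __call__(self, offsets, error):
        if error is None:
            self.success_count += 1


consumer = ToyConsumer()
cb = CountCallback()
consumer.commit_async({"tp": 1}, cb)
consumer.commit_async({"tp2": 1}, cb)
consumer.close()
assert cb.success_count == 2  # both callbacks ran before close() returned
```

The point of the model is only the ordering guarantee: by the time `close()` (or, analogously, `commitSync`) returns, no async callback is still outstanding.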
Re: [PR] KAFKA-14226: feat(connect:transform): Introduce FieldPath abstraction [kafka]
C0urante commented on code in PR #15379: URL: https://github.com/apache/kafka/pull/15379#discussion_r1570962332

## connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/SingleFieldPath.java:
## @@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.field;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * A SingleFieldPath is composed of one or more field names, known as path steps,
+ * to access values within a data object (either {@code Struct} or {@code Map}).
+ *
+ * If the SMT requires accessing multiple fields on the same data object, use {@link MultiFieldPaths} instead.
+ *
+ * The field path semantics are defined by the {@link FieldSyntaxVersion syntax version}.
+ *
+ * @see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-821%3A+Connect+Transforms+support+for+nested+structures">KIP-821</a>
+ * @see FieldSyntaxVersion
+ * @see MultiFieldPaths
+ */
+public class SingleFieldPath {
+    // Invariants:
+    // - A field path can contain one or more steps
+    private static final char BACKTICK = '`';
+    private static final char DOT = '.';
+    private static final char BACKSLASH = '\\';
+
+    private final String originalPath;
+    private final FieldSyntaxVersion version;
+    private final List<String> steps;
+
+    public SingleFieldPath(String pathText, FieldSyntaxVersion version) {
+        this.originalPath = Objects.requireNonNull(pathText, "Field path cannot be null");
+        this.version = version;
+        switch (version) {
+            case V1: // backward compatibility
+                this.steps = Collections.singletonList(pathText);
+                break;
+            case V2:
+                this.steps = buildFieldPathV2(pathText);
+                break;
+            default:
+                throw new IllegalArgumentException("Unknown syntax version: " + version);
+        }
+    }
+
+    private static List<String> buildFieldPathV2(String path) {
+        final List<String> steps = new ArrayList<>();
+        // path character index to track backticks and dots and break path into steps
+        int idx = 0;
+        while (idx < path.length() && idx >= 0) {
+            if (path.charAt(idx) != BACKTICK) {
+                final int start = idx;
+                idx = path.indexOf(String.valueOf(DOT), idx);
+                if (idx >= 0) { // get path step and move forward
+                    String field = path.substring(start, idx);
+                    steps.add(field);
+                    idx++;
+                } else { // add all
+                    String field = path.substring(start);
+                    steps.add(field);
+                }
+            } else { // has backtick
+                int backtickAt = idx;
+                idx++;
+                StringBuilder field = new StringBuilder();
+                int start = idx;
+                while (true) {
+                    // find closing backtick
+                    idx = path.indexOf(String.valueOf(BACKTICK), idx);
+                    if (idx == -1) { // if not found, then fail
+                        failWhenIncompleteBacktickPair(path, backtickAt);
+                    }
+
+                    // backtick escaped if right after backslash
+                    boolean escaped = path.charAt(idx - 1) == BACKSLASH;
+
+                    if (idx >= path.length() - 1) { // at the end of path
+                        if (escaped) { // but escaped, then fail
+                            failWhenIncompleteBacktickPair(path, backtickAt);
+                        }
+                        field.append(path, start, idx);
+                        // we've reached the end of the path, and the last character is the backtick
+                        steps.add(field.toString());
+                        idx++;
+                        break;
+                    }
+
+                    if
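The V2 parsing loop quoted above splits a path on dots, lets a backtick-quoted step contain dots, and treats a backslash before a backtick as an escape. The same algorithm can be sketched in Python; `parse_path_v2` below is a simplified illustration, not the Connect implementation:

```python
def parse_path_v2(path):
    """Split a field path into steps: dots separate steps, a backtick-quoted
    step may contain dots, and a backslash escapes a backtick inside quotes.
    Simplified sketch of the algorithm, not the Connect implementation."""
    steps = []
    i, n = 0, len(path)
    while i < n:
        if path[i] != '`':
            # unquoted step: runs until the next dot (or the end of the path)
            j = path.find('.', i)
            if j == -1:
                steps.append(path[i:])
                break
            steps.append(path[i:j])
            i = j + 1
        else:
            # quoted step: scan for the closing, unescaped backtick
            i += 1
            field = []
            while True:
                j = path.find('`', i)
                if j == -1:
                    raise ValueError("incomplete backtick pair in %r" % path)
                if path[j - 1] == '\\':
                    # escaped backtick: keep a literal backtick, keep scanning
                    field.append(path[i:j - 1] + '`')
                    i = j + 1
                    continue
                field.append(path[i:j])
                steps.append(''.join(field))
                i = j + 1
                if i < n:
                    if path[i] != '.':
                        raise ValueError("closing backtick must end a step in %r" % path)
                    i += 1
                break
    return steps

assert parse_path_v2("a.b.c") == ["a", "b", "c"]
assert parse_path_v2("`a.b`.c") == ["a.b", "c"]
assert parse_path_v2("a.`b\\`c`") == ["a", "b`c"]
```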
Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]
gaurav-narula commented on code in PR #15136: URL: https://github.com/apache/kafka/pull/15136#discussion_r1570959086

## core/src/main/scala/kafka/log/LogManager.scala:
## @@ -1189,49 +1217,62 @@ class LogManager(logDirs: Seq[File],
     val sourceLog = currentLogs.get(topicPartition)
     val destLog = futureLogs.get(topicPartition)
-    info(s"Attempting to replace current log $sourceLog with $destLog for $topicPartition")
     if (sourceLog == null)
       throw new KafkaStorageException(s"The current replica for $topicPartition is offline")
     if (destLog == null)
       throw new KafkaStorageException(s"The future replica for $topicPartition is offline")
-    destLog.renameDir(UnifiedLog.logDirName(topicPartition), shouldReinitialize = true)
-    // the metrics tags still contain "future", so we have to remove it.
-    // we will add metrics back after sourceLog remove the metrics
-    destLog.removeLogMetrics()
-    destLog.updateHighWatermark(sourceLog.highWatermark)
+    info(s"Attempting to replace current log $sourceLog with $destLog for $topicPartition")
+    replaceCurrentWithFutureLog(Option(sourceLog), destLog, updateHighWatermark = true)
+    info(s"The current replica is successfully replaced with the future replica for $topicPartition")
+    }
   }
+
+  def replaceCurrentWithFutureLog(sourceLog: Option[UnifiedLog], destLog: UnifiedLog, updateHighWatermark: Boolean = false): Unit = {

Review Comment: I think that's not necessarily true. `findAbandonedFutureLogs` may find a `sourceLog` if the log directory is online, but I don't think it's safe to update the high watermark even then, because sourceLog may be ahead of futureLog.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
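The reviewer's concern — the current replica may be ahead of an abandoned future replica, so blindly copying the current log's high watermark can overshoot the data the future log actually holds — amounts to a clamp. A one-line illustration (hypothetical names, not the LogManager code):

```python
def safe_future_high_watermark(source_high_watermark, future_log_end_offset):
    """Illustrative only: never advance a future replica's high watermark
    past the data it actually holds (its own log end offset)."""
    return min(source_high_watermark, future_log_end_offset)

# A future replica that stopped copying at offset 80 must not claim HW 100.
assert safe_future_high_watermark(100, 80) == 80
# A fully caught-up future replica can safely take the source's high watermark.
assert safe_future_high_watermark(100, 120) == 100
```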
Re: [PR] KAFKA-14226: feat(connect:transform): Introduce FieldPath abstraction [kafka]
C0urante commented on code in PR #15379: URL: https://github.com/apache/kafka/pull/15379#discussion_r1570960525

## connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/SingleFieldPath.java:
## @@ -0,0 +1,258 @@
Re: [PR] KAFKA-14226: feat(connect:transform): Introduce FieldPath abstraction [kafka]
C0urante commented on code in PR #15379: URL: https://github.com/apache/kafka/pull/15379#discussion_r1570939313

## connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/SingleFieldPath.java:
## @@ -0,0 +1,258 @@
[jira] [Comment Edited] (KAFKA-16578) Revert changes to connect_distributed_test.py for the new async Consumer
[ https://issues.apache.org/jira/browse/KAFKA-16578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838609#comment-17838609 ]

Sagar Rao edited comment on KAFKA-16578 at 4/18/24 3:02 PM:

[~kirktrue], I am running the system tests for connect using the test suite on my local, and this exact test *test_exactly_once_source* fails regularly for me for a different config.

{code:java}
test_id: kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_exactly_once_source.clean=False.connect_protocol=sessioned.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
status: FAIL
run time: 7 minutes 59.232 seconds

InsufficientResourcesError('linux nodes requested: 1. linux nodes available: 0')
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 186, in _do_run
    data = self.run_test()
  File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 246, in run_test
    return self.test_context.function(self.test)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 433, in wrapper
    return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File "/opt/kafka-dev/tests/kafkatest/tests/connect/connect_distributed_test.py", line 929, in test_exactly_once_source
    consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, self.source.topic, consumer_timeout_ms=1000, print_key=True, consumer_properties=consumer_properties)
  File "/opt/kafka-dev/tests/kafkatest/services/console_consumer.py", line 97, in __init__
    BackgroundThreadService.__init__(self, context, num_nodes)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/services/background_thread.py", line 26, in __init__
    super(BackgroundThreadService, self).__init__(context, num_nodes, cluster_spec, *args, **kwargs)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/services/service.py", line 107, in __init__
    self.allocate_nodes()
  File "/usr/local/lib/python3.9/dist-packages/ducktape/services/service.py", line 217, in allocate_nodes
    self.nodes = self.cluster.alloc(self.cluster_spec)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/cluster/cluster.py", line 54, in alloc
    allocated = self.do_alloc(cluster_spec)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/cluster/finite_subcluster.py", line 37, in do_alloc
    good_nodes, bad_nodes = self._available_nodes.remove_spec(cluster_spec)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/cluster/node_container.py", line 131, in remove_spec
    raise InsufficientResourcesError(err)
ducktape.cluster.node_container.InsufficientResourcesError: linux nodes requested: 1. linux nodes available: 0
{code}

-If you want, I can revert the change as part of my PR. It should be ready for review by today or tomorrow.-

I updated the PR [https://github.com/apache/kafka/pull/15594] with the changes rolled back for the test in question. I hope it's ok.
Re: [PR] KAFKA-16272: Adding new coordinator related changes for connect_distributed.py [kafka]
vamossagar12 commented on code in PR #15594: URL: https://github.com/apache/kafka/pull/15594#discussion_r1570933991

## tests/kafkatest/services/connect.py:
## @@ -534,33 +535,40 @@ def received_messages(self):
     def start(self):
         self.logger.info("Creating connector VerifiableSinkConnector %s", self.name)
-        self.cc.create_connector({
+        connector_config = {
             'name': self.name,
             'connector.class': 'org.apache.kafka.connect.tools.VerifiableSinkConnector',
             'tasks.max': self.tasks,
             'topics': ",".join(self.topics)
-        })
+        }
+        if self.consumer_group_protocol is not None:
+            connector_config["consumer.override.group.protocol"] = self.consumer_group_protocol
+        self.cc.create_connector(connector_config)

 class MockSink(object):
-    def __init__(self, cc, topics, mode=None, delay_sec=10, name="mock-sink"):
+    def __init__(self, cc, topics, mode=None, delay_sec=10, name="mock-sink", consumer_group_protocol=None):
         self.cc = cc
         self.logger = self.cc.logger
         self.name = name
         self.mode = mode
         self.delay_sec = delay_sec
         self.topics = topics
+        self.consumer_group_protocol = consumer_group_protocol

     def start(self):
         self.logger.info("Creating connector MockSinkConnector %s", self.name)
-        self.cc.create_connector({
+        connector_config = {
             'name': self.name,
             'connector.class': 'org.apache.kafka.connect.tools.MockSinkConnector',
             'tasks.max': 1,
             'topics': ",".join(self.topics),
             'mock_mode': self.mode,
             'delay_ms': self.delay_sec * 1000
-        })
+        }
+        if self.consumer_group_protocol is not None:
+            connector_config["consumer.override.group.protocol"] = self.consumer_group_protocol

Review Comment: FYI, I did test by setting the group protocol override at a connector level and that seems to be working fine.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
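The pattern in this diff — only add the `consumer.override.group.protocol` key to the connector config when a group protocol was explicitly requested — can be reproduced standalone. The helper below is a hypothetical sketch mirroring the structure in `connect.py`, not the test-framework code itself:

```python
def build_sink_connector_config(name, topics, consumer_group_protocol=None):
    """Build a sink connector config; the consumer group-protocol override is
    added only when explicitly requested (sketch of the connect.py pattern)."""
    config = {
        'name': name,
        'connector.class': 'org.apache.kafka.connect.tools.MockSinkConnector',
        'tasks.max': 1,
        'topics': ",".join(topics),
    }
    if consumer_group_protocol is not None:
        config["consumer.override.group.protocol"] = consumer_group_protocol
    return config


# Without an explicit protocol, no override key is present, so the broker-side
# default applies; with one, the per-connector consumer override is set.
default_config = build_sink_connector_config("mock-sink", ["topic-a"])
assert "consumer.override.group.protocol" not in default_config

new_config = build_sink_connector_config("mock-sink", ["topic-a"],
                                         consumer_group_protocol="consumer")
assert new_config["consumer.override.group.protocol"] == "consumer"
```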
Re: [PR] KAFKA-16272: Adding new coordinator related changes for connect_distributed.py [kafka]
vamossagar12 commented on PR #15594: URL: https://github.com/apache/kafka/pull/15594#issuecomment-2064120764

Hey @lucasbru, I ran the following test suite:

```
my_test_suite:
  - tests/kafkatest/tests/connect/connect_distributed_test.py::ConnectDistributedTest.test_restart_failed_connector@{"exactly_once_source":false,"connect_protocol":"sessioned","metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer"}
  - tests/kafkatest/tests/connect/connect_distributed_test.py::ConnectDistributedTest.test_restart_failed_task@{"connector_type":"sink","connect_protocol":"sessioned","metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer"}
  - tests/kafkatest/tests/connect/connect_distributed_test.py::ConnectDistributedTest.test_restart_connector_and_tasks_failed_connector@{"connect_protocol":"sessioned","metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer"}
  - tests/kafkatest/tests/connect/connect_distributed_test.py::ConnectDistributedTest.test_restart_connector_and_tasks_failed_task@{"connector_type":"sink","connect_protocol":"sessioned","metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer"}
  - tests/kafkatest/tests/connect/connect_distributed_test.py::ConnectDistributedTest.test_pause_and_resume_sink@{"connector_type":"sink","connect_protocol":"sessioned","metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer"}
  - tests/kafkatest/tests/connect/connect_distributed_test.py::ConnectDistributedTest.test_file_source_and_sink@{"security_protocol":"PLAINTEXT","exactly_once_source":false,"connect_protocol":"sessioned","metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer"}
  - tests/kafkatest/tests/connect/connect_distributed_test.py::ConnectDistributedTest.test_bounce@{"clean":false,"connect_protocol":"sessioned","metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer"}
  - tests/kafkatest/tests/connect/connect_distributed_test.py::ConnectDistributedTest.test_exactly_once_source@{"clean":false,"connect_protocol":"sessioned","metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer"}
  - tests/kafkatest/tests/connect/connect_distributed_test.py::ConnectDistributedTest.test_transformations@{"connect_protocol":"sessioned","metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer"}
```

and here are the results:

```
SESSION REPORT (ALL TESTS)
ducktape version: 0.11.4
session_id: 2024-04-18--001
run time: 18 minutes 5.491 seconds
tests run: 8
passed: 7
flaky: 0
failed: 1
ignored: 0

test_id: kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_bounce.clean=False.connect_protocol=sessioned.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
status: PASS
run time: 6 minutes 7.875 seconds

test_id: kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_exactly_once_source.clean=False.connect_protocol=sessioned.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
status: FAIL
run time: 7 minutes 59.232 seconds
InsufficientResourcesError('linux nodes requested: 1. linux nodes available: 0')
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 186, in _do_run
    data = self.run_test()
  File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 246, in run_test
    return self.test_context.function(self.test)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 433, in wrapper
    return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File "/opt/kafka-dev/tests/kafkatest/tests/connect/connect_distributed_test.py", line 929, in test_exactly_once_source
    consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, self.source.topic, consumer_timeout_ms=1000, print_key=True, consumer_properties=consumer_properties)
  File "/opt/kafka-dev/tests/kafkatest/services/console_consumer.py", line 97, in __init__
    BackgroundThreadService.__init__(self, context, num_nodes)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/services/background_thread.py", line 26, in __init__
    super(BackgroundThreadService, self).__init__(context, num_nodes, cluster_spec, *args, **kwargs)
  File
```
Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]
chia7712 commented on code in PR #15136: URL: https://github.com/apache/kafka/pull/15136#discussion_r1570922398

## core/src/main/scala/kafka/log/LogManager.scala: ##

```
@@ -1189,49 +1217,62 @@
     val sourceLog = currentLogs.get(topicPartition)
     val destLog = futureLogs.get(topicPartition)
-    info(s"Attempting to replace current log $sourceLog with $destLog for $topicPartition")
     if (sourceLog == null)
       throw new KafkaStorageException(s"The current replica for $topicPartition is offline")
     if (destLog == null)
       throw new KafkaStorageException(s"The future replica for $topicPartition is offline")
-    destLog.renameDir(UnifiedLog.logDirName(topicPartition), shouldReinitialize = true)
-    // the metrics tags still contain "future", so we have to remove it.
-    // we will add metrics back after sourceLog remove the metrics
-    destLog.removeLogMetrics()
-    destLog.updateHighWatermark(sourceLog.highWatermark)
+    info(s"Attempting to replace current log $sourceLog with $destLog for $topicPartition")
+    replaceCurrentWithFutureLog(Option(sourceLog), destLog, updateHighWatermark = true)
+    info(s"The current replica is successfully replaced with the future replica for $topicPartition")
+  }
+}
+
+def replaceCurrentWithFutureLog(sourceLog: Option[UnifiedLog], destLog: UnifiedLog, updateHighWatermark: Boolean = false): Unit = {
```

Review Comment: It seems we don't need `updateHighWatermark`, since we call `updateHighWatermark` only if `sourceLog` is defined.
Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]
chia7712 commented on code in PR #15136: URL: https://github.com/apache/kafka/pull/15136#discussion_r1570920951

## core/src/main/scala/kafka/log/LogManager.scala: ##

```
@@ -1178,6 +1178,33 @@
     }
   }

+  def recoverAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Unit = {
+    val abandonedFutureLogs = findAbandonedFutureLogs(brokerId, newTopicsImage)
+    abandonedFutureLogs.foreach { case (futureLog, currentLog) =>
+      val tp = futureLog.topicPartition
+      if (cleaner != null) {
```

Review Comment: Maybe we need to add comments for `abortAndPauseCleaning`. `recoverAbandonedFutureLogs` is called during startup, and calling `abortAndPauseCleaning` there is a bit weird to me.
Re: [PR] KAFKA-15649: Handle directory failure timeout [kafka]
viktorsomogyi commented on code in PR #15697: URL: https://github.com/apache/kafka/pull/15697#discussion_r1570907841

## core/src/main/scala/kafka/server/KafkaConfig.scala: ##

```
@@ -528,6 +529,10 @@ object KafkaConfig {
     "If log.message.timestamp.type=CreateTime, the message will be rejected if the difference in timestamps exceeds " +
     "this specified threshold. This configuration is ignored if log.message.timestamp.type=LogAppendTime."

+  val LogDirFailureTimeoutMsDoc = "If the broker is unable to successfully communicate to the controller that some log " +
+    "directory has failed for longer than this time, and there's at least one partition with leadership on that directory, " +
```

Review Comment: I'll do another round on this; there might be a way in `BrokerServer` to extract this information using a combination of `MetadataCache`, `ReplicaManager` and `LogManager`. I'll update you tomorrow about my findings.

## core/src/main/scala/kafka/server/BrokerLifecycleManager.scala: ##

```
@@ -327,16 +333,25 @@
   private class OfflineDirEvent(val dir: Uuid) extends EventQueue.Event {
     override def run(): Unit = {
       if (offlineDirs.isEmpty) {
-        offlineDirs = Set(dir)
+        offlineDirs = Map(dir -> false)
       } else {
-        offlineDirs = offlineDirs + dir
+        offlineDirs += (dir -> false)
       }
       if (registered) {
         scheduleNextCommunicationImmediately()
       }
     }
   }

+  private class OfflineDirBrokerFailureEvent(offlineDir: Uuid) extends EventQueue.Event {
+    override def run(): Unit = {
+      if (!offlineDirs.getOrElse(offlineDir, false)) {
+        error(s"Shutting down because couldn't communicate offline log dirs with controllers")
```

Review Comment: I'll print only the UUID here, and I'll modify the other log statements to contain the UUID so one can pair these statements when analyzing the logs. Printing the dir path here would be a bit more of a stretch, as we currently don't propagate it down to this level. Let me know if you think it'd be better to print the path here.
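The Set-to-Map change in the first `BrokerLifecycleManager` diff can be sketched in Python. This is a hypothetical model of the flag semantics only, not the actual Scala code; all names here (`mark_offline`, `mark_communicated`, `should_shut_down`) are illustrative:

```python
# Track each offline log dir together with a flag saying whether its failure
# was successfully communicated to the controller.
offline_dirs = {}  # dir id -> communicated-to-controller flag

def mark_offline(dir_id):
    # Equivalent of OfflineDirEvent: record the dir with the flag unset.
    offline_dirs.setdefault(dir_id, False)

def mark_communicated(dir_id):
    # Called once the controller has acknowledged the offline dir.
    if dir_id in offline_dirs:
        offline_dirs[dir_id] = True

def should_shut_down(dir_id):
    # Equivalent of OfflineDirBrokerFailureEvent's check: shut down if the
    # timeout fired and the controller was never informed.
    return not offline_dirs.get(dir_id, False)
```

The point of moving from `Set[Uuid]` to `Map[Uuid, Boolean]` is exactly this extra per-dir bit: membership alone can't distinguish "known offline" from "known offline and reported".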
Re: [PR] KAFKA-14226: feat(connect:transform): Introduce FieldPath abstraction [kafka]
C0urante commented on code in PR #15379: URL: https://github.com/apache/kafka/pull/15379#discussion_r1570907754

## connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java: ##

```
@@ -60,6 +62,30 @@ public void schemaless() {
         assertEquals(expectedKey, transformedRecord.key());
     }

+    @Test
+    public void schemalessAndNestedFields() {
+        Map configs = new HashMap<>();
+        configs.put("fields", "a,b.c");
+        configs.put(FieldSyntaxVersion.FIELD_SYNTAX_VERSION_CONFIG, FieldSyntaxVersion.V2.name());
+        xform.configure(configs);
+
+        final HashMap value = new HashMap<>();
+        value.put("a", 1);
+        final HashMap nested = new HashMap<>();
+        nested.put("c", 3);
+        value.put("b", nested);
+
+        final SinkRecord record = new SinkRecord("", 0, null, null, null, value, 0);
+        final SinkRecord transformedRecord = xform.apply(record);
+
+        final HashMap expectedKey = new HashMap<>();
+        expectedKey.put("a", 1);
+        expectedKey.put("b.c", 3);
```

Review Comment: Hmmm... I thought that the nesting structure of the value would be matched in the derived key. Does the KIP go into detail anywhere about this? I'm not a huge fan of reusing the dot notation here, but I can't remember if this was discussed on the KIP thread or not.
Re: [PR] MINOR: Add test for PartitionMetadataFile [kafka]
chia7712 commented on code in PR #15714: URL: https://github.com/apache/kafka/pull/15714#discussion_r1570903125

## storage/src/test/java/org/apache/kafka/storage/internals/checkpoint/PartitionMetadataFileTest.java: ##

```
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kafka.storage.internals.checkpoint;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InconsistentTopicIdException;

import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.List;

import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

class PartitionMetadataFileTest {
    private final File dir = TestUtils.tempDirectory();
    private final File file = PartitionMetadataFile.newFile(dir);

    @Test
    public void testSetRecordWithDifferentTopicId() {
        PartitionMetadataFile partitionMetadataFile = new PartitionMetadataFile(file, null);
        Uuid topicId = Uuid.randomUuid();
        partitionMetadataFile.record(topicId);
        Uuid differentTopicId = Uuid.randomUuid();
        assertThrows(InconsistentTopicIdException.class, () -> partitionMetadataFile.record(differentTopicId));
    }

    @Test
    public void testSetRecordWithSameTopicId() {
        PartitionMetadataFile partitionMetadataFile = new PartitionMetadataFile(file, null);
        Uuid topicId = Uuid.randomUuid();
        partitionMetadataFile.record(topicId);
        assertDoesNotThrow(() -> partitionMetadataFile.record(topicId));
```

Review Comment: We need a test that calls `record` with a UUID that has a different reference but the same content.
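The case the reviewer is asking for — same UUID content held by two distinct objects — can be illustrated in Python (the actual Java test would compare `Uuid` instances the same way; this is just a sketch of the reference-vs-content distinction):

```python
import uuid

# Two UUID objects built from the same hex string: distinct references, equal content.
a = uuid.UUID("0123456789abcdef0123456789abcdef")
b = uuid.UUID("0123456789abcdef0123456789abcdef")

assert a is not b  # different objects in memory (different references)
assert a == b      # but equal by value, which is what an idempotence check must compare
```

A `record()`-style check implemented with reference equality would wrongly throw here, which is why the extra test case is worth having.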
Re: [PR] KAFKA-15853: Move KafkaConfig log properties and docs out of core [kafka]
chia7712 commented on PR #15569: URL: https://github.com/apache/kafka/pull/15569#issuecomment-2064037126

> However I'm bit concern that LogConfig seems already huge. What others prefer? Keep it in KafkaLogConfigs or move them to LogConfig.ServerLogConfig

Most of the default values of `KafkaLogConfigs` are in `LogConfig`, yet they live in different modules. That pattern is not like `ReplicationConfigs` or `KafkaSecurityConfigs`. Would the server-common module be more suitable for collecting those server-side configs, since both `storage` and `server` depend on `server-common`? Also, `server-common` already has the `org.apache.kafka.server.config` package.
Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]
FrankYang0529 commented on PR #15679: URL: https://github.com/apache/kafka/pull/15679#issuecomment-2064035886

> @FrankYang0529 Could you reduce the partition number of offsets topic? It seems the timeout is caused by that coordinator is waiting for the offset partition, and our CI could be too busy to complete the assignments.

Hi @chia7712, thanks for the suggestion. I have set `offsets.topic.num.partitions` to `1` on `ClusterTestDefaults`. Hope it works fine.
[jira] [Commented] (KAFKA-16573) Streams does not specify where a Serde is needed
[ https://issues.apache.org/jira/browse/KAFKA-16573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838672#comment-17838672 ]

Ayoub Omari commented on KAFKA-16573:
-------------------------------------

[~ableegoldman] [~mjsax] I looked a bit into this, and found that this error may appear in only +three+ cases:
* Source node serde absent
* Sink node serde absent
* Store serde absent

For the first two cases, it is easy to improve the error by showing the type of node (source or sink) and its name. For the third case, which is the case in the description, Kafka Streams doesn't know the processor node behind the error when it checks serdes, because this check happens during store initialization. At that moment, it only knows the name of the store. But I think showing the name of the store may help (even for internal stores)? We could say something like "The serdes of the store KTABLE-AGGREGATE-STATE-STORE-04 are not specified". WDYT?

> Streams does not specify where a Serde is needed
> ------------------------------------------------
>
>          Key: KAFKA-16573
>          URL: https://issues.apache.org/jira/browse/KAFKA-16573
>      Project: Kafka
>   Issue Type: Improvement
>   Components: streams
> Affects Versions: 3.7.0
>     Reporter: Ayoub Omari
>     Priority: Major
>
> Example topology:
> {code:java}
> builder
>    .table("input", Consumed.`with`(Serdes.String(), Serdes.String()))
>    .groupBy((key, value) => new KeyValue(value, key))
>    .count()
>    .toStream()
>    .to("output", Produced.`with`(Serdes.String(), Serdes.Long()))
> {code}
> At runtime, we get the following exception
> {code:java}
> Please specify a key serde or set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG
> org.apache.kafka.common.config.ConfigException: Please specify a key serde or set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG
> 	at org.apache.kafka.streams.StreamsConfig.defaultKeySerde(StreamsConfig.java:1857)
> 	at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.keySerde(AbstractProcessorContext.java:92)
> 	at org.apache.kafka.streams.processor.internals.SerdeGetter.keySerde(SerdeGetter.java:47)
> 	at org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareSerde(WrappingNullableUtils.java:63)
> 	at org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeySerde(WrappingNullableUtils.java:90)
> 	at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.initStoreSerde(MeteredKeyValueStore.java:188)
> 	at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:143)
> 	at org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:232)
> 	at org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:102)
> 	at org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:258){code}
> The error does not give information about the line or the processor causing the issue.
> Here a Grouped was missing inside the groupBy, but because the groupBy API doesn't force you to define Grouped, this one can be missed, and it could be difficult to spot in a more complex topology.
> This also matters for someone who needs control over serdes in the topology and doesn't want to define default serdes.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16298: Ensure rebalance listener exceptions are propagated to the user on consumer poll [kafka]
lianetm commented on code in PR #15742: URL: https://github.com/apache/kafka/pull/15742#discussion_r1570871801

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ##

```
@@ -228,13 +228,16 @@ private void process(final ErrorEvent event) {
     }

     private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {
-        ApplicationEvent invokedEvent = invokeRebalanceCallbacks(
+        ConsumerRebalanceListenerCallbackCompletedEvent invokedEvent = invokeRebalanceCallbacks(
             rebalanceListenerInvoker,
             event.methodName(),
             event.partitions(),
             event.future()
         );
         applicationEventHandler.add(invokedEvent);
+        if (invokedEvent.error().isPresent()) {
+            throw invokedEvent.error().get();
```

Review Comment: Kind of aligned with @lucasbru here. I totally get your concern @kirktrue, but as I see it we're in very different territory, not only with the new consumer architecture (all that @lucasbru described) but also with the new protocol (which is the only one supported here), so I lean towards keeping it simple as an initial approach, based on how we expect things to happen in practice.

With the new protocol, we get revocations first, and then new partitions in a following reconciliation loop. If the revocation callback fails, the reconciliation will continue to be retried on the next poll loop, triggering callbacks continuously (that's what will be happening in the background). At the same time, in the foreground, we'll be raising the revocation callback failure to the user (with this PR).

> But after a listener execution has failed, we don't seem to update the subscription state in the reconciliation.

Agree. Just for the record, that holds true for the listeners of partitions revoked and lost (subscription state is only updated when the callbacks complete). In the case of assigned partitions, the subscription is updated before the callback, aligning with the `onPartitionsAssigned` contract, which is that it is called when the rebalance completes.
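The behaviour under discussion — a listener failure recorded by background event processing and surfaced to the user on the next poll — can be sketched generically. This is a minimal model under stated assumptions, not the actual `AsyncKafkaConsumer` internals; `PollLoop`, `run_listener`, and `poll` are illustrative names:

```python
class PollLoop:
    """Minimal model: background callbacks record their error; poll() re-raises it."""

    def __init__(self):
        self._callback_error = None

    def run_listener(self, callback):
        # Invoked from background event processing; failures are remembered,
        # not swallowed, so they can reach the user thread.
        try:
            callback()
        except Exception as e:
            self._callback_error = e

    def poll(self):
        # Propagate a recorded listener failure to the caller, then clear it
        # so subsequent polls proceed normally.
        if self._callback_error is not None:
            err, self._callback_error = self._callback_error, None
            raise err
        return []
```

The key property, matching the PR's intent, is that a failed callback does not kill the background loop; it only turns the user's next `poll()` into a throw.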
Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]
FrankYang0529 commented on PR #15616: URL: https://github.com/apache/kafka/pull/15616#issuecomment-2063996320

> @FrankYang0529, there is a checkstyle error: `[2024-04-17T14:04:27.072Z] [ant:checkstyle] [ERROR] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-15616/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java:70:34: Name 'futureDirPattern' must match pattern '(^[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$)|(^log$)'. [ConstantName]`
>
> Please help fix it. Thanks.

Fixed it, and checked that `./gradlew checkstyleMain checkstyleTest` passes on my laptop. Thank you.
Re: [PR] KAFKA-16563: retry pollEvent in KRaftMigrationDriver for retriable errors [kafka]
chia7712 commented on code in PR #15732: URL: https://github.com/apache/kafka/pull/15732#discussion_r1570822868

## metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java: ##

```
@@ -786,12 +773,29 @@ public void run() throws Exception {
     }
 }

+class RecoverMigrationStateFromZKEvent extends MigrationEvent {
+    @Override
+    public void run() throws Exception {
+        applyMigrationOperation("Recovering migration state from ZK", zkMigrationClient::getOrCreateMigrationRecoveryState);
```

Review Comment: Should we add a condition check (`checkDriverState`) like other events?

## metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java: ##

```
@@ -786,12 +773,29 @@ public void run() throws Exception {
     }
 }

+class RecoverMigrationStateFromZKEvent extends MigrationEvent {
+    @Override
+    public void run() throws Exception {
+        applyMigrationOperation("Recovering migration state from ZK", zkMigrationClient::getOrCreateMigrationRecoveryState);
+        String maybeDone = migrationLeadershipState.initialZkMigrationComplete() ? "done" : "not done";
+        log.info("Initial migration of ZK metadata is {}.", maybeDone);
+
+        // Once we've recovered the migration state from ZK, install this class as a metadata publisher
+        // by calling the initialZkLoadHandler.
+        initialZkLoadHandler.accept(KRaftMigrationDriver.this);
+
+        // Transition to INACTIVE state and wait for leadership events.
+        transitionTo(MigrationDriverState.INACTIVE);
+    }
+}
+
 class PollEvent extends MigrationEvent {

     @Override
     public void run() throws Exception {
         switch (migrationState) {
             case UNINITIALIZED:
-                recoverMigrationStateFromZK();
+                eventQueue.append(new RecoverMigrationStateFromZKEvent());
```

Review Comment: Should we use `prepend` to make sure this event is executed ASAP?
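The `prepend` vs `append` distinction the reviewer raises can be sketched with a simple FIFO event queue (a hypothetical stand-in for the driver's `eventQueue`; the event names here are illustrative):

```python
from collections import deque

# A FIFO event queue: append() adds at the tail, appendleft() at the head.
queue = deque()
queue.append("poll-1")
queue.append("poll-2")

# append() would run the recovery after already-queued poll events;
# a prepend (appendleft) makes it the very next event to execute.
queue.appendleft("recover-migration-state")

processed = [queue.popleft() for _ in range(3)]
# processed == ["recover-migration-state", "poll-1", "poll-2"]
```

With a plain `append`, any events enqueued in the meantime run first, which is exactly why prepending matters when recovery must happen before further polling.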
Re: [PR] KAFKA-16566: Fix consumer static membership system test with new protocol [kafka]
lianetm commented on code in PR #15738: URL: https://github.com/apache/kafka/pull/15738#discussion_r1570773742

## tests/kafkatest/tests/client/consumer_test.py: ##

```
@@ -348,26 +348,45 @@ def test_fencing_static_consumer(self, num_conflict_consumers, fencing_stage, me
         consumer.start()
         self.await_members(consumer, len(consumer.nodes))

+        num_rebalances = consumer.num_rebalances()
         conflict_consumer.start()
-        self.await_members(conflict_consumer, num_conflict_consumers)
-        self.await_members(consumer, len(consumer.nodes) - num_conflict_consumers)
+        if group_protocol == consumer_group.classic_group_protocol:
+            # Classic protocol: conflicting members should join, and the initial ones with conflicting instance id should fail.
+            self.await_members(conflict_consumer, num_conflict_consumers)
+            self.await_members(consumer, len(consumer.nodes) - num_conflict_consumers)

-        wait_until(lambda: len(consumer.dead_nodes()) == num_conflict_consumers,
+            wait_until(lambda: len(consumer.dead_nodes()) == num_conflict_consumers,
                    timeout_sec=10,
                    err_msg="Timed out waiting for the fenced consumers to stop")
+        else:
+            # Consumer protocol: existing members should remain active and new conflicting ones should not be able to join.
+            self.await_consumed_messages(consumer)
+            assert num_rebalances == consumer.num_rebalances(), "Static consumers attempt to join with instance id in use should not cause a rebalance"
+            assert len(consumer.joined_nodes()) == len(consumer.nodes)
+            assert len(conflict_consumer.joined_nodes()) == 0
+
+            # Stop existing nodes, so conflicting ones should be able to join.
+            consumer.stop_all()
+            wait_until(lambda: len(consumer.dead_nodes()) == len(consumer.nodes),
+                       timeout_sec=self.session_timeout_sec+5,
+                       err_msg="Timed out waiting for the consumer to shutdown")
+            conflict_consumer.start()
+            self.await_members(conflict_consumer, num_conflict_consumers)

     else:
         consumer.start()
         conflict_consumer.start()

         wait_until(lambda: len(consumer.joined_nodes()) + len(conflict_consumer.joined_nodes()) == len(consumer.nodes),
-                   timeout_sec=self.session_timeout_sec,
-                   err_msg="Timed out waiting for consumers to join, expected total %d joined, but only see %d joined from"
+                   timeout_sec=self.session_timeout_sec*2,
```

Review Comment: I added this to help a bit with the flaky behaviour, making it consistent with how we wait for members in all other tests that rely on [await_members](https://github.com/apache/kafka/blob/aee9724ee15ed539ae73c09cc2c2eda83ae3c864/tests/kafkatest/tests/verifiable_consumer_test.py#L84).
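`wait_until` in these tests is a polling loop with a timeout; a minimal version looks roughly like the following (a sketch, not ducktape's actual implementation — the `backoff_sec` parameter and defaults here are assumptions):

```python
import time

def wait_until(condition, timeout_sec, backoff_sec=0.1, err_msg="Condition not met"):
    """Poll `condition` until it returns True or `timeout_sec` elapses."""
    deadline = time.time() + timeout_sec
    while time.time() < deadline:
        if condition():
            return
        time.sleep(backoff_sec)
    raise TimeoutError(err_msg)
```

Doubling `timeout_sec`, as the diff does, simply gives slow CI nodes more polling iterations before the test is declared failed; it costs nothing when the condition is met early, since the loop returns immediately.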
Re: [PR] KAFKA-16566: Fix consumer static membership system test with new protocol [kafka]
lianetm commented on PR #15738: URL: https://github.com/apache/kafka/pull/15738#issuecomment-2063881962

Hey @lucasbru, yes, I had closed it just to investigate a bit more into some failures I noticed, but I ended up finding only flaky behaviour unrelated to the changes in this PR, so I'm reopening it now for review. This PR fixes a known change in static membership behaviour between the protocols (the test was failing consistently, as expected). I still see flakiness in the test, but not on the path this PR fixes, and it happens with both the legacy and new protocols, so I would say I'll track that separately, since this fix is already a good improvement to the current test situation.
[jira] [Commented] (KAFKA-14733) Update AclAuthorizerTest to run tests for both zk and kraft mode
[ https://issues.apache.org/jira/browse/KAFKA-14733?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838647#comment-17838647 ]

Matthew de Detrich commented on KAFKA-14733:
--------------------------------------------

[~emissionnebula] Have you started working on this, or can I look into it?

> Update AclAuthorizerTest to run tests for both zk and kraft mode
> ----------------------------------------------------------------
>
>      Key: KAFKA-14733
>      URL: https://issues.apache.org/jira/browse/KAFKA-14733
>  Project: Kafka
>  Issue Type: Improvement
>  Reporter: Purshotam Chauhan
>  Assignee: Purshotam Chauhan
>  Priority: Minor
>
> Currently, we have two test classes AclAuthorizerTest and StandardAuthorizerTest that are used exclusively for zk and kraft mode.
> But AclAuthorizerTest has a lot of tests covering various scenarios. We should change AclAuthorizerTest to run for both zk and kraft modes so as to keep parity between both modes.
[jira] [Commented] (KAFKA-15744) KRaft support in CustomQuotaCallbackTest
[ https://issues.apache.org/jira/browse/KAFKA-15744?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838644#comment-17838644 ]

Matthew de Detrich commented on KAFKA-15744:
--------------------------------------------

[~high.lee] Are you still working on this issue, or is it okay if I take it over?

> KRaft support in CustomQuotaCallbackTest
> ----------------------------------------
>
>      Key: KAFKA-15744
>      URL: https://issues.apache.org/jira/browse/KAFKA-15744
>  Project: Kafka
>  Issue Type: Task
>  Components: core
>  Reporter: Sameer Tejani
>  Assignee: highluck
>  Priority: Minor
>  Labels: kraft, kraft-test, newbie
>
> The following tests in CustomQuotaCallbackTest in core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala need to be updated to support KRaft
> 90 : def testCustomQuotaCallback(): Unit = {
> Scanned 468 lines. Found 0 KRaft tests out of 1 tests
[PR] Fix typos [kafka]
birdoplank opened a new pull request, #15752: URL: https://github.com/apache/kafka/pull/15752

Testing potential security vulnerability in the pipeline to be reported as part of the Apache vulnerability disclosure program: https://apache.org/security/#vulnerability-handling

*More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.*

*Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.*

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[jira] [Commented] (KAFKA-15724) KRaft support in OffsetFetchRequestTest
[ https://issues.apache.org/jira/browse/KAFKA-15724?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838643#comment-17838643 ]

Matthew de Detrich commented on KAFKA-15724:
--------------------------------------------

[~shivsundar] Is it okay if I take over the ticket, or are you still working on it?

> KRaft support in OffsetFetchRequestTest
> ---------------------------------------
>
>      Key: KAFKA-15724
>      URL: https://issues.apache.org/jira/browse/KAFKA-15724
>  Project: Kafka
>  Issue Type: Task
>  Components: core
>  Reporter: Sameer Tejani
>  Assignee: Shivsundar R
>  Priority: Minor
>  Labels: kraft, kraft-test, newbie
>
> The following tests in OffsetFetchRequestTest in core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala need to be updated to support KRaft
> 83 : def testOffsetFetchRequestSingleGroup(): Unit = {
> 130 : def testOffsetFetchRequestAllOffsetsSingleGroup(): Unit = {
> 180 : def testOffsetFetchRequestWithMultipleGroups(): Unit = {
> Scanned 246 lines. Found 0 KRaft tests out of 3 tests