Re: [PR] [KAFKA-15749] Adding support for Kraft in test testClusterIdPresent [kafka]

2024-04-18 Thread via GitHub


github-actions[bot] commented on PR #15181:
URL: https://github.com/apache/kafka/pull/15181#issuecomment-2065686746

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.





Re: [PR] KAFKA-15712: Added KRaft support to MultipleListenersWithSameSecurityProtocolBaseTest [kafka]

2024-04-18 Thread via GitHub


github-actions[bot] commented on PR #15223:
URL: https://github.com/apache/kafka/pull/15223#issuecomment-2065686726

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.





Re: [PR] KAFKA-15727: Added KRaft support in AlterUserScramCredentialsRequestNotAuthorizedTest [kafka]

2024-04-18 Thread via GitHub


github-actions[bot] commented on PR #15224:
URL: https://github.com/apache/kafka/pull/15224#issuecomment-2065686700

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.





Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]

2024-04-18 Thread via GitHub


FrankYang0529 commented on PR #15679:
URL: https://github.com/apache/kafka/pull/15679#issuecomment-2065616140

   > > @FrankYang0529 Could you reduce the partition number of the offsets topic? 
It seems the timeout is caused by the coordinator waiting for the offsets 
partition, and our CI could be too busy to complete the assignments.
   > 
   > Hi @chia7712, thanks for the suggestion. I have set 
`offsets.topic.num.partitions` to `1` on `ClusterTestDefaults`. Hope it works 
fine.
   
   Hi @chia7712, setting `offsets.topic.num.partitions` to `1` works! Do you 
think that we should revert the `unstable.api.versions.enable` change and try 
again? Thanks.





Re: [PR] MINOR: disable internal result emit throttling in TTD [kafka]

2024-04-18 Thread via GitHub


mjsax merged PR #15660:
URL: https://github.com/apache/kafka/pull/15660





[jira] [Created] (KAFKA-16586) Test TaskAssignorConvergenceTest failing

2024-04-18 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-16586:
---

 Summary: Test TaskAssignorConvergenceTest failing
 Key: KAFKA-16586
 URL: https://issues.apache.org/jira/browse/KAFKA-16586
 Project: Kafka
  Issue Type: Test
  Components: streams, unit tests
Reporter: Matthias J. Sax


{code:java}
java.lang.AssertionError: Assertion failed in randomized test. Reproduce with: `runRandomizedScenario(-538095696758490522)`.
  at org.apache.kafka.streams.processor.internals.assignment.TaskAssignorConvergenceTest.runRandomizedScenario(TaskAssignorConvergenceTest.java:545)
  at org.apache.kafka.streams.processor.internals.assignment.TaskAssignorConvergenceTest.randomClusterPerturbationsShouldConverge(TaskAssignorConvergenceTest.java:479)
{code}
This might expose an actual bug (or incorrect test setup) and should be 
reproducible (did not try it myself yet).





[jira] [Commented] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.

2024-04-18 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838829#comment-17838829
 ] 

Matthias J. Sax commented on KAFKA-16514:
-

Thanks for the input. I did not review or vote on the original KIP or the PR, 
so I just assumed there was some mention of static groups... As there is 
nothing about it in the KIP, as you pointed out, I did some digging, and the PR 
reveals why it's only implemented for static members: 
[https://github.com/apache/kafka/pull/12035#discussion_r858263213]

We use the admin client's "removeMembersFromConsumerGroup", which only works 
for static members, as it takes the consumer's `group.instance.id` as input. It 
seems it was a pragmatic approach... Re-reading the KIP discussion, it seems 
that making it work for regular members would require a change in the consumer 
API, and thus would have been a larger-scope KIP (and the idea was to keep the 
KIP scope limited).

Thus, while we might not need a KIP for Kafka Streams, we would need one for 
the consumer to allow Kafka Streams to use this newly added API... In the 
meantime, we could still do a small PR to update the JavaDocs to call out the 
current limitation (which would not make it a public contract IMHO, so after we 
get a consumer KIP we can still address this limitation without another KIP).

Thoughts?
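
For illustration only (not part of the original comment): a minimal sketch of 
the existing admin API mentioned above, which shows why a `group.instance.id`, 
and therefore a static member, is required. The group id, instance id, and 
bootstrap address are placeholders:
{code:java}
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.MemberToRemove;
import org.apache.kafka.clients.admin.RemoveMembersFromConsumerGroupOptions;

import java.util.Collections;
import java.util.Properties;

public class RemoveStaticMemberSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // MemberToRemove is keyed by group.instance.id, so only static
            // members can be removed from the group this way.
            admin.removeMembersFromConsumerGroup(
                "my-group",
                new RemoveMembersFromConsumerGroupOptions(
                    Collections.singleton(new MemberToRemove("my-instance-id"))))
                .all()
                .get();
        }
    }
}
{code}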

> Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup 
> flag.
> ---
>
> Key: KAFKA-16514
> URL: https://issues.apache.org/jira/browse/KAFKA-16514
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Sal Sorrentino
>Priority: Minor
>
> Working with Kafka Streams 3.7.0, but may affect earlier versions as well.
> When attempting to shutdown a streams application and leave the associated 
> consumer group, the supplied `leaveGroup` option seems to have no effect. 
> Sample code:
> {code:java}
> CloseOptions options = new CloseOptions().leaveGroup(true);
> stream.close(options);{code}
> The expected behavior here is that the group member would shutdown and leave 
> the group, immediately triggering a consumer group rebalance. In practice, 
> the rebalance happens after the appropriate timeout configuration has expired.
> I understand the default behavior in that there is an assumption that any 
> associated StateStores would be persisted to disk and that in the case of a 
> rolling restart/deployment, the rebalance delay may be preferable. However, 
> in our application we are using in-memory state stores and standby replicas. 
> There is no benefit in delaying the rebalance in this setup and we are in 
> need of a way to force a member to leave the group when shutting down.
> The workaround we found is to set an undocumented internal StreamConfig to 
> enforce this behavior:
> {code:java}
> props.put("internal.leave.group.on.close", true);
> {code}
> To state the obvious, this is less than ideal.
> Additional configuration details:
> {code:java}
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someApplicationId");
> props.put(
> StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> "localhost:9092,localhost:9093,localhost:9094");
> props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
> props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numProcessors);
> props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
> StreamsConfig.EXACTLY_ONCE_V2);{code}
>  





[jira] [Updated] (KAFKA-16573) Streams does not specify where a Serde is needed

2024-04-18 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16573?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16573:

Priority: Minor  (was: Major)

> Streams does not specify where a Serde is needed
> 
>
> Key: KAFKA-16573
> URL: https://issues.apache.org/jira/browse/KAFKA-16573
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Ayoub Omari
>Priority: Minor
>
> Example topology:
> {code:java}
>  builder
>.table("input", Consumed.`with`(Serdes.String(), Serdes.String()))
>.groupBy((key, value) => new KeyValue(value, key))
>.count()
>.toStream()
>.to("output", Produced.`with`(Serdes.String(), Serdes.Long()))
>  {code}
> At runtime, we get the following exception 
> {code:java}
> Please specify a key serde or set one through 
> StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG
> org.apache.kafka.common.config.ConfigException: Please specify a key serde or 
> set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG
>     at 
> org.apache.kafka.streams.StreamsConfig.defaultKeySerde(StreamsConfig.java:1857)
>     at 
> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.keySerde(AbstractProcessorContext.java:92)
>     at 
> org.apache.kafka.streams.processor.internals.SerdeGetter.keySerde(SerdeGetter.java:47)
>     at 
> org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareSerde(WrappingNullableUtils.java:63)
>     at 
> org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeySerde(WrappingNullableUtils.java:90)
>     at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.initStoreSerde(MeteredKeyValueStore.java:188)
>     at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:143)
>     at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:232)
>     at 
> org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:102)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:258){code}
> The error does not give information about the line or the processor causing 
> the issue.
> Here a Grouped was missing inside the groupBy, but because the groupBy API 
> doesn't force you to define a Grouped, it can be missed, and it could be 
> difficult to spot in a more complex topology. 
> This also matters for someone who needs control over serdes in the topology 
> and doesn't want to define default serdes.
>  
>   





[jira] [Commented] (KAFKA-16573) Streams does not specify where a Serde is needed

2024-04-18 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838827#comment-17838827
 ] 

Matthias J. Sax commented on KAFKA-16573:
-

Thanks for filing this ticket. I think your idea is good; it's for sure an 
improvement over the current state.
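
As an illustrative sketch (not part of the original comment, and assuming the 
same `builder` and serde imports as the example in the ticket), passing an 
explicit `Grouped` to the `groupBy` avoids falling back to the unset default 
serdes:
{code:java}
builder
    .table("input", Consumed.with(Serdes.String(), Serdes.String()))
    // Explicit serdes for the re-grouped key/value avoid the default-serde lookup.
    .groupBy((key, value) -> KeyValue.pair(value, key),
             Grouped.with(Serdes.String(), Serdes.String()))
    .count()
    .toStream()
    .to("output", Produced.with(Serdes.String(), Serdes.Long()));
{code}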

> Streams does not specify where a Serde is needed
> 
>
> Key: KAFKA-16573
> URL: https://issues.apache.org/jira/browse/KAFKA-16573
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Ayoub Omari
>Priority: Major
>
> Example topology:
> {code:java}
>  builder
>.table("input", Consumed.`with`(Serdes.String(), Serdes.String()))
>.groupBy((key, value) => new KeyValue(value, key))
>.count()
>.toStream()
>.to("output", Produced.`with`(Serdes.String(), Serdes.Long()))
>  {code}
> At runtime, we get the following exception 
> {code:java}
> Please specify a key serde or set one through 
> StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG
> org.apache.kafka.common.config.ConfigException: Please specify a key serde or 
> set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG
>     at 
> org.apache.kafka.streams.StreamsConfig.defaultKeySerde(StreamsConfig.java:1857)
>     at 
> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.keySerde(AbstractProcessorContext.java:92)
>     at 
> org.apache.kafka.streams.processor.internals.SerdeGetter.keySerde(SerdeGetter.java:47)
>     at 
> org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareSerde(WrappingNullableUtils.java:63)
>     at 
> org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeySerde(WrappingNullableUtils.java:90)
>     at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.initStoreSerde(MeteredKeyValueStore.java:188)
>     at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:143)
>     at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:232)
>     at 
> org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:102)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:258){code}
> The error does not give information about the line or the processor causing 
> the issue.
> Here a Grouped was missing inside the groupBy, but because the groupBy API 
> doesn't force you to define a Grouped, it can be missed, and it could be 
> difficult to spot in a more complex topology. 
> This also matters for someone who needs control over serdes in the topology 
> and doesn't want to define default serdes.
>  
>   





[jira] [Resolved] (KAFKA-16280) Expose method to determine Metric Measurability

2024-04-18 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16280?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-16280.
-
Resolution: Done

> Expose method to determine Metric Measurability
> ---
>
> Key: KAFKA-16280
> URL: https://issues.apache.org/jira/browse/KAFKA-16280
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics
>Affects Versions: 3.8.0
>Reporter: Apoorv Mittal
>Assignee: Apoorv Mittal
>Priority: Major
>  Labels: kip
> Fix For: 3.8.0
>
>
> The Jira is to track the development of KIP-1019: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-1019%3A+Expose+method+to+determine+Metric+Measurability]
>  





[jira] [Updated] (KAFKA-16280) Expose method to determine Metric Measurability

2024-04-18 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16280?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16280:

Issue Type: Improvement  (was: Bug)

> Expose method to determine Metric Measurability
> ---
>
> Key: KAFKA-16280
> URL: https://issues.apache.org/jira/browse/KAFKA-16280
> Project: Kafka
>  Issue Type: Improvement
>  Components: metrics
>Affects Versions: 3.8.0
>Reporter: Apoorv Mittal
>Assignee: Apoorv Mittal
>Priority: Major
>  Labels: kip
> Fix For: 3.8.0
>
>
> The Jira is to track the development of KIP-1019: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-1019%3A+Expose+method+to+determine+Metric+Measurability]
>  





Re: [PR] KAFKA-16280: Expose method to determine metric measurability (KIP-1019) [kafka]

2024-04-18 Thread via GitHub


mjsax merged PR #15681:
URL: https://github.com/apache/kafka/pull/15681





Re: [PR] KAFKA-16554: Online downgrade triggering and group type conversion [kafka]

2024-04-18 Thread via GitHub


dongnuo123 commented on code in PR #15721:
URL: https://github.com/apache/kafka/pull/15721#discussion_r1571520271


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/Assertions.java:
##
@@ -198,6 +204,62 @@ private static void assertApiMessageAndVersionEquals(
 }
 }
 }
+} else if (actual.message() instanceof GroupMetadataValue) {

Review Comment:
   Yeah it's more convenient. Let me change this.






Re: [PR] KAFKA-16557: Fix toString of OffsetFetchRequestState [kafka]

2024-04-18 Thread via GitHub


phooq commented on PR #15750:
URL: https://github.com/apache/kafka/pull/15750#issuecomment-2065444919

   Hey @lianetm, @kirktrue, thanks for the feedback and suggestions!
   
   I like @kirktrue's advice on having a `toString()` override in the base 
class while each subclass customizes `toStringBase()`, so I made the change 
accordingly. This seems much cleaner and less confusing.
   
   Please take a look and let me know your thoughts.
   
   Thanks





Re: [PR] KAFKA-16554: Online downgrade triggering and group type conversion [kafka]

2024-04-18 Thread via GitHub


dongnuo123 commented on code in PR #15721:
URL: https://github.com/apache/kafka/pull/15721#discussion_r1571469219


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -777,11 +778,78 @@ public ClassicGroup classicGroup(
 }
 }
 
+    /**
+     * Validates the online downgrade if a consumer member is fenced from the consumer group.
+     *
+     * @param consumerGroup The ConsumerGroup.
+     * @param memberId      The fenced member id.
+     * @return A boolean indicating whether it's valid to online downgrade the consumer group.
+     */
+    private boolean validateOnlineDowngrade(ConsumerGroup consumerGroup, String memberId) {
+        if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) {
+            log.info("Cannot downgrade consumer group {} to classic group because the online downgrade is disabled.",
+                consumerGroup.groupId());
+            return false;
+        } else if (!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) {

Review Comment:
   For a member explicitly leaving the group, we know the remaining one is using 
the consumer group protocol. I'm not sure about the timeout cases; it could be 
an expired classic protocol member.






Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-04-18 Thread via GitHub


jolshan commented on PR #15685:
URL: https://github.com/apache/kafka/pull/15685#issuecomment-2065435357

   I may need to figure out how to deal with this...when setting the version to 
0.
   
   ```
   Replayed a FeatureLevelRecord removing feature test.feature.version
   ```
   ```
   Broker 1 registered with feature test.feature.version that is unknown to the controller (org.apache.kafka.controller.ClusterControlManager)
   [2024-04-18 15:32:25,440] INFO [QuorumController id=1] Replayed RegisterBrokerRecord modifying the registration for broker 1: RegisterBrokerRecord(brokerId=1, isMigratingZkBroker=false, incarnationId=pihYtx_qSSKlJ2K31eGFOw, brokerEpoch=8, endPoints=[BrokerEndpoint(name='PLAINTEXT', host='localhost', port=9092, securityProtocol=0)], features=[BrokerFeature(name='metadata.version', minSupportedVersion=1, maxSupportedVersion=19), BrokerFeature(name='test.feature.version', minSupportedVersion=0, maxSupportedVersion=1)], rack=null, fenced=true, inControlledShutdown=false, logDirs=[T1fHZ5DjOC1Dk5CviOHPbg]) (org.apache.kafka.controller.ClusterControlManager)
   ```
   





Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]

2024-04-18 Thread via GitHub


rreddy-22 commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1571435096


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/ServerSideAssignorBenchmark.java:
##
@@ -0,0 +1,185 @@
+package org.apache.kafka.jmh.group_coordinator;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.Integer.max;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ServerSideAssignorBenchmark {
+
+@Param({"10", "50", "100"})
+private int partitionsPerTopicCount;
+
+@Param({"100"})
+private int topicCount;
+
+@Param({"500", "1000"})
+private int memberCount;
+
+@Param({"true", "false"})
+private boolean isRackAware;
+
+@Param({"true", "false"})
+private boolean isSubscriptionUniform;
+
+@Param({"true", "false"})
+private boolean isRangeAssignor;
+
+@Param({"true", "false"})
+private boolean isReassignment;
+
+private PartitionAssignor partitionAssignor;
+
+private final int numberOfRacks = 3;
+
+private AssignmentSpec assignmentSpec;
+
+private SubscribedTopicDescriber subscribedTopicDescriber;
+
+@Setup(Level.Trial)
+public void setup() {
+Map topicMetadata = new HashMap<>();
+Map> partitionRacks = isRackAware ?
+mkMapOfPartitionRacks(partitionsPerTopicCount) :
+Collections.emptyMap();
+
+for (int i = 1; i <= topicCount; i++) {
+Uuid topicUuid = Uuid.randomUuid();
+String topicName = "topic" + i;
+topicMetadata.put(topicUuid, new TopicMetadata(
+topicUuid, topicName, partitionsPerTopicCount, 
partitionRacks));
+}
+
+addTopicSubscriptions(topicMetadata);
+this.subscribedTopicDescriber = new 
SubscribedTopicMetadata(topicMetadata);
+
+if (isRangeAssignor) {
+this.partitionAssignor = new RangeAssignor();
+} else {
+this.partitionAssignor = new UniformAssignor();
+}
+
+if (isReassignment) {
+GroupAssignment initialAssignment = 
partitionAssignor.assign(assignmentSpec, subscribedTopicDescriber);
+Map members;
+
+members = initialAssignment.members();
+
+// Update the AssignmentSpec with the results from the initial 
assignment.
+Map updatedMembers = new HashMap<>();
+
+members.forEach((memberId, memberAssignment) -> {
+AssignmentMemberSpec memberSpec = 
assignmentSpec.members().get(memberId);
+updatedMembers.put(memberId, new AssignmentMemberSpec(
+memberSpec.instanceId(),
+memberSpec.rackId(),
+memberSpec.subscribedTopicIds(),
+memberAssignment.targetPartitions()
+));
+});
+
+// Add new member to trigger a reassignment.
+Optional rackId = isRackAware ? Optional.of("rack" + 
(memberCount + 1) % numberOfRacks) : Optional.empty();
+
+updatedMembers.put("newMember", new AssignmentMemberSpec(
+Optional.empty(),
+rackId,
+topicMetadata.keySet(),
+Collections.emptyMap()
+ 

Re: [PR] KAFKA-16557: Fix toString of OffsetFetchRequestState [kafka]

2024-04-18 Thread via GitHub


kirktrue commented on PR #15750:
URL: https://github.com/apache/kafka/pull/15750#issuecomment-2065387420

   We've been encouraged to follow the pattern of having the super class 
implement `toString()` and add a `toStringBase()`, like this:
   
   ```java
   public class Parent {
   
       private final String foo = "Hello, world!";
   
       protected String toStringBase() {
           return "foo=" + foo;
       }
   
       @Override
       public String toString() {
           return getClass().getSimpleName() + "{" + toStringBase() + "}";
       }
   }
   ```
   
   Then the subclasses would implement `toStringBase()` to add their values:
   
   ```java
   public class Child extends Parent {
   
       private final String bar = "Many";
       private final String baz = "Few";
   
       @Override
       protected String toStringBase() {
           return super.toStringBase() + ", bar=" + bar + ", baz=" + baz;
       }
   }
   ```
   
   The rationale I was given was:
   
   1. It allows the parent to keep its state private, but still "expose" it via 
`toString()`
   2. It helps to ensure the correct class name appears in the output
   3. It keeps the output uniform and reminds the developer to include the 
parent state in the output
   
   I'm not crazy about the design idiom, but that's what we were asked to use. 
It works OK as long as it's applied uniformly.
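
   A quick usage sketch (not from the original comment) of what the pattern 
prints:
   
   ```java
   public class ToStringDemo {
       public static void main(String[] args) {
           // With the Parent/Child classes above, this prints:
           // Child{foo=Hello, world!, bar=Many, baz=Few}
           System.out.println(new Child());
       }
   }
   ```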





[PR] cherrypick KAFKA-16386: Convert NETWORK_EXCEPTIONs from KIP-890 transaction verification [kafka]

2024-04-18 Thread via GitHub


jolshan opened a new pull request, #15755:
URL: https://github.com/apache/kafka/pull/15755

   KIP-890 Part 1 introduced verification of transactions with the transaction 
coordinator on the `Produce` and `TxnOffsetCommit` paths. This introduced the 
possibility of new errors when responding to those requests. For backwards 
compatibility with older clients, a choice was made to convert some of the new 
retriable errors to existing errors that are expected and retried correctly by 
older clients.
   
   `NETWORK_EXCEPTION` was forgotten about and not converted, but can occur if, 
for example, the transaction coordinator is temporarily refusing connections. 
Now, we convert it to:
* `NOT_ENOUGH_REPLICAS` on the `Produce` path, just like the other 
retriable errors that can arise from transaction verification.
* `COORDINATOR_LOAD_IN_PROGRESS` on the `TxnOffsetCommit` path. This error 
does not force coordinator lookup on clients, unlike 
`COORDINATOR_NOT_AVAILABLE`. Note that this deviates from KIP-890, which says 
that retriable errors should be converted to `COORDINATOR_NOT_AVAILABLE`.
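
   For reference, a minimal illustrative sketch of the conversion described 
above (a hypothetical helper, not the code in this patch):
   
   ```java
   import org.apache.kafka.common.protocol.Errors;
   
   // Hypothetical helper showing the mapping; the real change lives in the
   // Produce and TxnOffsetCommit handling paths.
   final class VerificationErrorConverter {
       static Errors convert(Errors error, boolean producePath) {
           if (error == Errors.NETWORK_EXCEPTION) {
               return producePath
                   ? Errors.NOT_ENOUGH_REPLICAS           // Produce path
                   : Errors.COORDINATOR_LOAD_IN_PROGRESS; // TxnOffsetCommit path
           }
           return error;
       }
   }
   ```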
   
Conflicts:
core/src/main/scala/kafka/server/ReplicaManager.scala

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
   
   There were some conflicts in how the code paths changed. We have three 
paths:
   1. Produce -- In appendEntries we have the callback just for produce 
requests. I've included the error and the comment there.
   2. Old Group Coordinator -- In GroupMetadataManager, we handle the 
conversion in `maybeConvertOffsetCommitError`. This path is separate from the 
produce path.
   3. New Group Coordinator -- Not supported in 3.7.





[jira] [Commented] (KAFKA-16570) FenceProducers API returns "unexpected error" when successful

2024-04-18 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838796#comment-17838796
 ] 

Justine Olshan commented on KAFKA-16570:


Hmmm – retries here are a bit strange since the command was successful; we will 
just return this error until we can allocate the new producer ID. I suppose 
this "guarantees" the markers were written, but it is not consistent with how 
EndTxn usually works.

See how the error is returned on success of the EndTxn API call: 
[https://github.com/apache/kafka/blob/53ff1a5a589eb0e30a302724fcf5d0c72c823682/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L175]
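
For context, an illustrative sketch (not part of the original comment) of how 
the affected admin API is invoked; the transactional id and bootstrap address 
are placeholders:
{code:java}
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;

import java.util.Collections;
import java.util.Properties;

public class FenceProducerSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // Under the hood this sends an InitProducerId request to fence any
            // producer currently using the given transactional id.
            admin.fenceProducers(Collections.singleton("my-transactional-id"))
                .all()
                .get();
        }
    }
}
{code}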
 

> FenceProducers API returns "unexpected error" when successful
> -
>
> Key: KAFKA-16570
> URL: https://issues.apache.org/jira/browse/KAFKA-16570
> Project: Kafka
>  Issue Type: Bug
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Major
>
> When we want to fence a producer using the admin client, we send an 
> InitProducerId request.
> There is logic in that API to fence (and abort) any ongoing transactions and 
> that is what the API relies on to fence the producer. However, this handling 
> also returns CONCURRENT_TRANSACTIONS. In normal usage, this is good because 
> we want to actually get a new producer ID and want to retry until the ID 
> is supplied or we time out.  
> [https://github.com/apache/kafka/blob/5193eb93237ba9093ae444d73a1eaa2d6abcc9c1/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L170]
>  
> [https://github.com/apache/kafka/blob/a3dcbd4e28a35f79f75ec1bf316ef0b39c0df164/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L1322]
>  
> In the case of fence producer, we don't retry and instead we have no handling 
> for concurrent transactions and log a message about an unexpected error.
> [https://github.com/apache/kafka/blob/a3dcbd4e28a35f79f75ec1bf316ef0b39c0df164/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java#L112]
>  
> This is not unexpected though and the operation was successful. We should 
> just swallow this error and treat this as a successful run of the command. 





Re: [PR] KAFKA-16555: Consumer's RequestState has incorrect logic to determine if inflight [kafka]

2024-04-18 Thread via GitHub


philipnee commented on code in PR #15723:
URL: https://github.com/apache/kafka/pull/15723#discussion_r1571359996


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestState.java:
##
@@ -98,12 +93,11 @@ public boolean canSendRequest(final long currentTimeMs) {
  * is a request in-flight.
  */
 public boolean requestInFlight() {
-return this.lastSentMs > -1 && this.lastReceivedMs < this.lastSentMs;
+return requestInFlight;

Review Comment:
   sorry - I meant to say `hasInflightRequest`.  Either way is fine.






Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]

2024-04-18 Thread via GitHub


rreddy-22 commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1571348485


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ClientSideAssignorBenchmark.java:
##
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.assignor;
+
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
+import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
+import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.Integer.max;
+import static 
org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.DEFAULT_GENERATION;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ClientSideAssignorBenchmark {
+
+public enum AssignorType {
+RANGE(new RangeAssignor()),
+COOPERATIVE_STICKY(new CooperativeStickyAssignor());
+
+private final ConsumerPartitionAssignor assignor;
+
+AssignorType(ConsumerPartitionAssignor assignor) {
+this.assignor = assignor;
+}
+
+public ConsumerPartitionAssignor assignor() {
+return assignor;
+}
+}
+
+/**
+ * The subscription pattern followed by the members of the group.
+ *
+ * A subscription model is considered homogenous if all the members of the 
group
+ * are subscribed to the same set of topics, it is heterogeneous otherwise.
+ */
+public enum SubscriptionModel {
+HOMOGENEOUS, HETEROGENEOUS
+}
+
+@Param({"100", "500", "1000", "5000", "1"})
+private int memberCount;
+
+@Param({"5", "10", "50"})
+private int partitionsToMemberRatio;
+
+@Param({"10", "100", "1000"})
+private int topicCount;
+
+@Param({"true", "false"})
+private boolean isRackAware;
+
+@Param({"HOMOGENEOUS", "HETEROGENEOUS"})
+private SubscriptionModel subscriptionModel;
+
+@Param({"RANGE", "COOPERATIVE_STICKY"})
+private AssignorType assignorType;
+
+@Param({"true", "false"})
+private boolean simulateRebalanceTrigger;
+
+private Map subscriptions 
= new HashMap<>();
+
+private ConsumerPartitionAssignor.GroupSubscription groupSubscription;
+
+private static final int NUMBER_OF_RACKS = 3;
+
+private ConsumerPartitionAssignor assignor;
+
+private Cluster metadata;
+
+private final List allTopicNames = new ArrayList<>(topicCount);
+
+@Setup(Level.Trial)
+public void setup() {
+// Ensure there are enough racks and brokers for the replication 
factor.
+if (NUMBER_OF_RACKS < 2) {
+throw new IllegalArgumentException("Number of broker racks must be 
at least equal to 2.");
+}
+
+populateTopicMetadata();
+
+addMemberSubscriptions();
+
+assignor = assignorType.assignor();
+
+if (simulateRebalanceTrigger) simulateRebalance();
+}
+
+private void populateTopicMetadata() {
+List partitions = new ArrayList<>();
+int partitionsPerTopicCount = (memberCount * partitionsToMemberRatio) 
/ topicCount;
+
+// Create nodes 

Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]

2024-04-18 Thread via GitHub


rreddy-22 commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1571339288


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java:
##
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.Integer.max;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 1)
+@Measurement(iterations = 0)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ServerSideAssignorBenchmark {
+
+public enum AssignorType {
+RANGE(new RangeAssignor()),
+UNIFORM(new UniformAssignor());
+
+private final PartitionAssignor assignor;
+
+AssignorType(PartitionAssignor assignor) {
+this.assignor = assignor;
+}
+
+public PartitionAssignor assignor() {
+return assignor;
+}
+}
+
+/**
+ * The subscription pattern followed by the members of the group.
+ *
+ * A subscription model is considered homogenous if all the members of the 
group
+ * are subscribed to the same set of topics, it is heterogeneous otherwise.
+ */
+public enum SubscriptionModel {
+HOMOGENEOUS, HETEROGENEOUS
+}
+
+@Param({"100", "500", "1000", "5000", "1"})
+private int memberCount;
+
+@Param({"5", "10", "50"})
+private int partitionsToMemberRatio;
+
+@Param({"10", "100", "1000"})
+private int topicCount;
+
+@Param({"true", "false"})
+private boolean isRackAware;
+
+@Param({"HOMOGENEOUS", "HETEROGENEOUS"})
+private SubscriptionModel subscriptionModel;
+
+@Param({"RANGE", "UNIFORM"})
+private AssignorType assignorType;
+
+@Param({"true", "false"})
+private boolean simulateRebalanceTrigger;
+
+private PartitionAssignor partitionAssignor;
+
+private static final int NUMBER_OF_RACKS = 3;
+
+private AssignmentSpec assignmentSpec;
+
+private SubscribedTopicDescriber subscribedTopicDescriber;
+
+private final List allTopicIds = new ArrayList<>(topicCount);
+
+@Setup(Level.Trial)
+public void setup() {
+Map topicMetadata = createTopicMetadata();
+subscribedTopicDescriber = new SubscribedTopicMetadata(topicMetadata);
+
+createAssignmentSpec();
+
+partitionAssignor = assignorType.assignor();
+
+if (simulateRebalanceTrigger) {
+simulateIncrementalRebalance(topicMetadata);
+}
+}
+
+private Map createTopicMetadata() {
+Map topicMetadata = new 

Re: [PR] KAFKA-16555: Consumer's RequestState has incorrect logic to determine if inflight [kafka]

2024-04-18 Thread via GitHub


lucasbru commented on code in PR #15723:
URL: https://github.com/apache/kafka/pull/15723#discussion_r1571335784


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestState.java:
##
@@ -98,12 +93,11 @@ public boolean canSendRequest(final long currentTimeMs) {
  * is a request in-flight.
  */
 public boolean requestInFlight() {
-return this.lastSentMs > -1 && this.lastReceivedMs < this.lastSentMs;
+return requestInFlight;

Review Comment:
   `inflightRequested` seems to change the meaning. Who requested an inflight?
   
   In my head it's clearly the noun "request", but if you are bothered, may I 
suggest "isRequestInFlight"?






Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]

2024-04-18 Thread via GitHub


rreddy-22 commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1571327160


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java:
##
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.Integer.max;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 1)
+@Measurement(iterations = 0)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ServerSideAssignorBenchmark {
+
+public enum AssignorType {
+RANGE(new RangeAssignor()),
+UNIFORM(new UniformAssignor());
+
+private final PartitionAssignor assignor;
+
+AssignorType(PartitionAssignor assignor) {
+this.assignor = assignor;
+}
+
+public PartitionAssignor assignor() {
+return assignor;
+}
+}
+
+/**
+ * The subscription pattern followed by the members of the group.
+ *
+ * A subscription model is considered homogenous if all the members of the 
group
+ * are subscribed to the same set of topics, it is heterogeneous otherwise.
+ */
+public enum SubscriptionModel {
+HOMOGENEOUS, HETEROGENEOUS
+}
+
+@Param({"100", "500", "1000", "5000", "1"})
+private int memberCount;
+
+@Param({"5", "10", "50"})
+private int partitionsToMemberRatio;
+
+@Param({"10", "100", "1000"})
+private int topicCount;
+
+@Param({"true", "false"})
+private boolean isRackAware;
+
+@Param({"HOMOGENEOUS", "HETEROGENEOUS"})
+private SubscriptionModel subscriptionModel;
+
+@Param({"RANGE", "UNIFORM"})
+private AssignorType assignorType;
+
+@Param({"true", "false"})
+private boolean simulateRebalanceTrigger;

Review Comment:
   Okay, I'll add that






[jira] [Commented] (KAFKA-16570) FenceProducers API returns "unexpected error" when successful

2024-04-18 Thread Artem Livshits (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838786#comment-17838786
 ] 

Artem Livshits commented on KAFKA-16570:


> We should just swallow this error and treat this as a successful run of the 
> command. 

I think we should instead implement retries and fail if we cannot succeed 
within the command timeout.

> FenceProducers API returns "unexpected error" when successful
> -
>
> Key: KAFKA-16570
> URL: https://issues.apache.org/jira/browse/KAFKA-16570
> Project: Kafka
>  Issue Type: Bug
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Major
>
> When we want to fence a producer using the admin client, we send an 
> InitProducerId request.
> There is logic in that API to fence (and abort) any ongoing transactions and 
> that is what the API relies on to fence the producer. However, this handling 
> also returns CONCURRENT_TRANSACTIONS. In normal usage, this is good because 
> we want to actually get a new producer ID and want to retry until the ID 
> is supplied or we time out.  
> [https://github.com/apache/kafka/blob/5193eb93237ba9093ae444d73a1eaa2d6abcc9c1/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L170]
>  
> [https://github.com/apache/kafka/blob/a3dcbd4e28a35f79f75ec1bf316ef0b39c0df164/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L1322]
>  
> In the case of fence producer, we don't retry and instead we have no handling 
> for concurrent transactions and log a message about an unexpected error.
> [https://github.com/apache/kafka/blob/a3dcbd4e28a35f79f75ec1bf316ef0b39c0df164/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java#L112]
>  
> This is not unexpected though and the operation was successful. We should 
> just swallow this error and treat this as a successful run of the command. 





[jira] [Updated] (KAFKA-16386) NETWORK_EXCEPTIONs from transaction verification are not translated

2024-04-18 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16386?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan updated KAFKA-16386:
---
Affects Version/s: 3.7.0

> NETWORK_EXCEPTIONs from transaction verification are not translated
> ---
>
> Key: KAFKA-16386
> URL: https://issues.apache.org/jira/browse/KAFKA-16386
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.6.0, 3.7.0
>Reporter: Sean Quah
>Priority: Minor
> Fix For: 3.8.0
>
>
> KAFKA-14402 
> ([KIP-890|https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense])
>  adds verification with the transaction coordinator on Produce and 
> TxnOffsetCommit paths as a defense against hanging transactions. For 
> compatibility with older clients, retriable errors from the verification step 
> are translated to ones already expected and handled by existing clients. When 
> verification was added, we forgot to translate {{NETWORK_EXCEPTION}} s.
> [~dajac] noticed this manifesting as a test failure when 
> tests/kafkatest/tests/core/transactions_test.py was run with an older client 
> (prior to the fix for KAFKA-16122):
> {quote}
> {{NETWORK_EXCEPTION}} is indeed returned as a partition error. The 
> {{TransactionManager.TxnOffsetCommitHandler}} considers it as a fatal error 
> so it transitions to the fatal state.
> It seems that there are two cases where the server could return it: (1) When 
> the verification request times out or its connections is cut; or (2) in 
> {{AddPartitionsToTxnManager.addTxnData}} where we say that we use it because 
> we want a retriable error.
> {quote}
> The first case was triggered as part of the test. The second case happens 
> when there is already a verification request ({{AddPartitionsToTxn}}) in 
> flight with the same epoch and we want clients to try again when we're not 
> busy.





[jira] [Commented] (KAFKA-16386) NETWORK_EXCEPTIONs from transaction verification are not translated

2024-04-18 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838768#comment-17838768
 ] 

Justine Olshan commented on KAFKA-16386:


Note: For 3.6, this is only returned in the produce path which treats the 
exception as retriable. The TxnOffsetCommit path is the one that experiences 
the fatal error and is only in 3.7.

> NETWORK_EXCEPTIONs from transaction verification are not translated
> ---
>
> Key: KAFKA-16386
> URL: https://issues.apache.org/jira/browse/KAFKA-16386
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.6.0, 3.7.0
>Reporter: Sean Quah
>Priority: Minor
> Fix For: 3.8.0
>
>
> KAFKA-14402 
> ([KIP-890|https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense])
>  adds verification with the transaction coordinator on Produce and 
> TxnOffsetCommit paths as a defense against hanging transactions. For 
> compatibility with older clients, retriable errors from the verification step 
> are translated to ones already expected and handled by existing clients. When 
> verification was added, we forgot to translate {{NETWORK_EXCEPTION}} s.
> [~dajac] noticed this manifesting as a test failure when 
> tests/kafkatest/tests/core/transactions_test.py was run with an older client 
> (prior to the fix for KAFKA-16122):
> {quote}
> {{NETWORK_EXCEPTION}} is indeed returned as a partition error. The 
> {{TransactionManager.TxnOffsetCommitHandler}} considers it as a fatal error 
> so it transitions to the fatal state.
> It seems that there are two cases where the server could return it: (1) When 
> the verification request times out or its connections is cut; or (2) in 
> {{AddPartitionsToTxnManager.addTxnData}} where we say that we use it because 
> we want a retriable error.
> {quote}
> The first case was triggered as part of the test. The second case happens 
> when there is already a verification request ({{AddPartitionsToTxn}}) in 
> flight with the same epoch and we want clients to try again when we're not 
> busy.





Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-04-18 Thread via GitHub


vamossagar12 commented on code in PR #15305:
URL: https://github.com/apache/kafka/pull/15305#discussion_r1571150850


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java:
##
@@ -533,6 +536,57 @@ public void testSkippingAssignmentFails() {
 verify(configStorage).snapshot();
 }
 
+@Test
+public void testPollTimeoutExpiry() throws InterruptedException {
+// We will create a new WorkerCoordinator object with a rebalance 
timeout smaller
+// than session timeout. This might not happen in the real world but 
it makes testing
+// easier and the test not flaky.
+int smallRebalanceTimeout = 20;
+this.rebalanceConfig = new GroupRebalanceConfig(sessionTimeoutMs,
+smallRebalanceTimeout,
+heartbeatIntervalMs,
+groupId,
+Optional.empty(),
+retryBackoffMs,
+retryBackoffMaxMs,
+true);
+this.coordinator = new WorkerCoordinator(rebalanceConfig,
+logContext,
+consumerClient,
+new Metrics(time),
+"consumer" + groupId,
+time,
+LEADER_URL,
+configStorage,
+rebalanceListener,
+compatibility,
+0);
+
+when(configStorage.snapshot()).thenReturn(configState1);
+
+
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node));
+coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+client.prepareResponse(joinGroupFollowerResponse(1, "member", 
"leader", Errors.NONE));
+
client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, 
"leader", configState1.offset(), Collections.emptyList(),
+Collections.singletonList(taskId1x0), Errors.NONE));
+
+try (LogCaptureAppender logCaptureAppender = 
LogCaptureAppender.createAndRegister(WorkerCoordinator.class)) {
+coordinator.ensureActiveGroup();
+coordinator.poll(0, () -> null);
+
+// The heartbeat thread is running and keeps sending heartbeat 
requests.
+TestUtils.waitForCondition(() -> {
+// Rebalance timeout elapses while poll is never invoked 
causing a poll timeout expiry
+coordinator.sendHeartbeatRequest();
+client.prepareResponse(new HeartbeatResponse(new 
HeartbeatResponseData()));
+time.sleep(1);

Review Comment:
   This sleep value, though small, was the one that eventually worked after 
running the test 30 times in a row. Sleeping for 10s still kept failing because 
sometimes the coordinator's session timeout would fire even though it doesn't 
look like it should.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Fix io-[wait-]ratio metrics description [kafka]

2024-04-18 Thread via GitHub


emitskevich-blp commented on PR #15722:
URL: https://github.com/apache/kafka/pull/15722#issuecomment-2064749720

   Is it possible to add unit test? 
   > I'm not sure whether it's needed here. Effectively, such test would verify 
the behavior of deprecated method. What do you think?
   
   We should verify that docs should not include deprecated
   > The docs are correct: 
[io-wait-ratio](https://github.com/emitskevich-blp/kafka/blob/53ff1a5a589eb0e30a302724fcf5d0c72c823682/docs/ops.html#L2372),
 
[io-ratio](https://github.com/emitskevich-blp/kafka/blob/53ff1a5a589eb0e30a302724fcf5d0c72c823682/docs/ops.html#L2392)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-04-18 Thread via GitHub


vamossagar12 commented on code in PR #15305:
URL: https://github.com/apache/kafka/pull/15305#discussion_r1571147929


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java:
##
@@ -533,6 +536,57 @@ public void testSkippingAssignmentFails() {
 verify(configStorage).snapshot();
 }
 
+@Test
+public void testPollTimeoutExpiry() throws InterruptedException {
+// We will create a new WorkerCoordinator object with a rebalance 
timeout smaller
+// than session timeout. This might not happen in the real world but 
it makes testing
+// easier and the test not flaky.
+int smallRebalanceTimeout = 20;
+this.rebalanceConfig = new GroupRebalanceConfig(sessionTimeoutMs,
+smallRebalanceTimeout,
+heartbeatIntervalMs,
+groupId,
+Optional.empty(),
+retryBackoffMs,
+retryBackoffMaxMs,
+true);
+this.coordinator = new WorkerCoordinator(rebalanceConfig,
+logContext,
+consumerClient,
+new Metrics(time),
+"consumer" + groupId,
+time,
+LEADER_URL,
+configStorage,
+rebalanceListener,
+compatibility,
+0);
+

Review Comment:
   For the purpose of this test, I create a new coordinator with a rebalance 
timeout smaller than the session timeout. As mentioned in the comment, it may 
not happen in the real world, but it makes testing a lot easier. Let me know 
what you think.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15089) Consolidate all the group coordinator configs

2024-04-18 Thread David Jacot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838754#comment-17838754
 ] 

David Jacot commented on KAFKA-15089:
-

The goal was to also define the “AbstractConfig” part here like we did for the 
storage and raft modules. Are you interested in giving it a try?

> Consolidate all the group coordinator configs
> -
>
> Key: KAFKA-15089
> URL: https://issues.apache.org/jira/browse/KAFKA-15089
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Priority: Major
>
> The group coordinator configurations are defined in KafkaConfig at the 
> moment. As KafkaConfig is defined in the core module, we can't pass it to the 
> new java modules to pass the configurations along.
> A suggestion here is to centralize all the configurations of a module in the 
> module itself similarly to what we have do for RemoteLogManagerConfig and 
> RaftConfig. We also need a mechanism to add all the properties defined in the 
> module to the KafkaConfig's ConfigDef.
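
A rough sketch of the pattern being described: a module-local class that holds
the constants plus a ConfigDef which the broker can merge into KafkaConfig's
ConfigDef. The class name below is a placeholder, not an actual class added for
this ticket.
{code:java}
import java.util.Map;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

public class GroupCoordinatorConfigSketch extends AbstractConfig {
    public static final String OFFSETS_TOPIC_PARTITIONS_CONFIG = "offsets.topic.num.partitions";
    public static final int OFFSETS_TOPIC_PARTITIONS_DEFAULT = 50;

    // The broker can append these definitions to KafkaConfig's ConfigDef.
    public static final ConfigDef CONFIG_DEF = new ConfigDef()
        .define(OFFSETS_TOPIC_PARTITIONS_CONFIG,
                ConfigDef.Type.INT,
                OFFSETS_TOPIC_PARTITIONS_DEFAULT,
                ConfigDef.Importance.HIGH,
                "The number of partitions for the offsets topic.");

    public GroupCoordinatorConfigSketch(Map<?, ?> props) {
        super(CONFIG_DEF, props);
    }
}
{code}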



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Fix io-[wait-]ratio metrics description [kafka]

2024-04-18 Thread via GitHub


emitskevich-blp commented on code in PR #15722:
URL: https://github.com/apache/kafka/pull/15722#discussion_r1571063662


##
clients/src/main/java/org/apache/kafka/common/network/Selector.java:
##
@@ -1281,14 +1281,14 @@ private Meter createMeter(Metrics metrics, String 
groupName,  Map metricTags,
 String baseName, String action) {
 MetricName rateMetricName = metrics.metricName(baseName + 
"-ratio", groupName,
-String.format("*Deprecated* The fraction of time the I/O 
thread spent %s", action), metricTags);
+String.format("The fraction of time the I/O thread spent 
%s", action), metricTags);

Review Comment:
   Added to method javadoc



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-16389) consumer_test.py’s test_valid_assignment fails with new consumer

2024-04-18 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16389?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee resolved KAFKA-16389.

Resolution: Fixed

> consumer_test.py’s test_valid_assignment fails with new consumer
> 
>
> Key: KAFKA-16389
> URL: https://issues.apache.org/jira/browse/KAFKA-16389
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Philip Nee
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
> Attachments: KAFKA-16389.patch, consumer.log
>
>
> The following error is reported when running the {{test_valid_assignment}} 
> test from {{consumer_test.py}}:
>  {code}
> Traceback (most recent call last):
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 186, in _do_run
> data = self.run_test()
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 246, in run_test
> return self.test_context.function(self.test)
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 
> 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File "/opt/kafka-dev/tests/kafkatest/tests/client/consumer_test.py", line 
> 584, in test_valid_assignment
> wait_until(lambda: self.valid_assignment(self.TOPIC, self.NUM_PARTITIONS, 
> consumer.current_assignment()),
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/utils/util.py", line 
> 58, in wait_until
> raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from 
> last_exception
> ducktape.errors.TimeoutError: expected valid assignments of 6 partitions when 
> num_started 2: [('ducker@ducker05', []), ('ducker@ducker06', [])]
> {code}
> To reproduce, create a system test suite file named 
> {{test_valid_assignment.yml}} with these contents:
> {code:yaml}
> failures:
>   - 
> 'kafkatest/tests/client/consumer_test.py::AssignmentValidationTest.test_valid_assignment@{"metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer","group_remote_assignor":"range"}'
> {code}
> Then set the the {{TC_PATHS}} environment variable to include that test suite 
> file.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-04-18 Thread via GitHub


vamossagar12 commented on code in PR #15305:
URL: https://github.com/apache/kafka/pull/15305#discussion_r1571146638


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java:
##
@@ -533,6 +537,45 @@ public void testSkippingAssignmentFails() {
 verify(configStorage).snapshot();
 }
 
+@Test
+public void testPollTimeoutExpiry() throws InterruptedException {
+
+when(configStorage.snapshot()).thenReturn(configState1);
+
+
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node));
+coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+client.prepareResponse(joinGroupFollowerResponse(1, "member", 
"leader", Errors.NONE));
+
client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, 
"leader", configState1.offset(), Collections.emptyList(),
+Collections.singletonList(taskId1x0), Errors.NONE));
+
+
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node));
+
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node));
+

Review Comment:
   Yeah, I got that. I added heartbeat request/response pairs until the 
rebalance timeout happens. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move KafkaConfig log properties and docs out of core [kafka]

2024-04-18 Thread via GitHub


chia7712 commented on code in PR #15569:
URL: https://github.com/apache/kafka/pull/15569#discussion_r1571142879


##
server-common/src/main/java/org/apache/kafka/server/config/KafkaLogConfigs.java:
##
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.config;
+
+import org.apache.kafka.common.config.TopicConfig;
+
+import static org.apache.kafka.server.common.MetadataVersion.IBP_3_0_IV1;
+import static 
org.apache.kafka.server.config.ServerTopicConfigSynonyms.LOG_PREFIX;
+
+/**
+ * Common home for broker-side log configs which need to be accessible from 
the libraries shared
+ * between the broker and the multiple modules in Kafka.
+ *
+ * Note this is an internal API and subject to change without notice.
+ */
+public class KafkaLogConfigs {

Review Comment:
   > I think at some point we need to revisit some of the existing Config files, 
as some of them break a few rules, like using PROP instead of the CONFIG suffix, 
using DEFAULT as a prefix instead of a suffix, and naming some classes Config 
instead of Configs.
   
   agree!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16386: Convert NETWORK_EXCEPTIONs from KIP-890 transaction verification [kafka]

2024-04-18 Thread via GitHub


jolshan commented on PR #15559:
URL: https://github.com/apache/kafka/pull/15559#issuecomment-2064671000

   I wonder if I should backport this to 3.7 as well. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]

2024-04-18 Thread via GitHub


kamalcph commented on code in PR #15634:
URL: https://github.com/apache/kafka/pull/15634#discussion_r1571126880


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -282,15 +282,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
* Update high watermark with offset metadata. The new high watermark will 
be lower
-   * bounded by the log start offset and upper bounded by the log end offset.
+   * bounded by the local-log-start-offset and upper bounded by the 
log-end-offset.
*
* @param highWatermarkMetadata the suggested high watermark with offset 
metadata
* @return the updated high watermark offset
*/
   def updateHighWatermark(highWatermarkMetadata: LogOffsetMetadata): Long = {
 val endOffsetMetadata = localLog.logEndOffsetMetadata
-val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < 
logStartOffset) {
-  new LogOffsetMetadata(logStartOffset)
+val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < 
_localLogStartOffset) {

Review Comment:
   Thanks for suggesting the alternative approach. I'll check and come back on 
this.
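   
   For reference, the bound described in the javadoc above 
(local-log-start-offset <= high watermark <= log-end-offset) boils down to a 
simple clamp. A Java sketch of the idea only, not the actual UnifiedLog.scala 
code:
   
```java
// Clamp the proposed high watermark into [localLogStartOffset, logEndOffset].
static long boundHighWatermark(long proposedHwm, long localLogStartOffset, long logEndOffset) {
    return Math.min(Math.max(proposedHwm, localLogStartOffset), logEndOffset);
}
```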



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]

2024-04-18 Thread via GitHub


kamalcph commented on PR #15634:
URL: https://github.com/apache/kafka/pull/15634#issuecomment-2064647379

   Thanks @chia7712 for the review!
   
   > log-start-offset-checkpoint is missing and remote storage is enabled. The 
logStartOffset will be set to zero, and it seems be a potential issue since the 
ListOffsetRequest could get incorrect result
   
   Most of the time when the follower joins the ISR, it updates the 
log-start-offset and high-watermark from the leader FETCH response. The issue 
can happen only when the follower gets elected as leader before updating its 
state, as mentioned in the summary/comments. 
   
   When the `log-start-offset-checkpoint` file is missing:
   1. For a normal topic, the log-start-offset will be set to the base offset of 
the first log segment, so there is no issue. Since the data is there, reads 
won't fail.
   2. For a remote topic, the log-start-offset will be stale for some time until 
the RemoteLogManager 
[updates](https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L671)
 it, so the issue is intermittent and self-recovers.
   
   > replication-offset-checkpoint is missing and remote storage is enabled. 
This is what your described. The HWM is pointed to middle of tiered storage and 
so it causes error when fetching records from local segments.
   
   This is not an issue for a normal topic. But for a cluster with remote 
storage enabled, if the issue happens on even one partition, it starts to 
affect a *subset* of topics. The controller batches the partitions in the 
LeaderAndIsr request. If the broker fails to process the LISR for one 
partition, the remaining partitions in that batch won't be processed. The 
producers producing to those topics will start receiving 
NOT_LEADER_FOR_PARTITION errors.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-7556: KafkaConsumer.beginningOffsets does not return actual first offsets [kafka]

2024-04-18 Thread via GitHub


jolshan closed pull request #9590: KAFKA-7556: KafkaConsumer.beginningOffsets 
does not return actual first offsets
URL: https://github.com/apache/kafka/pull/9590


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move KafkaConfig log properties and docs out of core [kafka]

2024-04-18 Thread via GitHub


OmniaGM commented on code in PR #15569:
URL: https://github.com/apache/kafka/pull/15569#discussion_r1571119979


##
server-common/src/main/java/org/apache/kafka/server/config/KafkaLogConfigs.java:
##
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.config;
+
+import org.apache.kafka.common.config.TopicConfig;
+
+import static org.apache.kafka.server.common.MetadataVersion.IBP_3_0_IV1;
+import static 
org.apache.kafka.server.config.ServerTopicConfigSynonyms.LOG_PREFIX;
+
+/**
+ * Common home for broker-side log configs which need to be accessible from 
the libraries shared
+ * between the broker and the multiple modules in Kafka.
+ *
+ * Note this is an internal API and subject to change without notice.
+ */
+public class KafkaLogConfigs {

Review Comment:
   Just pushed an update. The problem right now is that the convention of naming 
the config type without any unnecessary prefixes has been preserved in other 
modules, and I am trying to avoid making this PR bigger than it should be. I 
think at some point we need to revisit some of the existing Config files, as 
some of them break a few rules, like using `PROP` instead of the `CONFIG` 
suffix, using `DEFAULT` as a prefix instead of a suffix, and naming some classes 
`Config` instead of `Configs`.  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move KafkaConfig log properties and docs out of core [kafka]

2024-04-18 Thread via GitHub


OmniaGM commented on code in PR #15569:
URL: https://github.com/apache/kafka/pull/15569#discussion_r1571099798


##
server-common/src/main/java/org/apache/kafka/server/config/KafkaLogConfigs.java:
##
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.config;
+
+import org.apache.kafka.common.config.TopicConfig;
+
+import static org.apache.kafka.server.common.MetadataVersion.IBP_3_0_IV1;
+import static 
org.apache.kafka.server.config.ServerTopicConfigSynonyms.LOG_PREFIX;
+
+/**
+ * Common home for broker-side log configs which need to be accessible from 
the libraries shared
+ * between the broker and the multiple modules in Kafka.
+ *
+ * Note this is an internal API and subject to change without notice.
+ */
+public class KafkaLogConfigs {

Review Comment:
   I can rename it to `ServerLogConfigs` or `BrokerLogConfigs` but we already 
have `LogConfig`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move KafkaConfig log properties and docs out of core [kafka]

2024-04-18 Thread via GitHub


chia7712 commented on code in PR #15569:
URL: https://github.com/apache/kafka/pull/15569#discussion_r1571093805


##
server-common/src/main/java/org/apache/kafka/server/config/KafkaLogConfigs.java:
##
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.config;
+
+import org.apache.kafka.common.config.TopicConfig;
+
+import static org.apache.kafka.server.common.MetadataVersion.IBP_3_0_IV1;
+import static 
org.apache.kafka.server.config.ServerTopicConfigSynonyms.LOG_PREFIX;
+
+/**
+ * Common home for broker-side log configs which need to be accessible from 
the libraries shared
+ * between the broker and the multiple modules in Kafka.
+ *
+ * Note this is an internal API and subject to change without notice.
+ */
+public class KafkaLogConfigs {

Review Comment:
   Do we need to add the prefix "Kafka" here? Sorry, I'm just trying to figure 
out the naming rule. It seems to me the prefix "Kafka" means the class is used 
to collect other "xxxConfigs" constants and to be the central place for Kafka 
xxx-related configs. For example, `KafkaSecurityConfigs` collects all 
security-related configs from `BrokerSecurityConfigs`, `SaslConfigs`, 
`SslConfigs`, etc. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16554: Online downgrade triggering and group type conversion [kafka]

2024-04-18 Thread via GitHub


dajac commented on code in PR #15721:
URL: https://github.com/apache/kafka/pull/15721#discussion_r1571079283


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -777,11 +778,78 @@ public ClassicGroup classicGroup(
 }
 }
 
+/**
+ * Validates the online downgrade if a consumer member is fenced from the 
consumer group.
+ *
+ * @param consumerGroup The ConsumerGroup.
+ * @param memberId  The fenced member id.
+ * @return A boolean indicating whether it's valid to online downgrade the 
consumer group.
+ */
+private boolean validateOnlineDowngrade(ConsumerGroup consumerGroup, 
String memberId) {
+if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) {
+log.info("Cannot downgrade consumer group {} to classic group 
because the online downgrade is disabled.",
+consumerGroup.groupId());
+return false;
+} else if 
(!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) {
+log.debug("Cannot downgrade consumer group {} to classic group 
because not all its members use the classic protocol.",
+consumerGroup.groupId());
+return false;
+} else if (consumerGroup.numMembers() <= 1) {
+log.info("Skip downgrading the consumer group {} to classic group 
because it's empty.",
+consumerGroup.groupId());
+return false;
+} else if (consumerGroup.numMembers() - 1 > classicGroupMaxSize) {
+log.info("Cannot downgrade consumer group {} to classic group 
because its group size is greater than classic group max size.",
+consumerGroup.groupId());
+}
+return true;
+}
+
+public CompletableFuture convertToClassicGroup(ConsumerGroup 
consumerGroup, String leavingMemberId, List records) {
+consumerGroup.createGroupTombstoneRecords(records);
+
+ClassicGroup classicGroup;
+try {
+classicGroup = ClassicGroup.fromConsumerGroup(
+consumerGroup,
+leavingMemberId,
+logContext,
+time,
+metrics,
+consumerGroupSessionTimeoutMs,
+metadataImage
+);
+} catch (SchemaException e) {
+log.warn("Cannot downgrade the consumer group " + 
consumerGroup.groupId() + ": fail to parse " +
+"the Consumer Protocol " + ConsumerProtocol.PROTOCOL_TYPE + 
".", e);
+
+throw new GroupIdNotFoundException(String.format("Cannot downgrade 
the classic group %s: %s.",
+consumerGroup.groupId(), e.getMessage()));
+}
+
classicGroup.createConsumerGroupRecords(metadataImage.features().metadataVersion(),
 records);
+
+removeGroup(consumerGroup.groupId());
+
+groups.put(consumerGroup.groupId(), classicGroup);
+metrics.onClassicGroupStateTransition(null, 
classicGroup.currentState());
+
+classicGroup.allMembers().forEach(member -> 
rescheduleClassicGroupMemberHeartbeat(classicGroup, member));
+prepareRebalance(classicGroup, String.format("Downgrade group %s.", 
classicGroup.groupId()));
+
+CompletableFuture appendFuture = new CompletableFuture<>();
+appendFuture.whenComplete((__, t) -> {

Review Comment:
   I wonder if we could use `exceptionally` here.
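   
   For context on the suggestion, a generic illustration of the two 
CompletableFuture styles (not the group coordinator code itself):
   
```java
import java.util.concurrent.CompletableFuture;

class CompletableFutureStylesSketch {
    static void example(CompletableFuture<Void> appendFuture) {
        // whenComplete observes both the success and the failure outcome.
        appendFuture.whenComplete((result, error) -> {
            if (error != null) {
                // revert in-memory state, log, etc.
            }
        });

        // exceptionally runs only on failure, which reads more directly
        // when the success path needs no extra work.
        appendFuture.exceptionally(error -> {
            // revert in-memory state, log, etc.
            return null;
        });
    }
}
```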



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -777,11 +778,78 @@ public ClassicGroup classicGroup(
 }
 }
 
+/**
+ * Validates the online downgrade if a consumer member is fenced from the 
consumer group.
+ *
+ * @param consumerGroup The ConsumerGroup.
+ * @param memberId  The fenced member id.
+ * @return A boolean indicating whether it's valid to online downgrade the 
consumer group.
+ */
+private boolean validateOnlineDowngrade(ConsumerGroup consumerGroup, 
String memberId) {
+if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) {
+log.info("Cannot downgrade consumer group {} to classic group 
because the online downgrade is disabled.",
+consumerGroup.groupId());
+return false;
+} else if 
(!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) {
+log.debug("Cannot downgrade consumer group {} to classic group 
because not all its members use the classic protocol.",
+consumerGroup.groupId());
+return false;
+} else if (consumerGroup.numMembers() <= 1) {
+log.info("Skip downgrading the consumer group {} to classic group 
because it's empty.",
+consumerGroup.groupId());
+return false;
+} else if (consumerGroup.numMembers() - 1 > classicGroupMaxSize) {
+log.info("Cannot downgrade consumer group {} to classic group 
because its 

[jira] [Updated] (KAFKA-16566) Update consumer static membership fencing system test to support new protocol

2024-04-18 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16566:
---
Labels: kip-848-client-support system-tests  (was: kip-848-client-support)

> Update consumer static membership fencing system test to support new protocol
> -
>
> Key: KAFKA-16566
> URL: https://issues.apache.org/jira/browse/KAFKA-16566
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> consumer_test.py contains OffsetValidationTest.test_fencing_static_consumer 
> that verifies the sequence in which static members join a group when using 
> conflicting instance id. This behaviour is different in the classic and 
> consumer protocol, so the tests should be updated to set the right 
> expectations when running with the new consumer protocol. Note that what the 
> tests covers (params, setup), apply to both protocols. It is the expected 
> results that are not the same. 
> When conflicts between static members joining a group:
> Classic protocol: all members join the group with the same group instance id, 
> and then the first one will eventually receive a HB error with 
> FencedInstanceIdException
> Consumer protocol: new member with an instance Id already in use is not able 
> to join, receiving an UnreleasedInstanceIdException in the response to the HB 
> to join the group.  
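
For reference, a minimal sketch of how a static member is configured on the
client side; the broker address, group id and instance id are placeholders:
{code:java}
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class StaticMemberSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        // Static membership: the instance id is what the conflicting members share.
        props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "instance-1");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Starting a second consumer with the same group.instance.id triggers the
            // fencing behaviour described above (FencedInstanceIdException on the
            // classic protocol vs UnreleasedInstanceIdException on the consumer protocol).
        }
    }
}
{code}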



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16566) Update consumer static membership fencing system test to support new protocol

2024-04-18 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16566:
---
Component/s: clients
 consumer
 system tests

> Update consumer static membership fencing system test to support new protocol
> -
>
> Key: KAFKA-16566
> URL: https://issues.apache.org/jira/browse/KAFKA-16566
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> consumer_test.py contains OffsetValidationTest.test_fencing_static_consumer 
> that verifies the sequence in which static members join a group when using 
> conflicting instance id. This behaviour is different in the classic and 
> consumer protocol, so the tests should be updated to set the right 
> expectations when running with the new consumer protocol. Note that what the 
> tests covers (params, setup), apply to both protocols. It is the expected 
> results that are not the same. 
> When conflicts between static members joining a group:
> Classic protocol: all members join the group with the same group instance id, 
> and then the first one will eventually receive a HB error with 
> FencedInstanceIdException
> Consumer protocol: new member with an instance Id already in use is not able 
> to join, receiving an UnreleasedInstanceIdException in the response to the HB 
> to join the group.  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16585) No way to forward message from punctuation method in the FixedKeyProcessor

2024-04-18 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838730#comment-17838730
 ] 

Matthias J. Sax commented on KAFKA-16585:
-

Thanks for raising this ticket. Wondering how we could address it though... 
Given that the "contract" is that the record key is not modified, but there is 
no input record, how could the key be set in a meaningful way? – The only thing 
I can think of right now would be to set `key = null`, but it's still 
semantically questionable...

Can you provide more details what you are trying to do?

> No way to forward message from punctuation method in the FixedKeyProcessor
> --
>
> Key: KAFKA-16585
> URL: https://issues.apache.org/jira/browse/KAFKA-16585
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.2
>Reporter: Stanislav Spiridonov
>Priority: Major
>
> The FixedKeyProcessorContext can forward only FixedKeyRecord. This class 
> doesn't have a public constructor and can be created based on existing 
> records. But such record usually is absent in the punctuation method.
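
A small sketch of the limitation using the public processor API; the generics
and the business logic below are placeholders:
{code:java}
import java.time.Duration;

import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
import org.apache.kafka.streams.processor.api.FixedKeyRecord;

class ExampleFixedKeyProcessor implements FixedKeyProcessor<String, Long, Long> {
    private FixedKeyProcessorContext<String, Long> context;

    @Override
    public void init(FixedKeyProcessorContext<String, Long> context) {
        this.context = context;
        context.schedule(Duration.ofMinutes(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
            // The limitation described above: there is no FixedKeyRecord here to
            // derive a new record from, and FixedKeyRecord has no public constructor,
            // so nothing can be forwarded from the punctuation callback.
        });
    }

    @Override
    public void process(FixedKeyRecord<String, Long> record) {
        // Inside process() forwarding works, because a record exists to derive from.
        context.forward(record.withValue(record.value() + 1));
    }
}
{code}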



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16567) Add New Stream Metrics based on KIP-869

2024-04-18 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838728#comment-17838728
 ] 

Matthias J. Sax commented on KAFKA-16567:
-

Why is this ticket marked as "blocker" for 4.0 release?

> Add New Stream Metrics based on KIP-869
> ---
>
> Key: KAFKA-16567
> URL: https://issues.apache.org/jira/browse/KAFKA-16567
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: Walter Hernandez
>Assignee: Walter Hernandez
>Priority: Blocker
>  Labels: kip
> Fix For: 4.0.0
>
>
> Add the following metrics to the state updater:
>  * restoring-active-tasks: count
>  * restoring-standby-tasks: count
>  * paused-active-tasks: count
>  * paused-standby-tasks: count
>  * idle-ratio: percentage
>  * restore-ratio: percentage
>  * checkpoint-ratio: percentage
>  * restore-records-total: count
>  * restore-records-rate: rate
>  * restore-call-rate: rate



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16567) Add New Stream Metrics based on KIP-869

2024-04-18 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16567:

Component/s: streams

> Add New Stream Metrics based on KIP-869
> ---
>
> Key: KAFKA-16567
> URL: https://issues.apache.org/jira/browse/KAFKA-16567
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: Walter Hernandez
>Assignee: Walter Hernandez
>Priority: Blocker
> Fix For: 4.0.0
>
>
> Add the following metrics to the state updater:
>  * restoring-active-tasks: count
>  * restoring-standby-tasks: count
>  * paused-active-tasks: count
>  * paused-standby-tasks: count
>  * idle-ratio: percentage
>  * restore-ratio: percentage
>  * checkpoint-ratio: percentage
>  * restore-records-total: count
>  * restore-records-rate: rate
>  * restore-call-rate: rate



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16567) Add New Stream Metrics based on KIP-869

2024-04-18 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16567:

Labels: kip  (was: )

> Add New Stream Metrics based on KIP-869
> ---
>
> Key: KAFKA-16567
> URL: https://issues.apache.org/jira/browse/KAFKA-16567
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: Walter Hernandez
>Assignee: Walter Hernandez
>Priority: Blocker
>  Labels: kip
> Fix For: 4.0.0
>
>
> Add the following metrics to the state updater:
>  * restoring-active-tasks: count
>  * restoring-standby-tasks: count
>  * paused-active-tasks: count
>  * paused-standby-tasks: count
>  * idle-ratio: percentage
>  * restore-ratio: percentage
>  * checkpoint-ratio: percentage
>  * restore-records-total: count
>  * restore-records-rate: rate
>  * restore-call-rate: rate



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16263) Add Kafka Streams docs about available listeners/callback

2024-04-18 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16263?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838727#comment-17838727
 ] 

Matthias J. Sax commented on KAFKA-16263:
-

Yes, these are the handlers this ticket refers to.

> Add Kafka Streams docs about available listeners/callback
> -
>
> Key: KAFKA-16263
> URL: https://issues.apache.org/jira/browse/KAFKA-16263
> Project: Kafka
>  Issue Type: Task
>  Components: docs, streams
>Reporter: Matthias J. Sax
>Assignee: Kuan Po Tseng
>Priority: Minor
>  Labels: beginner, newbie
>
> Kafka Streams allows to register all kind of listeners and callback (eg, 
> uncaught-exception-handler, restore-listeners, etc) but those are not in the 
> documentation.
> A good place might be 
> [https://kafka.apache.org/documentation/streams/developer-guide/running-app.html]
>  
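
For reference, a brief sketch of the kind of handlers and listeners such a docs
section could cover; the topology, properties and the choices made in the
callbacks are illustrative only:
{code:java}
import java.util.Properties;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;
import org.apache.kafka.streams.processor.StateRestoreListener;

public class StreamsCallbacksSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic").to("output-topic");
        KafkaStreams streams = new KafkaStreams(builder.build(), props);

        // Invoked when a stream thread dies with an unhandled exception.
        streams.setUncaughtExceptionHandler(exception -> StreamThreadExceptionResponse.REPLACE_THREAD);

        // Invoked on KafkaStreams.State transitions (e.g. REBALANCING -> RUNNING).
        streams.setStateListener((newState, oldState) -> { /* log or expose as a metric */ });

        // Invoked while state stores are restored from their changelog topics.
        streams.setGlobalStateRestoreListener(new StateRestoreListener() {
            @Override
            public void onRestoreStart(TopicPartition tp, String store, long startOffset, long endOffset) { }
            @Override
            public void onBatchRestored(TopicPartition tp, String store, long batchEndOffset, long numRestored) { }
            @Override
            public void onRestoreEnd(TopicPartition tp, String store, long totalRestored) { }
        });

        streams.start();
    }
}
{code}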



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16356 RemoteLogMetadataSerde: Serializer via class-name dispatch removed and replaced with if-elseif-else conditions [kafka]

2024-04-18 Thread via GitHub


linu-shibu commented on PR #15620:
URL: https://github.com/apache/kafka/pull/15620#issuecomment-2064470343

   > Hi @linu-shibu @showuon this still uses raw types, and so is still 
type-unsafe. Fixing that was my motivation for creating the ticket, sorry for 
not emphasizing it more.
   > 
   > I think it should be addressed in this PR rather than a follow-up, because 
it involves changes to the exact same code.
   > 
   > One other thought that occurs to me now is that we should store the 
metadata transform instances, so that we don't incur the cost of constructing 
them for each call.
   > 
   > Thanks!
   
   Sure @gharris1727, I will address the above mentioned comments and update 
the PR. Thanks for pointing it out!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16336) Remove Deprecated metric standby-process-ratio

2024-04-18 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838724#comment-17838724
 ] 

Matthias J. Sax commented on KAFKA-16336:
-

The next planned release is 3.8, but we can work on this ticket only for 4.0, 
as it's a breaking change that's only allowed for a major release. – So this 
ticket cannot be picked up yet.

> Remove Deprecated metric standby-process-ratio
> --
>
> Key: KAFKA-16336
> URL: https://issues.apache.org/jira/browse/KAFKA-16336
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Walter Hernandez
>Priority: Blocker
> Fix For: 4.0.0
>
>
> Metric "standby-process-ratio" was deprecated in 3.5 release via 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-869%3A+Improve+Streams+State+Restoration+Visibility



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15853: Move KafkaConfig log properties and docs out of core [kafka]

2024-04-18 Thread via GitHub


OmniaGM commented on PR #15569:
URL: https://github.com/apache/kafka/pull/15569#issuecomment-2064450248

   > > However I'm bit concern that LogConfig seems already huge. What others 
prefer? Keep it in KafkaLogConfigs or move them to LogConfig.ServerLogConfig
   > 
   > the most default values of `KafkaLogConfigs` are in `LogConfig`, and they 
are in different module. That pattern is no similar to `ReplicationConfigs`, 
`KafkaSecurityConfigs`. Is server-common module more suitable to collect those 
server side configs since both`storage` and `server` depend on `server-common`. 
Also, `server-common` has `org.apache.kafka.server.config` package too.
   
   I think it might be better and simpler to move `KafkaLogConfigs` into 
`server-common` and make LogConfig use them. I'll push an update with this soon 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-04-18 Thread via GitHub


mumrah merged PR #15470:
URL: https://github.com/apache/kafka/pull/15470


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]

2024-04-18 Thread via GitHub


gaurav-narula commented on code in PR #15136:
URL: https://github.com/apache/kafka/pull/15136#discussion_r1571030504


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -1178,6 +1178,33 @@ class LogManager(logDirs: Seq[File],
 }
   }
 
+  def recoverAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): 
Unit = {
+val abandonedFutureLogs = findAbandonedFutureLogs(brokerId, newTopicsImage)
+abandonedFutureLogs.foreach { case (futureLog, currentLog) =>
+  val tp = futureLog.topicPartition
+  if (cleaner != null) {

Review Comment:
   Added a comment in 
[3d78e55](https://github.com/apache/kafka/pull/15136/commits/3d78e5567fd16aa77f0b9f8c510ad26f0e6443c6)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Fix typos [kafka]

2024-04-18 Thread via GitHub


birdoplank closed pull request #15752: Fix typos
URL: https://github.com/apache/kafka/pull/15752


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14226: feat(connect:transform): Introduce FieldPath abstraction [kafka]

2024-04-18 Thread via GitHub


C0urante commented on code in PR #15379:
URL: https://github.com/apache/kafka/pull/15379#discussion_r1571018390


##
connect/transforms/src/test/java/org/apache/kafka/connect/transforms/field/SingleFieldPathTest.java:
##
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.field;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.junit.jupiter.api.Test;
+
+class SingleFieldPathTest {
+
+@Test
+void shouldIncludeEmptyFieldNames() {
+assertArrayEquals(
+new String[] {"", "", ""},
+new SingleFieldPath("..", FieldSyntaxVersion.V2).path()
+);
+assertArrayEquals(
+new String[] {"foo", "", ""},
+new SingleFieldPath("foo..", FieldSyntaxVersion.V2).path()
+);
+assertArrayEquals(
+new String[] {"", "bar", ""},
+new SingleFieldPath(".bar.", FieldSyntaxVersion.V2).path()
+);
+assertArrayEquals(
+new String[] {"", "", "baz"},
+new SingleFieldPath("..baz", FieldSyntaxVersion.V2).path()
+);
+}

Review Comment:
   This case can probably be moved to `FieldPathNotationTest`, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]

2024-04-18 Thread via GitHub


rreddy-22 commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1571012712


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java:
##
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.Integer.max;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 1)
+@Measurement(iterations = 0)

Review Comment:
   Sorry, I missed that; I changed it in the next commit.
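   
   For context, `@Measurement(iterations = 0)` leaves the benchmark without any 
measured iterations; a more typical configuration might look like the sketch 
below (values are illustrative, not the ones chosen for this PR):
   
```java
import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;

@State(Scope.Benchmark)
@Fork(value = 1)
@Warmup(iterations = 3, time = 1, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public class AssignorBenchmarkSketch {
    @Benchmark
    public int runAssignment() {
        return 42; // placeholder for the real assignment work
    }
}
```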
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]

2024-04-18 Thread via GitHub


chia7712 commented on code in PR #15136:
URL: https://github.com/apache/kafka/pull/15136#discussion_r1571012160


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -1189,49 +1217,62 @@ class LogManager(logDirs: Seq[File],
   val sourceLog = currentLogs.get(topicPartition)
   val destLog = futureLogs.get(topicPartition)
 
-  info(s"Attempting to replace current log $sourceLog with $destLog for 
$topicPartition")
   if (sourceLog == null)
 throw new KafkaStorageException(s"The current replica for 
$topicPartition is offline")
   if (destLog == null)
 throw new KafkaStorageException(s"The future replica for 
$topicPartition is offline")
 
-  destLog.renameDir(UnifiedLog.logDirName(topicPartition), 
shouldReinitialize = true)
-  // the metrics tags still contain "future", so we have to remove it.
-  // we will add metrics back after sourceLog remove the metrics
-  destLog.removeLogMetrics()
-  destLog.updateHighWatermark(sourceLog.highWatermark)
+  info(s"Attempting to replace current log $sourceLog with $destLog for 
$topicPartition")
+  replaceCurrentWithFutureLog(Option(sourceLog), destLog, 
updateHighWatermark = true)
+  info(s"The current replica is successfully replaced with the future 
replica for $topicPartition")
+}
+  }
+
+  def replaceCurrentWithFutureLog(sourceLog: Option[UnifiedLog], destLog: 
UnifiedLog, updateHighWatermark: Boolean = false): Unit = {

Review Comment:
   > I think that's not necessarily true. findAbandonedFutureLogs may find a 
sourceLog if the log directory is online but I don't think it's safe to update 
the high watermark even then because sourceLog may be ahead of futureLog.
   
   you are right!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14226: feat(connect:transform): Introduce FieldPath abstraction [kafka]

2024-04-18 Thread via GitHub


C0urante commented on code in PR #15379:
URL: https://github.com/apache/kafka/pull/15379#discussion_r1571007787


##
connect/transforms/src/test/java/org/apache/kafka/connect/transforms/field/FieldPathNotationTest.java:
##
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.field;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+class FieldPathNotationTest {
+final static String[] EMPTY_PATH = new String[] {};
+
+@Test
+void shouldBuildV1WithDotsAndBacktickPair() {
+// Given v1
+// When path contains dots, then single step path
+assertArrayEquals(
+new String[] {"foo.bar.baz"},
+new SingleFieldPath("foo.bar.baz", 
FieldSyntaxVersion.V1).path());
+// When path contains backticks, then single step path
+assertArrayEquals(
+new String[] {"foo`bar`"},
+new SingleFieldPath("foo`bar`", FieldSyntaxVersion.V1).path());
+// When path contains dots and backticks, then single step path
+assertArrayEquals(
+new String[] {"foo.`bar.baz`"},
+new SingleFieldPath("foo.`bar.baz`", 
FieldSyntaxVersion.V1).path());
+}
+
+@Test
+void shouldBuildV2WithEmptyPath() {
+// Given v2
+// When path is empty
+// Then build a path with no steps
+assertArrayEquals(EMPTY_PATH, new SingleFieldPath("", 
FieldSyntaxVersion.V2).path());
+}
+
+@Test
+void shouldBuildV2WithoutDots() {
+// Given v2
+// When path without dots
+// Then build a single step path
+assertArrayEquals(new String[] {"foobarbaz"}, new 
SingleFieldPath("foobarbaz", FieldSyntaxVersion.V2).path());
+}
+
+@Test
+void shouldBuildV2WhenIncludesDots() {
+// Given v2 and fields without dots
+// When path includes dots
+// Then build a path with steps separated by dots
+assertArrayEquals(new String[] {"foo", "bar", "baz"}, new 
SingleFieldPath("foo.bar.baz", FieldSyntaxVersion.V2).path());
+}
+
+@Test
+void shouldBuildV2WithoutWrappingBackticks() {
+// Given v2 and fields without dots
+// When backticks are not wrapping a field name
+// Then build a single step path including backticks
+assertArrayEquals(new String[] {"foo`bar`baz"}, new 
SingleFieldPath("foo`bar`baz", FieldSyntaxVersion.V2).path());
+}
+
+@Test
+void shouldBuildV2WhenIncludesDotsAndBacktickPair() {
+// Given v2 and fields including dots
+// When backticks are wrapping a field name (i.e. withing edges or 
between dots)
+// Then build a path with steps separated by dots and not including 
backticks
+assertArrayEquals(new String[] {"foo.bar.baz"}, new 
SingleFieldPath("`foo.bar.baz`", FieldSyntaxVersion.V2).path());
+assertArrayEquals(new String[] {"foo", "bar.baz"}, new 
SingleFieldPath("foo.`bar.baz`", FieldSyntaxVersion.V2).path());
+assertArrayEquals(new String[] {"foo.bar", "baz"}, new 
SingleFieldPath("`foo.bar`.baz", FieldSyntaxVersion.V2).path());
+assertArrayEquals(new String[] {"foo", "bar", "baz"}, new 
SingleFieldPath("foo.`bar`.baz", FieldSyntaxVersion.V2).path());
+}
+
+@Test
+void shouldBuildV2AndIgnoreBackticksThatAreNotWrapping() {
+// Given v2 and fields including dots and backticks
+// When backticks are wrapping a field name (i.e. withing edges or 
between dots)
+// Then build a path with steps separated by dots and including 
non-wrapping backticks
+assertArrayEquals(new String[] {"foo", "`bar.baz"}, new 
SingleFieldPath("foo.``bar.baz`", FieldSyntaxVersion.V2).path());
+assertArrayEquals(new String[] {"foo", "bar.baz`"}, new 
SingleFieldPath("foo.`bar.baz``", FieldSyntaxVersion.V2).path());
+assertArrayEquals(new String[] {"foo", "ba`r.baz"}, 

Re: [PR] KAFKA-16579: Revert Consumer Rolling Upgrade [kafka]

2024-04-18 Thread via GitHub


philipnee commented on PR #15753:
URL: https://github.com/apache/kafka/pull/15753#issuecomment-2064308513

   @lucasbru - This is just to remove the consumer protocol from testing, as it 
is not suited for this test. It would be much appreciated if you get a chance to 
look at this.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16579: Revert Consumer Rolling Upgrade [kafka]

2024-04-18 Thread via GitHub


philipnee opened a new pull request, #15753:
URL: https://github.com/apache/kafka/pull/15753

   Consumer Rolling Upgrade is meant to test the protocol upgrade for the old 
protocol. Therefore, I am removing the earlier changes.
   
   ```
   

   SESSION REPORT (ALL TESTS)
   ducktape version: 0.11.4
   session_id:   2024-04-18--004
   run time: 1 minute 38.006 seconds
   tests run:3
   passed:   3
   flaky:0
   failed:   0
   ignored:  0
   

   test_id:
kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=False
   status: PASS
   run time:   33.562 seconds
   

   test_id:
kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True
   status: PASS
   run time:   34.902 seconds
   

   test_id:
kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=ZK.use_new_coordinator=False
   status: PASS
   run time:   29.447 seconds
   

   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14226: feat(connect:transform): Introduce FieldPath abstraction [kafka]

2024-04-18 Thread via GitHub


C0urante commented on code in PR #15379:
URL: https://github.com/apache/kafka/pull/15379#discussion_r1570998105


##
connect/transforms/src/test/java/org/apache/kafka/connect/transforms/field/FieldPathNotationTest.java:
##
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.field;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+class FieldPathNotationTest {

Review Comment:
   We can simplify these tests with a few utility methods:
   
   ```java
   private void assertParseV1(String path) {
   assertArrayEquals(
   new String[] {path},
   new SingleFieldPath(path, FieldSyntaxVersion.V1).path());
   }
   
   private void assertParseV2(String inputPath, String... expectedSteps) {
   assertArrayEquals(
   expectedSteps,
   new SingleFieldPath(inputPath, FieldSyntaxVersion.V2).path()
   );
   }
   
   private void assertParseV2Error(String inputPath, String expectedMessage) {
   ConfigException exception = assertThrows(
   ConfigException.class,
   () -> new SingleFieldPath(inputPath, FieldSyntaxVersion.V2)
   );
   assertEquals(expectedMessage, exception.getMessage());
   }
   ```
   
   Some simplified test cases would look like this:
   ```java
   @Test
   void shouldBuildV1WithDotsAndBacktickPair() {
   // Given v1
   // When path contains dots, then single step path
   assertParseV1("foo.bar.baz");
   // When path contains backticks, then single step path
   assertParseV1("foo`bar`");
   // When path contains dots and backticks, then single step path
   assertParseV1("foo.`bar.baz`");
   }
   
   @Test
   void shouldBuildV2WhenIncludesDotsAndBacktickPair() {
   // Given v2 and fields including dots
   // When backticks are wrapping a field name (i.e. withing edges or 
between dots)
   // Then build a path with steps separated by dots and not including 
backticks
   assertParseV2("`foo.bar.baz`", "foo.bar.baz");
   assertParseV2("foo.`bar.baz`", "foo", "bar.baz");
   assertParseV2("`foo.bar`.baz", "foo.bar", "baz");
   assertParseV2("foo.`bar`.baz", "foo", "bar", "baz");
   }
   
   @Test
   void shouldFailV2WhenIncompleteBackticks() {
   // Given v2
   // When backticks are not closed and not escaped
   // Then it should fail
   assertParseV2Error(
   "`foo.bar.baz",
   "Incomplete backtick pair in path: [`foo.bar.baz], consider 
adding a backslash before backtick at position 0 to escape it"
   );
   
   assertParseV2Error(
   "foo.`bar.baz",
   "Incomplete backtick pair in path: [foo.`bar.baz], consider 
adding a backslash before backtick at position 4 to escape it"
   );
   
   assertParseV2Error(
   "foo.bar.`baz",
   "Incomplete backtick pair in path: [foo.bar.`baz], consider 
adding a backslash before backtick at position 8 to escape it"
   );
   
   assertParseV2Error(
   "foo.bar.`baz\\`",
   "Incomplete backtick pair in path: [foo.bar.`baz\\`], consider 
adding a backslash before backtick at position 8 to escape it"
   );
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16555: Consumer's RequestState has incorrect logic to determine if inflight [kafka]

2024-04-18 Thread via GitHub


philipnee commented on code in PR #15723:
URL: https://github.com/apache/kafka/pull/15723#discussion_r1570988398


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestState.java:
##
@@ -98,12 +93,11 @@ public boolean canSendRequest(final long currentTimeMs) {
  * is a request in-flight.
  */
 public boolean requestInFlight() {
-return this.lastSentMs > -1 && this.lastReceivedMs < this.lastSentMs;
+return requestInFlight;

Review Comment:
   do you think it would make sense to rename this to: `inflightRequested`? 
`requestInflight` sounds like a verb/action to me.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16555: Consumer's RequestState has incorrect logic to determine if inflight [kafka]

2024-04-18 Thread via GitHub


philipnee commented on code in PR #15723:
URL: https://github.com/apache/kafka/pull/15723#discussion_r1570986831


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestStateTest.java:
##
@@ -48,4 +50,40 @@ public void testRequestStateSimple() {
 state.reset();
 assertTrue(state.canSendRequest(200));
 }
+
+@Test
+public void testTrackInflightOnSuccessfulAttempt() {
+testTrackInflight(RequestState::onSuccessfulAttempt);
+}
+
+@Test
+public void testTrackInflightOnFailedAttempt() {
+testTrackInflight(RequestState::onFailedAttempt);
+}
+
+private void testTrackInflight(BiConsumer 
onCompletedAttempt) {
+RequestState state = new RequestState(
+new LogContext(),
+this.getClass().getSimpleName(),
+100,
+2,
+1000,
+0);
+
+// This is just being paranoid...
+assertFalse(state.requestInFlight());
+
+// When we've sent a request, the flag should update from false to 
true.
+state.onSendAttempt();
+assertTrue(state.requestInFlight());
+
+// Now we've received the response.
+onCompletedAttempt.accept(state, 236);
+
+// When we've sent a second request with THE SAME TIMESTAMP as the 
previous response,

Review Comment:
   The requestInFlight doesn't update the timestamp internally.  I wonder if we 
should just say "making consecutive requests" instead of mentioning the 
timestamp.
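   
   For example (illustrative only, just reusing the calls already in this test), 
the comment could simply read:
   
   ```java
   // When we make a consecutive request right after the previous response,
   // the in-flight flag should be set to true again.
   state.onSendAttempt();
   assertTrue(state.requestInFlight());
   ```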



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16528: Client HB timing fix [kafka]

2024-04-18 Thread via GitHub


lianetm commented on PR #15698:
URL: https://github.com/apache/kafka/pull/15698#issuecomment-2064258399

   Hey @cadonna, could you take a look when you have a chance? Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16557: Fix toString of OffsetFetchRequestState [kafka]

2024-04-18 Thread via GitHub


lianetm commented on PR #15750:
URL: https://github.com/apache/kafka/pull/15750#issuecomment-2064223926

   Hey @phooq, thanks for taking on this one. A high-level question about the 
motivation: the `toStringBase` defined in the base class `RequestState` includes 
the vars that all states have, so that inheritors can include them in their 
`toString`. This, by the way, is why it's used in the `RequestState` toString 
itself. So I would expect inheritors like `OffsetFetchRequestState` to simply 
override their `toString`, include the props that are specific to them, and also 
include the props that are common to all states via `toStringBase` (exactly what 
was being done before). What do you think?
   
   That being said, this makes me notice that only the `OffsetFetchRequestState` 
currently overrides the toString. So, if we align on what we want to achieve, we 
could repurpose this PR/Jira to make the toString consistent in all request 
states that inherit from the base `RequestState`, so they all override it in a 
similar way to the fetch request state.
   
   Thoughts? Let me know if I'm missing something about the Jira itself 
@kirktrue.
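   
   As a minimal sketch of that pattern (purely illustrative, not the actual 
`OffsetFetchRequestState` code; `memberId` is a made-up subclass field), an 
inheritor's override could look roughly like:
   
   ```java
   // Hypothetical RequestState subclass: append the subclass-specific props,
   // then reuse toStringBase() for the vars common to all request states.
   @Override
   public String toString() {
       return "ExampleRequestState{" +
           "memberId='" + memberId + '\'' +   // subclass-specific (illustrative)
           ", " + toStringBase() +            // common RequestState vars
           '}';
   }
   ```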


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15649: Handle directory failure timeout [kafka]

2024-04-18 Thread via GitHub


viktorsomogyi commented on PR #15697:
URL: https://github.com/apache/kafka/pull/15697#issuecomment-2064216189

   Rebased on latest trunk as there were some conflicts. Addressed some of the 
comments but there are 2 things I need to investigate:
   * `LogDirFailureTest` fails in `@AfterAll`, likely because of an incorrect 
shutdown; perhaps there's a timing issue
   * Check if we can detect if there are any leaders before shutdown
   I'll update on both shortly, hopefully tomorrow.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]

2024-04-18 Thread via GitHub


cadonna commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1570932239


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerCommitTest.scala:
##
@@ -304,6 +304,64 @@ class PlaintextConsumerCommitTest extends 
AbstractConsumerTest {
 consumeAndVerifyRecords(consumer = otherConsumer, numRecords = 1, 
startingOffset = 5, startingTimestamp = startingTimestamp)
   }
 
+  // TODO: This only works in the new consumer, but should be fixed for the 
old consumer as well
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly"))
+  def testCommitAsyncCompletedBeforeConsumerCloses(quorum: String, 
groupProtocol: String): Unit = {
+// This is testing the contract that asynchronous offset commit are 
completed before the consumer
+// is closed, even when no commit sync is performed as part of the close 
(due to auto-commit
+// disabled, or simply because there no consumed offsets).
+val producer = createProducer()
+sendRecords(producer, numRecords = 3, tp)
+sendRecords(producer, numRecords = 3, tp2)
+
+val consumer = createConsumer()
+consumer.assign(List(tp, tp2).asJava)
+
+// Try without looking up the coordinator first
+val cb = new CountConsumerCommitCallback
+consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new 
OffsetAndMetadata(1L))).asJava, cb)
+consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new 
OffsetAndMetadata(1L))).asJava, cb)
+consumer.close()
+assertEquals(2, cb.successCount)
+  }
+
+  // TODO: This only works in the new consumer, but should be fixed for the 
old consumer as well
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly"))
+  def testCommitAsyncCompletedBeforeCommitSyncReturns(quorum: String, 
groupProtocol: String): Unit = {
+// This is testing the contract that asynchronous offset commits sent 
previously with the
+// `commitAsync` are guaranteed to have their callbacks invoked prior to 
completion of
+// `commitSync` (given that it does not time out).
+val producer = createProducer()
+sendRecords(producer, numRecords = 3, tp)
+sendRecords(producer, numRecords = 3, tp2)
+
+val consumer = createConsumer()
+consumer.assign(List(tp, tp2).asJava)
+
+// Try without looking up the coordinator first

Review Comment:
   I do not understand those comments. How do I recognize in the code that the 
coordinator is not looked up first? And why is that so important?



##
core/src/test/scala/integration/kafka/api/PlaintextConsumerCommitTest.scala:
##
@@ -304,6 +304,64 @@ class PlaintextConsumerCommitTest extends 
AbstractConsumerTest {
 consumeAndVerifyRecords(consumer = otherConsumer, numRecords = 1, 
startingOffset = 5, startingTimestamp = startingTimestamp)
   }
 
+  // TODO: This only works in the new consumer, but should be fixed for the 
old consumer as well
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly"))
+  def testCommitAsyncCompletedBeforeConsumerCloses(quorum: String, 
groupProtocol: String): Unit = {
+// This is testing the contract that asynchronous offset commit are 
completed before the consumer
+// is closed, even when no commit sync is performed as part of the close 
(due to auto-commit
+// disabled, or simply because there no consumed offsets).
+val producer = createProducer()
+sendRecords(producer, numRecords = 3, tp)
+sendRecords(producer, numRecords = 3, tp2)
+
+val consumer = createConsumer()
+consumer.assign(List(tp, tp2).asJava)
+
+// Try without looking up the coordinator first
+val cb = new CountConsumerCommitCallback
+consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new 
OffsetAndMetadata(1L))).asJava, cb)
+consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new 
OffsetAndMetadata(1L))).asJava, cb)
+consumer.close()
+assertEquals(2, cb.successCount)
+  }
+
+  // TODO: This only works in the new consumer, but should be fixed for the 
old consumer as well
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly"))
+  def testCommitAsyncCompletedBeforeCommitSyncReturns(quorum: String, 
groupProtocol: String): Unit = {
+// This is testing the contract that asynchronous offset commits sent 
previously with the
+// `commitAsync` are guaranteed to have their callbacks invoked prior to 
completion of
+// `commitSync` (given that it does not time out).
+val 

Re: [PR] KAFKA-14226: feat(connect:transform): Introduce FieldPath abstraction [kafka]

2024-04-18 Thread via GitHub


C0urante commented on code in PR #15379:
URL: https://github.com/apache/kafka/pull/15379#discussion_r1570962332


##
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/SingleFieldPath.java:
##
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.field;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * A SingleFieldPath is composed of one or more field names, known as path 
steps,
+ * to access values within a data object (either {@code Struct} or {@code 
Map}).
+ *
+ * If the SMT requires accessing multiple fields on the same data object, 
use {@link MultiFieldPaths} instead.
+ *
+ * The field path semantics are defined by the {@link FieldSyntaxVersion 
syntax version}.
+ *
+ * @see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-821%3A+Connect+Transforms+support+for+nested+structures">KIP-821</a>
+ * @see FieldSyntaxVersion
+ * @see MultiFieldPaths
+ */
+public class SingleFieldPath {
+// Invariants:
+// - A field path can contain one or more steps
+private static final char BACKTICK = '`';
+private static final char DOT = '.';
+private static final char BACKSLASH = '\\';
+
+private final String originalPath;
+private final FieldSyntaxVersion version;
+private final List<String> steps;
+
+public SingleFieldPath(String pathText, FieldSyntaxVersion version) {
+this.originalPath = Objects.requireNonNull(pathText, "Field path 
cannot be null");
+this.version = version;
+switch (version) {
+case V1: // backward compatibility
+this.steps = Collections.singletonList(pathText);
+break;
+case V2:
+this.steps = buildFieldPathV2(pathText);
+break;
+default:
+throw new IllegalArgumentException("Unknown syntax version: " 
+ version);
+}
+}
+
+private static List<String> buildFieldPathV2(String path) {
+final List<String> steps = new ArrayList<>();
+// path character index to track backticks and dots and break path 
into steps
+int idx = 0;
+while (idx < path.length() && idx >= 0) {
+if (path.charAt(idx) != BACKTICK) {
+final int start = idx;
+idx = path.indexOf(String.valueOf(DOT), idx);
+if (idx >= 0) { // get path step and move forward
+String field = path.substring(start, idx);
+steps.add(field);
+idx++;
+} else { // add all
+String field = path.substring(start);
+steps.add(field);
+}
+} else { // has backtick
+int backtickAt = idx;
+idx++;
+StringBuilder field = new StringBuilder();
+int start = idx;
+while (true) {
+// find closing backtick
+idx = path.indexOf(String.valueOf(BACKTICK), idx);
+if (idx == -1) { // if not found, then fail
+failWhenIncompleteBacktickPair(path, backtickAt);
+}
+
+// backtick escaped if right after backslash
+boolean escaped = path.charAt(idx - 1) == BACKSLASH;
+
+if (idx >= path.length() - 1) { // at the end of path
+if (escaped) { // but escaped, then fail
+failWhenIncompleteBacktickPair(path, backtickAt);
+}
+field.append(path, start, idx);
+// we've reached the end of the path, and the last 
character is the backtick
+steps.add(field.toString());
+idx++;
+break;
+}
+
+if 

Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]

2024-04-18 Thread via GitHub


gaurav-narula commented on code in PR #15136:
URL: https://github.com/apache/kafka/pull/15136#discussion_r1570959086


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -1189,49 +1217,62 @@ class LogManager(logDirs: Seq[File],
   val sourceLog = currentLogs.get(topicPartition)
   val destLog = futureLogs.get(topicPartition)
 
-  info(s"Attempting to replace current log $sourceLog with $destLog for 
$topicPartition")
   if (sourceLog == null)
 throw new KafkaStorageException(s"The current replica for 
$topicPartition is offline")
   if (destLog == null)
 throw new KafkaStorageException(s"The future replica for 
$topicPartition is offline")
 
-  destLog.renameDir(UnifiedLog.logDirName(topicPartition), 
shouldReinitialize = true)
-  // the metrics tags still contain "future", so we have to remove it.
-  // we will add metrics back after sourceLog remove the metrics
-  destLog.removeLogMetrics()
-  destLog.updateHighWatermark(sourceLog.highWatermark)
+  info(s"Attempting to replace current log $sourceLog with $destLog for 
$topicPartition")
+  replaceCurrentWithFutureLog(Option(sourceLog), destLog, 
updateHighWatermark = true)
+  info(s"The current replica is successfully replaced with the future 
replica for $topicPartition")
+}
+  }
+
+  def replaceCurrentWithFutureLog(sourceLog: Option[UnifiedLog], destLog: 
UnifiedLog, updateHighWatermark: Boolean = false): Unit = {

Review Comment:
   I think that's not necessarily true. `findAbandonedFutureLogs` may find a 
`sourceLog` if the log directory is online but I don't think it's safe to 
update the high watermark even then because sourceLog may be ahead of futureLog.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14226: feat(connect:transform): Introduce FieldPath abstraction [kafka]

2024-04-18 Thread via GitHub


C0urante commented on code in PR #15379:
URL: https://github.com/apache/kafka/pull/15379#discussion_r1570960525


##
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/SingleFieldPath.java:
##
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.field;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * A SingleFieldPath is composed of one or more field names, known as path 
steps,
+ * to access values within a data object (either {@code Struct} or {@code 
Map}).
+ *
+ * If the SMT requires accessing multiple fields on the same data object, 
use {@link MultiFieldPaths} instead.
+ *
+ * The field path semantics are defined by the {@link FieldSyntaxVersion 
syntax version}.
+ *
+ * @see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-821%3A+Connect+Transforms+support+for+nested+structures">KIP-821</a>
+ * @see FieldSyntaxVersion
+ * @see MultiFieldPaths
+ */
+public class SingleFieldPath {
+// Invariants:
+// - A field path can contain one or more steps
+private static final char BACKTICK = '`';
+private static final char DOT = '.';
+private static final char BACKSLASH = '\\';
+
+private final String originalPath;
+private final FieldSyntaxVersion version;
+private final List<String> steps;
+
+public SingleFieldPath(String pathText, FieldSyntaxVersion version) {
+this.originalPath = Objects.requireNonNull(pathText, "Field path 
cannot be null");
+this.version = version;
+switch (version) {
+case V1: // backward compatibility
+this.steps = Collections.singletonList(pathText);
+break;
+case V2:
+this.steps = buildFieldPathV2(pathText);
+break;
+default:
+throw new IllegalArgumentException("Unknown syntax version: " 
+ version);
+}
+}
+
+private static List<String> buildFieldPathV2(String path) {
+final List<String> steps = new ArrayList<>();
+// path character index to track backticks and dots and break path 
into steps
+int idx = 0;
+while (idx < path.length() && idx >= 0) {
+if (path.charAt(idx) != BACKTICK) {
+final int start = idx;
+idx = path.indexOf(String.valueOf(DOT), idx);
+if (idx >= 0) { // get path step and move forward
+String field = path.substring(start, idx);
+steps.add(field);
+idx++;
+} else { // add all
+String field = path.substring(start);
+steps.add(field);
+}
+} else { // has backtick
+int backtickAt = idx;
+idx++;
+StringBuilder field = new StringBuilder();
+int start = idx;
+while (true) {
+// find closing backtick
+idx = path.indexOf(String.valueOf(BACKTICK), idx);
+if (idx == -1) { // if not found, then fail
+failWhenIncompleteBacktickPair(path, backtickAt);
+}
+
+// backtick escaped if right after backslash
+boolean escaped = path.charAt(idx - 1) == BACKSLASH;
+
+if (idx >= path.length() - 1) { // at the end of path
+if (escaped) { // but escaped, then fail
+failWhenIncompleteBacktickPair(path, backtickAt);
+}
+field.append(path, start, idx);
+// we've reached the end of the path, and the last 
character is the backtick
+steps.add(field.toString());
+idx++;
+break;
+}
+
+if 

Re: [PR] KAFKA-14226: feat(connect:transform): Introduce FieldPath abstraction [kafka]

2024-04-18 Thread via GitHub


C0urante commented on code in PR #15379:
URL: https://github.com/apache/kafka/pull/15379#discussion_r1570939313


##
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/SingleFieldPath.java:
##
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.field;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * A SingleFieldPath is composed of one or more field names, known as path 
steps,
+ * to access values within a data object (either {@code Struct} or {@code 
Map}).
+ *
+ * If the SMT requires accessing multiple fields on the same data object, 
use {@link MultiFieldPaths} instead.
+ *
+ * The field path semantics are defined by the {@link FieldSyntaxVersion 
syntax version}.
+ *
+ * @see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-821%3A+Connect+Transforms+support+for+nested+structures">KIP-821</a>
+ * @see FieldSyntaxVersion
+ * @see MultiFieldPaths
+ */
+public class SingleFieldPath {
+// Invariants:
+// - A field path can contain one or more steps
+private static final char BACKTICK = '`';
+private static final char DOT = '.';
+private static final char BACKSLASH = '\\';
+
+private final String originalPath;
+private final FieldSyntaxVersion version;
+private final List<String> steps;
+
+public SingleFieldPath(String pathText, FieldSyntaxVersion version) {
+this.originalPath = Objects.requireNonNull(pathText, "Field path 
cannot be null");
+this.version = version;
+switch (version) {
+case V1: // backward compatibility
+this.steps = Collections.singletonList(pathText);
+break;
+case V2:
+this.steps = buildFieldPathV2(pathText);
+break;
+default:
+throw new IllegalArgumentException("Unknown syntax version: " 
+ version);
+}
+}
+
+private static List<String> buildFieldPathV2(String path) {
+final List<String> steps = new ArrayList<>();
+// path character index to track backticks and dots and break path 
into steps
+int idx = 0;
+while (idx < path.length() && idx >= 0) {
+if (path.charAt(idx) != BACKTICK) {
+final int start = idx;
+idx = path.indexOf(String.valueOf(DOT), idx);
+if (idx >= 0) { // get path step and move forward
+String field = path.substring(start, idx);
+steps.add(field);
+idx++;
+} else { // add all
+String field = path.substring(start);
+steps.add(field);
+}
+} else { // has backtick
+int backtickAt = idx;
+idx++;
+StringBuilder field = new StringBuilder();
+int start = idx;
+while (true) {
+// find closing backtick
+idx = path.indexOf(String.valueOf(BACKTICK), idx);
+if (idx == -1) { // if not found, then fail
+failWhenIncompleteBacktickPair(path, backtickAt);
+}
+
+// backtick escaped if right after backslash
+boolean escaped = path.charAt(idx - 1) == BACKSLASH;
+
+if (idx >= path.length() - 1) { // at the end of path
+if (escaped) { // but escaped, then fail
+failWhenIncompleteBacktickPair(path, backtickAt);
+}
+field.append(path, start, idx);
+// we've reached the end of the path, and the last 
character is the backtick
+steps.add(field.toString());
+idx++;
+break;
+}
+
+if 

[jira] [Comment Edited] (KAFKA-16578) Revert changes to connect_distributed_test.py for the new async Consumer

2024-04-18 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838609#comment-17838609
 ] 

Sagar Rao edited comment on KAFKA-16578 at 4/18/24 3:02 PM:


[~kirktrue], I am running the system tests for connect using the test suite 
on my local, and this exact test *test_exactly_once_source* fails regularly for 
me for a different config.
{code:java}

test_id:
kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_exactly_once_source.clean=False.connect_protocol=sessioned.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
status: FAIL
run time:   7 minutes 59.232 seconds


InsufficientResourcesError('linux nodes requested: 1. linux nodes 
available: 0')
Traceback (most recent call last):
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 
186, in _do_run
data = self.run_test()
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 
246, in run_test
return self.test_context.function(self.test)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 
433, in wrapper
return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File 
"/opt/kafka-dev/tests/kafkatest/tests/connect/connect_distributed_test.py", 
line 929, in test_exactly_once_source
consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, 
self.source.topic, consumer_timeout_ms=1000, print_key=True, 
consumer_properties=consumer_properties)
  File "/opt/kafka-dev/tests/kafkatest/services/console_consumer.py", line 97, 
in __init__
BackgroundThreadService.__init__(self, context, num_nodes)
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/services/background_thread.py",
 line 26, in __init__
super(BackgroundThreadService, self).__init__(context, num_nodes, 
cluster_spec, *args, **kwargs)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/services/service.py", 
line 107, in __init__
self.allocate_nodes()
  File "/usr/local/lib/python3.9/dist-packages/ducktape/services/service.py", 
line 217, in allocate_nodes
self.nodes = self.cluster.alloc(self.cluster_spec)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/cluster/cluster.py", 
line 54, in alloc
allocated = self.do_alloc(cluster_spec)
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/cluster/finite_subcluster.py", 
line 37, in do_alloc
good_nodes, bad_nodes = self._available_nodes.remove_spec(cluster_spec)
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/cluster/node_container.py", 
line 131, in remove_spec
raise InsufficientResourcesError(err)
ducktape.cluster.node_container.InsufficientResourcesError: linux nodes 
requested: 1. linux nodes available: 0
{code}
-If you want, I can revert the change as part of my PR. It should be ready for 
review by today or tomorrow.-

I updated the PR [https://github.com/apache/kafka/pull/15594] with the changes 
rolled back for the test in question. I hope it's ok.


was (Author: sagarrao):
[~kirktrue], I am running the the system tests for connect using the test suite 
on my local, and this exact test *test_exactly_once_source* fails regularly for 
me for a different config.
{code:java}

test_id:
kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_exactly_once_source.clean=False.connect_protocol=sessioned.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
status: FAIL
run time:   7 minutes 59.232 seconds


InsufficientResourcesError('linux nodes requested: 1. linux nodes 
available: 0')
Traceback (most recent call last):
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 
186, in _do_run
data = self.run_test()
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 
246, in run_test
return self.test_context.function(self.test)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 
433, in wrapper
return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File 
"/opt/kafka-dev/tests/kafkatest/tests/connect/connect_distributed_test.py", 
line 929, in test_exactly_once_source
consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, 
self.source.topic, consumer_timeout_ms=1000, print_key=True, 
consumer_properties=consumer_properties)
  File "/opt/kafka-dev/tests/kafkatest/services/console_consumer.py", line 97, 
in __init__
BackgroundThreadService.__init__(self, context, num_nodes)
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/services/background_thread.py",
 line 26, in __init__
super(BackgroundThreadService, 

Re: [PR] KAFKA-16272: Adding new coordinator related changes for connect_distributed.py [kafka]

2024-04-18 Thread via GitHub


vamossagar12 commented on code in PR #15594:
URL: https://github.com/apache/kafka/pull/15594#discussion_r1570933991


##
tests/kafkatest/services/connect.py:
##
@@ -534,33 +535,40 @@ def received_messages(self):
 
 def start(self):
 self.logger.info("Creating connector VerifiableSinkConnector %s", 
self.name)
-self.cc.create_connector({
+connector_config = {
 'name': self.name,
 'connector.class': 
'org.apache.kafka.connect.tools.VerifiableSinkConnector',
 'tasks.max': self.tasks,
 'topics': ",".join(self.topics)
-})
+}
+if self.consumer_group_protocol is not None:
+connector_config["consumer.override.group.protocol"] = 
self.consumer_group_protocol
+self.cc.create_connector(connector_config)
 
 class MockSink(object):
 
-def __init__(self, cc, topics, mode=None, delay_sec=10, name="mock-sink"):
+def __init__(self, cc, topics, mode=None, delay_sec=10, name="mock-sink", 
consumer_group_protocol=None):
 self.cc = cc
 self.logger = self.cc.logger
 self.name = name
 self.mode = mode
 self.delay_sec = delay_sec
 self.topics = topics
+self.consumer_group_protocol = consumer_group_protocol
 
 def start(self):
 self.logger.info("Creating connector MockSinkConnector %s", self.name)
-self.cc.create_connector({
+connector_config = {
 'name': self.name,
 'connector.class': 
'org.apache.kafka.connect.tools.MockSinkConnector',
 'tasks.max': 1,
 'topics': ",".join(self.topics),
 'mock_mode': self.mode,
 'delay_ms': self.delay_sec * 1000
-})
+}
+if self.consumer_group_protocol is not None:
+connector_config["consumer.override.group.protocol"] = 
self.consumer_group_protocol

Review Comment:
   FYI, I did test by setting the group protocol override at a connector level 
and that seems to be working fine.
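   
   For reference, a connector-level override like the one verified here would 
look roughly like this in the connector config (the connector and topic names 
below are just placeholders):
   
   ```json
   {
     "name": "mock-sink",
     "connector.class": "org.apache.kafka.connect.tools.MockSinkConnector",
     "tasks.max": "1",
     "topics": "test-topic",
     "consumer.override.group.protocol": "consumer"
   }
   ```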



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16272: Adding new coordinator related changes for connect_distributed.py [kafka]

2024-04-18 Thread via GitHub


vamossagar12 commented on PR #15594:
URL: https://github.com/apache/kafka/pull/15594#issuecomment-2064120764

   Hey @lucasbru, I ran the following test suite:
   ```
   my_test_suite:
 - 
tests/kafkatest/tests/connect/connect_distributed_test.py::ConnectDistributedTest.test_restart_failed_connector@{"exactly_once_source":false,"connect_protocol":"sessioned","metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer"}
 - 
tests/kafkatest/tests/connect/connect_distributed_test.py::ConnectDistributedTest.test_restart_failed_task@{"connector_type":"sink","connect_protocol":"sessioned","metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer"}
 - 
tests/kafkatest/tests/connect/connect_distributed_test.py::ConnectDistributedTest.test_restart_connector_and_tasks_failed_connector@{"connect_protocol":"sessioned","metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer"}
 - 
tests/kafkatest/tests/connect/connect_distributed_test.py::ConnectDistributedTest.test_restart_connector_and_tasks_failed_task@{"connector_type":"sink","connect_protocol":"sessioned","metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer"}
 - 
tests/kafkatest/tests/connect/connect_distributed_test.py::ConnectDistributedTest.test_pause_and_resume_sink@{"connector_type":"sink","connect_protocol":"sessioned","metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer"}
 - 
tests/kafkatest/tests/connect/connect_distributed_test.py::ConnectDistributedTest.test_file_source_and_sink@{"security_protocol":"PLAINTEXT","exactly_once_source":false,"connect_protocol":"sessioned","metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer"}
 - 
tests/kafkatest/tests/connect/connect_distributed_test.py::ConnectDistributedTest.test_bounce@{"clean":false,"connect_protocol":"sessioned","metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer"}
 - 
tests/kafkatest/tests/connect/connect_distributed_test.py::ConnectDistributedTest.test_exactly_once_source@{"clean":false,"connect_protocol":"sessioned","metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer"}
 - 
tests/kafkatest/tests/connect/connect_distributed_test.py::ConnectDistributedTest.test_transformations@{"connect_protocol":"sessioned","metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer"}
   ```
   
   and here are the results: 
   
   ```
   

   SESSION REPORT (ALL TESTS)
   ducktape version: 0.11.4
   session_id:   2024-04-18--001
   run time: 18 minutes 5.491 seconds
   tests run:8
   passed:   7
   flaky:0
   failed:   1
   ignored:  0
   

   test_id:
kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_bounce.clean=False.connect_protocol=sessioned.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
   status: PASS
   run time:   6 minutes 7.875 seconds
   

   test_id:
kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_exactly_once_source.clean=False.connect_protocol=sessioned.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
   status: FAIL
   run time:   7 minutes 59.232 seconds
   
   
   InsufficientResourcesError('linux nodes requested: 1. linux nodes 
available: 0')
   Traceback (most recent call last):
 File 
"/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 
186, in _do_run
   data = self.run_test()
 File 
"/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 
246, in run_test
   return self.test_context.function(self.test)
 File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 
433, in wrapper
   return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
 File 
"/opt/kafka-dev/tests/kafkatest/tests/connect/connect_distributed_test.py", 
line 929, in test_exactly_once_source
   consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, 
self.source.topic, consumer_timeout_ms=1000, print_key=True, 
consumer_properties=consumer_properties)
 File "/opt/kafka-dev/tests/kafkatest/services/console_consumer.py", line 
97, in __init__
   BackgroundThreadService.__init__(self, context, num_nodes)
 File 
"/usr/local/lib/python3.9/dist-packages/ducktape/services/background_thread.py",
 line 26, in __init__
   super(BackgroundThreadService, self).__init__(context, num_nodes, 
cluster_spec, *args, **kwargs)
 File 

Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]

2024-04-18 Thread via GitHub


chia7712 commented on code in PR #15136:
URL: https://github.com/apache/kafka/pull/15136#discussion_r1570922398


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -1189,49 +1217,62 @@ class LogManager(logDirs: Seq[File],
   val sourceLog = currentLogs.get(topicPartition)
   val destLog = futureLogs.get(topicPartition)
 
-  info(s"Attempting to replace current log $sourceLog with $destLog for 
$topicPartition")
   if (sourceLog == null)
 throw new KafkaStorageException(s"The current replica for 
$topicPartition is offline")
   if (destLog == null)
 throw new KafkaStorageException(s"The future replica for 
$topicPartition is offline")
 
-  destLog.renameDir(UnifiedLog.logDirName(topicPartition), 
shouldReinitialize = true)
-  // the metrics tags still contain "future", so we have to remove it.
-  // we will add metrics back after sourceLog remove the metrics
-  destLog.removeLogMetrics()
-  destLog.updateHighWatermark(sourceLog.highWatermark)
+  info(s"Attempting to replace current log $sourceLog with $destLog for 
$topicPartition")
+  replaceCurrentWithFutureLog(Option(sourceLog), destLog, 
updateHighWatermark = true)
+  info(s"The current replica is successfully replaced with the future 
replica for $topicPartition")
+}
+  }
+
+  def replaceCurrentWithFutureLog(sourceLog: Option[UnifiedLog], destLog: 
UnifiedLog, updateHighWatermark: Boolean = false): Unit = {

Review Comment:
   It seems we don't need the `updateHighWatermark` parameter, since we call 
`updateHighWatermark` only if `sourceLog` is defined.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]

2024-04-18 Thread via GitHub


chia7712 commented on code in PR #15136:
URL: https://github.com/apache/kafka/pull/15136#discussion_r1570920951


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -1178,6 +1178,33 @@ class LogManager(logDirs: Seq[File],
 }
   }
 
+  def recoverAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): 
Unit = {
+val abandonedFutureLogs = findAbandonedFutureLogs(brokerId, newTopicsImage)
+abandonedFutureLogs.foreach { case (futureLog, currentLog) =>
+  val tp = futureLog.topicPartition
+  if (cleaner != null) {

Review Comment:
   Maybe we need to add comments for `abortAndPauseCleaning`. `recoverAbandonedFutureLogs` is called during startup, and calling `abortAndPauseCleaning` here is a bit odd to me.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15649: Handle directory failure timeout [kafka]

2024-04-18 Thread via GitHub


viktorsomogyi commented on code in PR #15697:
URL: https://github.com/apache/kafka/pull/15697#discussion_r1570907841


##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -528,6 +529,10 @@ object KafkaConfig {
 "If log.message.timestamp.type=CreateTime, the message will be rejected if 
the difference in timestamps exceeds " +
 "this specified threshold. This configuration is ignored if 
log.message.timestamp.type=LogAppendTime."
 
+  val LogDirFailureTimeoutMsDoc = "If the broker is unable to successfully 
communicate to the controller that some log " +
+"directory has failed for longer than this time, and there's at least one 
partition with leadership on that directory, " +

Review Comment:
   I'll do another round on this; there might be a way in `BrokerServer` to extract this information using a combination of `MetadataCache`, `ReplicaManager` and `LogManager`. I'll update you tomorrow about my findings.



##
core/src/main/scala/kafka/server/BrokerLifecycleManager.scala:
##
@@ -327,16 +333,25 @@ class BrokerLifecycleManager(
   private class OfflineDirEvent(val dir: Uuid) extends EventQueue.Event {
 override def run(): Unit = {
   if (offlineDirs.isEmpty) {
-offlineDirs = Set(dir)
+offlineDirs = Map(dir -> false)
   } else {
-offlineDirs = offlineDirs + dir
+offlineDirs += (dir -> false)
   }
   if (registered) {
 scheduleNextCommunicationImmediately()
   }
 }
   }
 
+  private class OfflineDirBrokerFailureEvent(offlineDir: Uuid) extends 
EventQueue.Event {
+override def run(): Unit = {
+  if (!offlineDirs.getOrElse(offlineDir, false)) {
+error(s"Shutting down because couldn't communicate offline log dirs 
with controllers")

Review Comment:
   I'll print only the UUID here, and I'll modify the other log statements to include the UUID so the statements can be correlated when analyzing the logs. Printing the dir path here would be a bigger stretch, as we currently don't propagate it down to this level. Let me know if you think it'd be better to print the path here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14226: feat(connect:transform): Introduce FieldPath abstraction [kafka]

2024-04-18 Thread via GitHub


C0urante commented on code in PR #15379:
URL: https://github.com/apache/kafka/pull/15379#discussion_r1570907754


##
connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java:
##
@@ -60,6 +62,30 @@ public void schemaless() {
 assertEquals(expectedKey, transformedRecord.key());
 }
 
+@Test
+public void schemalessAndNestedFields() {
+Map configs = new HashMap<>();
+configs.put("fields", "a,b.c");
+configs.put(FieldSyntaxVersion.FIELD_SYNTAX_VERSION_CONFIG, 
FieldSyntaxVersion.V2.name());
+xform.configure(configs);
+
+final HashMap value = new HashMap<>();
+value.put("a", 1);
+final HashMap nested = new HashMap<>();
+nested.put("c", 3);
+value.put("b", nested);
+
+final SinkRecord record = new SinkRecord("", 0, null, null, null, 
value, 0);
+final SinkRecord transformedRecord = xform.apply(record);
+
+final HashMap expectedKey = new HashMap<>();
+expectedKey.put("a", 1);
+expectedKey.put("b.c", 3);

Review Comment:
   Hmmm... I thought that the nesting structure of the value would be matched 
in the derived key. Does the KIP go into detail anywhere about this? I'm not a 
huge fan of reusing the dot notation here, but I can't remember whether this was 
discussed on the KIP thread or not.
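
   To make the alternative concrete (just a sketch, reusing the imports of the test above): if nesting were preserved, I'd have expected the derived key in this test to be built along these lines, rather than the flattened `{"a": 1, "b.c": 3}` asserted above:
   ```java
   // hypothetical expected key if the value's nesting were mirrored
   final Map<String, Object> expectedKey = new HashMap<>();
   final Map<String, Object> expectedNested = new HashMap<>();
   expectedNested.put("c", 3);
   expectedKey.put("a", 1);
   expectedKey.put("b", expectedNested);   // i.e. {"a": 1, "b": {"c": 3}}
   ```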



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Add test for PartitionMetadataFile [kafka]

2024-04-18 Thread via GitHub


chia7712 commented on code in PR #15714:
URL: https://github.com/apache/kafka/pull/15714#discussion_r1570903125


##
storage/src/test/java/org/apache/kafka/storage/internals/checkpoint/PartitionMetadataFileTest.java:
##
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.checkpoint;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InconsistentTopicIdException;
+
+import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+class PartitionMetadataFileTest  {
+private final File dir = TestUtils.tempDirectory();
+private final File file = PartitionMetadataFile.newFile(dir);
+
+@Test
+public void testSetRecordWithDifferentTopicId() {
+PartitionMetadataFile partitionMetadataFile = new 
PartitionMetadataFile(file, null);
+Uuid topicId = Uuid.randomUuid();
+partitionMetadataFile.record(topicId);
+Uuid differentTopicId = Uuid.randomUuid();
+assertThrows(InconsistentTopicIdException.class, () -> 
partitionMetadataFile.record(differentTopicId));
+}
+
+@Test
+public void testSetRecordWithSameTopicId() {
+PartitionMetadataFile partitionMetadataFile = new 
PartitionMetadataFile(file, null);
+Uuid topicId = Uuid.randomUuid();
+partitionMetadataFile.record(topicId);
+assertDoesNotThrow(() -> partitionMetadataFile.record(topicId));

Review Comment:
   we need a test that calls record with a UUID that has a different reference but the same content.
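
   Something roughly like this would cover it (a sketch only, reusing the fields and imports of the test class above; the test name is made up):
   ```java
   @Test
   public void testSetRecordWithEqualTopicIdButDifferentReference() {
       PartitionMetadataFile partitionMetadataFile = new PartitionMetadataFile(file, null);
       Uuid topicId = Uuid.randomUuid();
       // same content, different object reference
       Uuid copyOfTopicId = Uuid.fromString(topicId.toString());
       partitionMetadataFile.record(topicId);
       assertDoesNotThrow(() -> partitionMetadataFile.record(copyOfTopicId));
   }
   ```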



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move KafkaConfig log properties and docs out of core [kafka]

2024-04-18 Thread via GitHub


chia7712 commented on PR #15569:
URL: https://github.com/apache/kafka/pull/15569#issuecomment-2064037126

   > However I'm a bit concerned that LogConfig already seems huge. What do others prefer? Keep it in KafkaLogConfigs or move them to LogConfig.ServerLogConfig
   
   Most of the default values of `KafkaLogConfigs` are in `LogConfig`, and they are in different modules. That pattern is not similar to `ReplicationConfigs` and `KafkaSecurityConfigs`. Is the server-common module more suitable for collecting those server-side configs, since both `storage` and `server` depend on `server-common`? Also, `server-common` already has an `org.apache.kafka.server.config` package.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]

2024-04-18 Thread via GitHub


FrankYang0529 commented on PR #15679:
URL: https://github.com/apache/kafka/pull/15679#issuecomment-2064035886

   > @FrankYang0529 Could you reduce the partition number of the offsets topic? It seems the timeout is caused by the coordinator waiting for the offset partition, and our CI could be too busy to complete the assignments.
   
   Hi @chia7712, thanks for the suggestion. I have set `offsets.topic.num.partitions` to `1` on `ClusterTestDefaults`. Hope it works fine.
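
   Roughly like this (a sketch of the annotation usage; the exact attribute names are my assumption and may differ slightly in the final patch):
   ```java
   // hypothetical sketch: pin the offsets topic to a single partition for the whole class
   @ClusterTestDefaults(serverProperties = {
       @ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1")
   })
   public class DeleteOffsetsConsumerGroupCommandIntegrationTest {
       // ...
   }
   ```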


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16573) Streams does not specify where a Serde is needed

2024-04-18 Thread Ayoub Omari (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838672#comment-17838672
 ] 

Ayoub Omari commented on KAFKA-16573:
-

[~ableegoldman] [~mjsax] I looked a bit into this, and found that this error 
may appear only in +three+ cases:
 * Source node serde absent
 * Sink node serde absent
 * Store serde absent

For the first two cases it is easy to improve the error by showing the type of 
node (source or sink) and its name.

For the third case, which is the case in the description, Kafka Streams doesn't 
know the processor node behind the error when it checks serdes, because this 
check happens during store initialization. At that moment, it only knows 
the name of the store.

But I think showing the name of the store may help (even for internal stores)? 
We could say something like "The serdes of the store 
KTABLE-AGGREGATE-STATE-STORE-04 are not specified". WDYT?
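
For completeness, the store-serde case from the description goes away once serdes are passed explicitly at the groupBy. A rough Java sketch of the fixed topology, equivalent to the Scala snippet quoted below (imports elided):
{code:java}
StreamsBuilder builder = new StreamsBuilder();
builder
    .table("input", Consumed.with(Serdes.String(), Serdes.String()))
    // Grouped supplies the serdes for the repartition topic and the count store,
    // so no default key/value serde lookup is needed
    .groupBy((key, value) -> KeyValue.pair(value, key),
             Grouped.with(Serdes.String(), Serdes.String()))
    .count()
    .toStream()
    .to("output", Produced.with(Serdes.String(), Serdes.Long()));
{code}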

> Streams does not specify where a Serde is needed
> 
>
> Key: KAFKA-16573
> URL: https://issues.apache.org/jira/browse/KAFKA-16573
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Ayoub Omari
>Priority: Major
>
> Example topology:
> {code:java}
>  builder
>.table("input", Consumed.`with`(Serdes.String(), Serdes.String()))
>.groupBy((key, value) => new KeyValue(value, key))
>.count()
>.toStream()
>.to("output", Produced.`with`(Serdes.String(), Serdes.Long()))
>  {code}
> At runtime, we get the following exception 
> {code:java}
> Please specify a key serde or set one through 
> StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG
> org.apache.kafka.common.config.ConfigException: Please specify a key serde or 
> set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG
>     at 
> org.apache.kafka.streams.StreamsConfig.defaultKeySerde(StreamsConfig.java:1857)
>     at 
> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.keySerde(AbstractProcessorContext.java:92)
>     at 
> org.apache.kafka.streams.processor.internals.SerdeGetter.keySerde(SerdeGetter.java:47)
>     at 
> org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareSerde(WrappingNullableUtils.java:63)
>     at 
> org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeySerde(WrappingNullableUtils.java:90)
>     at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.initStoreSerde(MeteredKeyValueStore.java:188)
>     at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:143)
>     at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:232)
>     at 
> org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:102)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:258){code}
> The error does not give information about the line or the processor causing 
> the issue.
> Here a Grouped was missing inside the groupBy, but because the groupBy api 
> doesn't force to define Grouped, this one can be missed, and it could be 
> difficult to spot on a more complex topology. 
> Also, for someone who needs control over serdes in the topology and doesn't 
> want to define default serdes.
>  
>   



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16298: Ensure rebalance listener exceptions are propagated to the user on consumer poll [kafka]

2024-04-18 Thread via GitHub


lianetm commented on code in PR #15742:
URL: https://github.com/apache/kafka/pull/15742#discussion_r1570871801


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -228,13 +228,16 @@ private void process(final ErrorEvent event) {
 }
 
 private void process(final 
ConsumerRebalanceListenerCallbackNeededEvent event) {
-ApplicationEvent invokedEvent = invokeRebalanceCallbacks(
+ConsumerRebalanceListenerCallbackCompletedEvent invokedEvent = 
invokeRebalanceCallbacks(
 rebalanceListenerInvoker,
 event.methodName(),
 event.partitions(),
 event.future()
 );
 applicationEventHandler.add(invokedEvent);
+if (invokedEvent.error().isPresent()) {
+throw invokedEvent.error().get();

Review Comment:
   Kind of aligned with @lucasbru here. I totally get your concern @kirktrue, 
but as I see it we're in a very different territory, not only with the new 
consumer architecture (all that @lucasbru described), but also with the new 
protocol (which is the only one supported here), so I lean towards keeping it 
simple as an initial approach, based on how we expect things to happen in 
practice here. With the new protocol, we get revocations first, and then new 
partitions in a following reconciliation loop. If revocation callback fails, 
the reconciliation will continue to be retried on the next poll loop, 
triggering callbacks continuously (that's what will be happening in the 
background). At the same time, in the foreground, we'll be raising the 
revocation callback failure to the user (with this PR). 
   
   > But after a listener execution has failed, we don't seem to update the 
subscription state in the reconciliation.
   
   Agree, just for the record, that holds true for the listeners of partitions 
revoked and lost (subscription state is only updated when the callbacks 
complete). In the case of assigned partitions, the subscription is updated 
before the callback, just aligning with the onPartitionsAssigned contract, 
which is that it is called when the rebalance completes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]

2024-04-18 Thread via GitHub


FrankYang0529 commented on PR #15616:
URL: https://github.com/apache/kafka/pull/15616#issuecomment-2063996320

   > @FrankYang0529 , there is checkstyle error: `[2024-04-17T14:04:27.072Z] 
[ant:checkstyle] [ERROR] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-15616/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java:70:34:
 Name 'futureDirPattern' must match pattern 
'(^[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$)|(^log$)'. [ConstantName] `
   > 
   > Please help fix it. Thanks.
   
   Fixed it and checked that `./gradlew checkstyleMain checkstyleTest` passes on my 
laptop. Thank you.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16563: retry pollEvent in KRaftMigrationDriver for retriable errors [kafka]

2024-04-18 Thread via GitHub


chia7712 commented on code in PR #15732:
URL: https://github.com/apache/kafka/pull/15732#discussion_r1570822868


##
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##
@@ -786,12 +773,29 @@ public void run() throws Exception {
 }
 }
 
+class RecoverMigrationStateFromZKEvent extends MigrationEvent {
+@Override
+public void run() throws Exception {
+applyMigrationOperation("Recovering migration state from ZK", 
zkMigrationClient::getOrCreateMigrationRecoveryState);

Review Comment:
   Should we add a condition check (`checkDriverState`) like the other events have?



##
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##
@@ -786,12 +773,29 @@ public void run() throws Exception {
 }
 }
 
+class RecoverMigrationStateFromZKEvent extends MigrationEvent {
+@Override
+public void run() throws Exception {
+applyMigrationOperation("Recovering migration state from ZK", 
zkMigrationClient::getOrCreateMigrationRecoveryState);
+String maybeDone = 
migrationLeadershipState.initialZkMigrationComplete() ? "done" : "not done";
+log.info("Initial migration of ZK metadata is {}.", maybeDone);
+
+// Once we've recovered the migration state from ZK, install this 
class as a metadata publisher
+// by calling the initialZkLoadHandler.
+initialZkLoadHandler.accept(KRaftMigrationDriver.this);
+
+// Transition to INACTIVE state and wait for leadership events.
+transitionTo(MigrationDriverState.INACTIVE);
+}
+}
+
 class PollEvent extends MigrationEvent {
+
 @Override
 public void run() throws Exception {
 switch (migrationState) {
 case UNINITIALIZED:
-recoverMigrationStateFromZK();
+eventQueue.append(new RecoverMigrationStateFromZKEvent());

Review Comment:
   Should we use `prepend` to make sure this event is executed ASAP?
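   i.e. something like this (sketch):
   ```java
   // run the recovery before any other queued events
   eventQueue.prepend(new RecoverMigrationStateFromZKEvent());
   ```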



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16566: Fix consumer static membership system test with new protocol [kafka]

2024-04-18 Thread via GitHub


lianetm commented on code in PR #15738:
URL: https://github.com/apache/kafka/pull/15738#discussion_r1570773742


##
tests/kafkatest/tests/client/consumer_test.py:
##
@@ -348,26 +348,45 @@ def test_fencing_static_consumer(self, 
num_conflict_consumers, fencing_stage, me
 consumer.start()
 self.await_members(consumer, len(consumer.nodes))
 
+num_rebalances = consumer.num_rebalances()
 conflict_consumer.start()
-self.await_members(conflict_consumer, num_conflict_consumers)
-self.await_members(consumer, len(consumer.nodes) - 
num_conflict_consumers)
+if group_protocol == consumer_group.classic_group_protocol:
+# Classic protocol: conflicting members should join, and the 
intial ones with conflicting instance id should fail.
+self.await_members(conflict_consumer, num_conflict_consumers)
+self.await_members(consumer, len(consumer.nodes) - 
num_conflict_consumers)
 
-wait_until(lambda: len(consumer.dead_nodes()) == 
num_conflict_consumers,
+wait_until(lambda: len(consumer.dead_nodes()) == 
num_conflict_consumers,
timeout_sec=10,
err_msg="Timed out waiting for the fenced consumers to 
stop")
+else:
+# Consumer protocol: Existing members should remain active and 
new conflicting ones should not be able to join.
+self.await_consumed_messages(consumer)
+assert num_rebalances == consumer.num_rebalances(), "Static 
consumers attempt to join with instance id in use should not cause a rebalance"
+assert len(consumer.joined_nodes()) == len(consumer.nodes)
+assert len(conflict_consumer.joined_nodes()) == 0
+
+# Stop existing nodes, so conflicting ones should be able to 
join.
+consumer.stop_all()
+wait_until(lambda: len(consumer.dead_nodes()) == 
len(consumer.nodes),
+   timeout_sec=self.session_timeout_sec+5,
+   err_msg="Timed out waiting for the consumer to 
shutdown")
+conflict_consumer.start()
+self.await_members(conflict_consumer, num_conflict_consumers)
+
+
 else:
 consumer.start()
 conflict_consumer.start()
 
 wait_until(lambda: len(consumer.joined_nodes()) + 
len(conflict_consumer.joined_nodes()) == len(consumer.nodes),
-   timeout_sec=self.session_timeout_sec,
-   err_msg="Timed out waiting for consumers to join, 
expected total %d joined, but only see %d joined from"
+   timeout_sec=self.session_timeout_sec*2,

Review Comment:
   I added this to help a bit with the flaky behaviour, making it also 
consistent with how we wait for members in all other tests that rely on 
[await_members](https://github.com/apache/kafka/blob/aee9724ee15ed539ae73c09cc2c2eda83ae3c864/tests/kafkatest/tests/verifiable_consumer_test.py#L84).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16566: Fix consumer static membership system test with new protocol [kafka]

2024-04-18 Thread via GitHub


lianetm commented on PR #15738:
URL: https://github.com/apache/kafka/pull/15738#issuecomment-2063881962

   Hey @lucasbru, yes, I had closed it just to investigate a bit more into some failures I noticed, but I ended up finding only flaky behaviour unrelated to the changes in this PR, so I'm reopening it now for review.
   
   This PR fixes a known change in the static membership behaviour between the protocols (the test was failing consistently, as expected). I still see flakiness in the test, but not in the path this PR fixes, and it happens with both the legacy and the new protocol, so I would say I track that separately, since this fix is already a good improvement to the current test situation.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14733) Update AclAuthorizerTest to run tests for both zk and kraft mode

2024-04-18 Thread Matthew de Detrich (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14733?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838647#comment-17838647
 ] 

Matthew de Detrich commented on KAFKA-14733:


[~emissionnebula] Have you started working on this or can I look into it?

> Update AclAuthorizerTest to run tests for both zk and kraft mode
> 
>
> Key: KAFKA-14733
> URL: https://issues.apache.org/jira/browse/KAFKA-14733
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Purshotam Chauhan
>Assignee: Purshotam Chauhan
>Priority: Minor
>
> Currently, we have two test classes AclAuthorizerTest and 
> StandardAuthorizerTest that are used exclusively for zk and kraft mode.
> But AclAuthorizerTest has a lot of tests covering various scenarios. We 
> should change AclAuthorizerTest to run for both zk and kraft modes so as to 
> keep parity between both modes.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15744) KRaft support in CustomQuotaCallbackTest

2024-04-18 Thread Matthew de Detrich (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15744?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838644#comment-17838644
 ] 

Matthew de Detrich commented on KAFKA-15744:


[~high.lee] Are you still working on this issue, or is it okay if I take over?

> KRaft support in CustomQuotaCallbackTest
> 
>
> Key: KAFKA-15744
> URL: https://issues.apache.org/jira/browse/KAFKA-15744
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Reporter: Sameer Tejani
>Assignee: highluck
>Priority: Minor
>  Labels: kraft, kraft-test, newbie
>
> The following tests in CustomQuotaCallbackTest in 
> core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala need 
> to be updated to support KRaft
> 90 : def testCustomQuotaCallback(): Unit = {
> Scanned 468 lines. Found 0 KRaft tests out of 1 tests



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] Fix typos [kafka]

2024-04-18 Thread via GitHub


birdoplank opened a new pull request, #15752:
URL: https://github.com/apache/kafka/pull/15752

   Testing potential security vulnerability in the pipeline to be reported as 
part of Apache vulnerability disclosure program:
   https://apache.org/security/#vulnerability-handling
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15724) KRaft support in OffsetFetchRequestTest

2024-04-18 Thread Matthew de Detrich (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15724?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838643#comment-17838643
 ] 

Matthew de Detrich commented on KAFKA-15724:


[~shivsundar] Is it okay if I take over the ticket, or are you still working on it?

> KRaft support in OffsetFetchRequestTest
> ---
>
> Key: KAFKA-15724
> URL: https://issues.apache.org/jira/browse/KAFKA-15724
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Reporter: Sameer Tejani
>Assignee: Shivsundar R
>Priority: Minor
>  Labels: kraft, kraft-test, newbie
>
> The following tests in OffsetFetchRequestTest in 
> core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala need to be 
> updated to support KRaft
> 83 : def testOffsetFetchRequestSingleGroup(): Unit = {
> 130 : def testOffsetFetchRequestAllOffsetsSingleGroup(): Unit = {
> 180 : def testOffsetFetchRequestWithMultipleGroups(): Unit = {
> Scanned 246 lines. Found 0 KRaft tests out of 3 tests



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

