Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling fetches from remote storage [kafka]

2024-05-30 Thread via GitHub


abhijeetk88 commented on code in PR #16071:
URL: https://github.com/apache/kafka/pull/16071#discussion_r1621675519


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -6597,6 +6597,79 @@ class ReplicaManagerTest {
   ))
 }
   }
+
+  @Test
+  def testRemoteReadQuotaExceeded(): Unit = {
+when(mockRemoteLogManager.isRemoteLogFetchQuotaExceeded).thenReturn(true)
+
+val tp0 = new TopicPartition(topic, 0)
+val tpId0 = new TopicIdPartition(topicId, tp0)
+val fetch: Seq[(TopicIdPartition, LogReadResult)] = 
readFromLogWithOffsetOutOfRange(tp0)
+
+assertEquals(1, fetch.size)
+assertEquals(tpId0, fetch.head._1)
+val fetchInfo = fetch.head._2.info
+assertEquals(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, 
fetchInfo.fetchOffsetMetadata)
+assertFalse(fetchInfo.records.records().iterator().hasNext)
+assertFalse(fetchInfo.firstEntryIncomplete)
+assertFalse(fetchInfo.abortedTransactions.isPresent)
+assertFalse(fetchInfo.delayedRemoteStorageFetch.isPresent)
+  }
+
+  @Test
+  def testRemoteReadQuotaNotExceeded(): Unit = {
+when(mockRemoteLogManager.isRemoteLogFetchQuotaExceeded).thenReturn(false)
+
+val tp0 = new TopicPartition(topic, 0)
+val tpId0 = new TopicIdPartition(topicId, tp0)
+val fetch: Seq[(TopicIdPartition, LogReadResult)] = 
readFromLogWithOffsetOutOfRange(tp0)
+
+assertEquals(1, fetch.size)
+assertEquals(tpId0, fetch.head._1)
+val fetchInfo = fetch.head._2.info
+assertEquals(1L, fetchInfo.fetchOffsetMetadata.messageOffset)
+assertEquals(UnifiedLog.UnknownOffset, 
fetchInfo.fetchOffsetMetadata.segmentBaseOffset)
+assertEquals(-1, fetchInfo.fetchOffsetMetadata.relativePositionInSegment)
+assertEquals(MemoryRecords.EMPTY, fetchInfo.records)
+assertTrue(fetchInfo.delayedRemoteStorageFetch.isPresent)
+  }
+
+  private def readFromLogWithOffsetOutOfRange(tp: TopicPartition): 
Seq[(TopicIdPartition, LogReadResult)] = {
+val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, 
shouldMockLog = true)
+try {
+  val offsetCheckpoints = new 
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
+  replicaManager.createPartition(tp).createLogIfNotExists(isNew = false, 
isFutureReplica = false, offsetCheckpoints = offsetCheckpoints, None)
+  val partition0Replicas = Seq[Integer](0, 1).asJava
+  val topicIds = Map(tp.topic -> topicId).asJava
+  val leaderEpoch = 0
+  val leaderAndIsrRequest = new 
LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, 
brokerEpoch,
+Seq(
+  new LeaderAndIsrPartitionState()
+.setTopicName(tp.topic)
+.setPartitionIndex(tp.partition)
+.setControllerEpoch(0)
+.setLeader(leaderEpoch)

Review Comment:
   done






Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]

2024-05-30 Thread via GitHub


abhijeetk88 commented on PR #15625:
URL: https://github.com/apache/kafka/pull/15625#issuecomment-2141192632

   Thanks @chia7712 @jolshan. Apologies for the miss.





Re: [PR] ignore [kafka]

2024-05-30 Thread via GitHub


github-actions[bot] commented on PR #15355:
URL: https://github.com/apache/kafka/pull/15355#issuecomment-2141172395

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.





Re: [PR] KAFKA-10787: Add import ordering checkstyle rule and configure an automatic formatter [kafka]

2024-05-30 Thread via GitHub


gongxuanzhang commented on code in PR #16097:
URL: https://github.com/apache/kafka/pull/16097#discussion_r1621586240


##
checkstyle/suppressions.xml:
##
@@ -361,4 +361,7 @@
 
 
+
+

Review Comment:
   I think checkstyle should be consistent with the auto-formatter: if we enable 
auto-formatting for a module, we should also enable the corresponding check rule 
for that module. The `ImportOrder` rule can't be customized per module (that 
would take a lot of changes; we would probably have to touch `build.gradle` in 
every module). So I added this line so that the rule can be enabled for 
individual modules in the future, which I think is a more convenient way to 
adopt it module by module.







Re: [PR] KAFKA-10787: Add import ordering checkstyle rule and configure an automatic formatter [kafka]

2024-05-30 Thread via GitHub


gongxuanzhang commented on code in PR #16097:
URL: https://github.com/apache/kafka/pull/16097#discussion_r1621614857


##
build.gradle:
##
@@ -787,6 +800,12 @@ subprojects {
 skipProjects = [ ":jmh-benchmarks", ":trogdor" ]
 skipConfigurations = [ "zinc" ]
   }
+
+  afterEvaluate {

Review Comment:
   I have updated the PR, please review it @chia7712 






[jira] [Commented] (KAFKA-16861) Don't convert the group to classic if the size is larger than the group max size

2024-05-30 Thread TengYao Chi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17850899#comment-17850899
 ] 

TengYao Chi commented on KAFKA-16861:
-

I will handle this issue :)

> Don't convert the group to classic if the size is larger than the group max size
> ---
>
> Key: KAFKA-16861
> URL: https://issues.apache.org/jira/browse/KAFKA-16861
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
>
> It should be a one-line fix [0]
> [0] 
> https://github.com/apache/kafka/blob/2d9994e0de915037525f041ff9a9b9325f838938/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java#L810
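
For illustration, a minimal sketch of the kind of guard the ticket describes 
(hypothetical names, not Kafka's actual code; the real fix lives in 
GroupMetadataManager):

```java
// Hypothetical sketch of the guard described by KAFKA-16861: only allow the
// consumer-to-classic group conversion when the group still fits under the
// classic group max size.
public class ClassicDowngradeGuard {
    static boolean canConvertToClassic(int groupSize, int classicGroupMaxSize) {
        return groupSize <= classicGroupMaxSize;
    }
}
```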





Re: [PR] KAFKA-16821: Member Subscription Spec Interface [kafka]

2024-05-30 Thread via GitHub


rreddy-22 commented on code in PR #16068:
URL: https://github.com/apache/kafka/pull/16068#discussion_r1621587957


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilderTest.java:
##
@@ -585,34 +586,34 @@ public void 
testReassignmentWhenOneSubscriptionRemovedAfterInitialAssignmentWith
 ));
 
 // Initial subscriptions were [T1, T2]
-Map<String, AssignmentMemberSpec> members = new HashMap<>();
+Map<String, MemberSubscriptionSpec> members = new TreeMap<>();
+Map<String, Map<Uuid, Set<Integer>>> assignedPartitions = new HashMap<>();
 
 Map> currentAssignmentForA = mkAssignment(
 mkTopicAssignment(topic1Uuid, 0, 2),
 mkTopicAssignment(topic2Uuid, 1, 3)
 );
-members.put(memberA, new AssignmentMemberSpec(
+assignedPartitions.put(memberA, currentAssignmentForA);
+members.put(memberA, new MemberSubscriptionSpecImpl(
 Optional.empty(),
-Optional.empty(),
-Collections.singleton(topic1Uuid),
-currentAssignmentForA
+Collections.singleton(topic1Uuid)
 ));
 
 Map> currentAssignmentForB = mkAssignment(
 mkTopicAssignment(topic1Uuid, 1),
 mkTopicAssignment(topic2Uuid, 0, 2, 4)
 );
-members.put(memberB, new AssignmentMemberSpec(
-Optional.empty(),
+assignedPartitions.put(memberB, currentAssignmentForB);
+members.put(memberB, new MemberSubscriptionSpecImpl(
 Optional.empty(),
-mkSet(topic1Uuid, topic2Uuid),
-currentAssignmentForB
+new HashSet<>(Arrays.asList(topic1Uuid, topic2Uuid))
 ));
 
 GroupSpec groupSpec = new GroupSpecImpl(
 members,
 HETEROGENEOUS,
-invertedTargetAssignment(members)
+assignedPartitions,
+invertedTargetAssignment(assignedPartitions, members)
 );
 SubscribedTopicMetadata subscribedTopicMetadata = new 
SubscribedTopicMetadata(topicMetadata);
 

Review Comment:
   yep sounds good






Re: [PR] KAFKA-10787: Add import ordering checkstyle rule and configure an automatic formatter [kafka]

2024-05-30 Thread via GitHub


gongxuanzhang commented on code in PR #16097:
URL: https://github.com/apache/kafka/pull/16097#discussion_r1621586240


##
checkstyle/suppressions.xml:
##
@@ -361,4 +361,7 @@
 
 
+
+

Review Comment:
   I think checkstyle should be consistent with the auto-formatter: if we enable 
auto-formatting for a module, we should also enable the corresponding check rule. 
So I added this line so that the rule can be enabled for individual modules in 
the future.






Re: [PR] KAFKA-10787: Add import ordering checkstyle rule and configure an automatic formatter [kafka]

2024-05-30 Thread via GitHub


gongxuanzhang commented on code in PR #16097:
URL: https://github.com/apache/kafka/pull/16097#discussion_r1621584578


##
build.gradle:
##
@@ -47,7 +47,7 @@ plugins {
   // Updating the shadow plugin version to 8.1.1 causes issue with signing and 
publishing the shadowed
   // artifacts - see https://github.com/johnrengelman/shadow/issues/901
   id 'com.github.johnrengelman.shadow' version '8.1.0' apply false
-  id 'com.diffplug.spotless' version '6.14.0' apply false // 6.14.1 and newer 
require Java 11 at compile time, so we can't upgrade until AK 4.0
+  id 'com.diffplug.spotless' version "${spotlessVersion}" apply false

Review Comment:
   I documented this in a `gradle.properties` comment: versions after 6.13.0 
require Java 11, and we still need to stay compatible with Java 8.
   






Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]

2024-05-30 Thread via GitHub


satishd commented on PR #15820:
URL: https://github.com/apache/kafka/pull/15820#issuecomment-2141069773

   @abhijeetk88 Can you resolve the conflicts?





Re: [PR] KAFKA-16866 RemoteLogManagerTest.testCopyQuotaManagerConfig failing [kafka]

2024-05-30 Thread via GitHub


satishd merged PR #16146:
URL: https://github.com/apache/kafka/pull/16146





Re: [PR] KAFKA-16866 RemoteLogManagerTest.testCopyQuotaManagerConfig failing [kafka]

2024-05-30 Thread via GitHub


satishd commented on PR #16146:
URL: https://github.com/apache/kafka/pull/16146#issuecomment-2141065172

   A few failing tests are unrelated to the change. Merging this change to 
trunk to fix the failing test.







Re: [PR] KAFKA-10551: Add topic id support to produce request and response [kafka]

2024-05-30 Thread via GitHub


jolshan commented on code in PR #15968:
URL: https://github.com/apache/kafka/pull/15968#discussion_r1621535872


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##
@@ -610,7 +611,9 @@ private void handleProduceResponse(ClientResponse response, 
Map<TopicPartition, Metadata.LeaderIdAndEpoch> partitionsWithUpdatedLeaderInfo = new HashMap<>();
 produceResponse.data().responses().forEach(r -> 
r.partitionResponses().forEach(p -> {
-TopicPartition tp = new TopicPartition(r.name(), 
p.index());
+// Version 12 drops the topic name and adds support for topic IDs. However, metadata can be used to map a topic ID back to its topic name.
+String topicName = (r.name() == null || r.name().isEmpty()) ? metadata.topicNames().get(r.topicId()) : r.name();

Review Comment:
   Yes. For the fetch request for example, there is code to make sure that all 
topics have IDs before we can send the fetch request. This is a bit less of an 
issue now, but if we have a cluster that is running on a MV < 2.8, not all 
topics will have IDs. So when we decide which version of produce we want to 
send, we want to be aware of this.
   
   Not only that, but even if the broker supports topic IDs on all topics, we 
also may have a case where we need to do a rolling upgrade to get the code that 
supports handling the latest API version. This may be less complicated for 
Produce since it is a client only API and doesn't rely on MV/IBP, so the 
apiVersions exchange between the client and the broker may be enough to ensure 
api compatibility. 
   
   We just want to confirm these upgrade paths are compatible since produce is 
the hot path and we don't want any (or at least not extended) downtime in the 
middle of an upgrade.
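
   To make the fallback concrete, here is a minimal, self-contained sketch 
(editor's illustration, not the PR's code; `java.util.UUID` stands in for 
Kafka's `Uuid`) of the name-resolution logic quoted in the diff above:

```java
import java.util.Map;
import java.util.UUID;

public class TopicNameFallback {
    // Mirrors the quoted diff: prefer the name carried in the response
    // (pre-v12 versions), otherwise look the topic ID up in local metadata.
    static String resolveTopicName(String nameFromResponse, UUID topicId,
                                   Map<UUID, String> topicNamesById) {
        if (nameFromResponse != null && !nameFromResponse.isEmpty()) {
            return nameFromResponse;
        }
        return topicNamesById.get(topicId); // may be null if metadata is stale
    }
}
```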






Re: [PR] KAFKA-15045: (KIP-924 pt. 16) TaskAssignor.onAssignmentComputed handling [kafka]

2024-05-30 Thread via GitHub


ableegoldman commented on code in PR #16147:
URL: https://github.com/apache/kafka/pull/16147#discussion_r1621532246


##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##
@@ -481,10 +479,24 @@ private static int getCrossRackTrafficCost(final 
Set topicPa
  */
 private static boolean canPerformRackAwareOptimization(final 
ApplicationState applicationState,
final 
AssignedTask.Type taskType) {
-final String rackAwareAssignmentStrategy = 
applicationState.assignmentConfigs().rackAwareAssignmentStrategy();
+final AssignmentConfigs assignmentConfigs = 
applicationState.assignmentConfigs();
+final String rackAwareAssignmentStrategy = 
assignmentConfigs.rackAwareAssignmentStrategy();
 if 
(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE.equals(rackAwareAssignmentStrategy))
 {
+LOG.warn("Rack aware task assignment optimization disabled: rack 
aware strategy was set to {}",
+rackAwareAssignmentStrategy);
+return false;
+}
+
+if (!assignmentConfigs.rackAwareTrafficCost().isPresent()) {
+LOG.warn("Rack aware task assignment optimization unavailable: the 
traffic cost configuration was not set.");

Review Comment:
   We should log the exact config name since otherwise people won't necessarily 
know what this is referring to (especially since they already forgot to set 
this config). 
   
   ```suggestion
   LOG.warn("Rack aware task assignment optimization unavailable: 
must configure {}", StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG);
   ```



##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##
@@ -481,10 +479,24 @@ private static int getCrossRackTrafficCost(final 
Set topicPa
  */
 private static boolean canPerformRackAwareOptimization(final 
ApplicationState applicationState,
final 
AssignedTask.Type taskType) {
-final String rackAwareAssignmentStrategy = 
applicationState.assignmentConfigs().rackAwareAssignmentStrategy();
+final AssignmentConfigs assignmentConfigs = 
applicationState.assignmentConfigs();
+final String rackAwareAssignmentStrategy = 
assignmentConfigs.rackAwareAssignmentStrategy();
 if 
(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE.equals(rackAwareAssignmentStrategy))
 {
+LOG.warn("Rack aware task assignment optimization disabled: rack 
aware strategy was set to {}",
+rackAwareAssignmentStrategy);
+return false;
+}
+
+if (!assignmentConfigs.rackAwareTrafficCost().isPresent()) {
+LOG.warn("Rack aware task assignment optimization unavailable: the 
traffic cost configuration was not set.");
 return false;
 }
+
+if (!assignmentConfigs.rackAwareNonOverlapCost().isPresent()) {
+LOG.warn("Rack aware task assignment optimization unavailable: the 
non-overlap cost configuration was not set.");

Review Comment:
   ```suggestion
   LOG.warn("Rack aware task assignment optimization unavailable: 
must configure {}", 
StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG);
   ```



##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java:
##
@@ -40,8 +41,8 @@ public AssignmentConfigs(final StreamsConfig configs) {
 configs.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG),
 
configs.getLong(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG),
 configs.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG),
-
configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG),
-
configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG),
+
Optional.ofNullable(configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG)),
+
Optional.ofNullable(configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG)),

Review Comment:
   don't we need to check `if 
(assignorClassName.equals("org.apache.kafka.streams.processor.assignment.assignors.StickyTaskAssignor"))`
 and set these to the sticky assignor defaults if true?
   
   Where `assignorClassName` is equal to 
`streamsConfig.getString(StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG)` -- I guess 
maybe we do want the public  `AssignmentConfigs` constructor to take in the 
StreamsConfig after all?



##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##
@@ -481,10 +479,24 @@ private static int getCrossRackTrafficCost(final 
Set topicPa
  */
 private static boolean canPerformRackAwareOptimization(final 
ApplicationState applicationState,
  

Re: [PR] MINOR: Add more unit tests to LogSegments [kafka]

2024-05-30 Thread via GitHub


showuon commented on code in PR #16085:
URL: https://github.com/apache/kafka/pull/16085#discussion_r1621524702


##
core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala:
##
@@ -4251,6 +4251,22 @@ class UnifiedLogTest {
 assertEquals(new LogOffsetMetadata(14, -1L, -1), 
log.maybeConvertToOffsetMetadata(14))
   }
 
+  @Test
+  def testGetFirstBatchTimestampForSegments(): Unit = {
+val log = createLog(logDir, LogTestUtils.createLogConfig())
+
+val segments: java.util.List[LogSegment] = new 
java.util.ArrayList[LogSegment]()
+val seg1 = LogTestUtils.createSegment(1, logDir, 10, Time.SYSTEM)
+val seg2 = LogTestUtils.createSegment(2, logDir, 10, Time.SYSTEM)

Review Comment:
   Forgot to close them.






Re: [PR] KAFKA-16821: Member Subscription Spec Interface [kafka]

2024-05-30 Thread via GitHub


rreddy-22 commented on code in PR #16068:
URL: https://github.com/apache/kafka/pull/16068#discussion_r1621518800


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpecImpl.java:
##
@@ -18,48 +18,60 @@
 
 import org.apache.kafka.common.Uuid;
 
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
 
 /**
  * The assignment specification for a consumer group.
  */
 public class GroupSpecImpl implements GroupSpec {
 /**
- * The member metadata keyed by member Id.
+ * Member subscription metadata keyed by member Id.
  */
-private final Map<String, AssignmentMemberSpec> members;
+private final Map<String, MemberSubscriptionSpec> memberSubscriptions;
 
 /**
- * The subscription type followed by the group.
+ * The subscription type of the group.
  */
 private final SubscriptionType subscriptionType;
 
+/**
+ * Partitions currently assigned to each member keyed by topicId.
+ */
+private final Map<String, Map<Uuid, Set<Integer>>> currentAssignment;

Review Comment:
   I used memberAssignment and invertedMemberAssignment






Re: [PR] KAFKA-16821: Member Subscription Spec Interface [kafka]

2024-05-30 Thread via GitHub


rreddy-22 commented on code in PR #16068:
URL: https://github.com/apache/kafka/pull/16068#discussion_r1621508931


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpec.java:
##
@@ -39,4 +41,20 @@ public interface GroupSpec {
  * False, otherwise.
  */
 boolean isPartitionAssigned(Uuid topicId, int partitionId);
+
+/**
+ * Gets the member subscription specification for a member.
+ *
+ * @param memberId  The member Id.
+ * @return The member's subscription metadata.
+ */
+MemberSubscriptionSpec memberSubscriptionSpec(String memberId);

Review Comment:
   discussed offline, we want to throw an IllegalStateException when the 
memberId is not found
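
   A minimal sketch of the agreed contract (assumed shape based on the names in 
the diff above, not the final code):

```java
// Hypothetical implementation sketch: an unknown member id is a programming
// error in the caller, so fail fast instead of returning null.
public MemberSubscriptionSpec memberSubscriptionSpec(String memberId) {
    MemberSubscriptionSpec spec = memberSubscriptions.get(memberId);
    if (spec == null) {
        throw new IllegalStateException("Member " + memberId + " not found in the group spec");
    }
    return spec;
}
```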






Re: [PR] KAFKA-16860; [1/2] Introduce group.version feature flag [kafka]

2024-05-30 Thread via GitHub


jolshan commented on code in PR #16120:
URL: https://github.com/apache/kafka/pull/16120#discussion_r1621497961


##
core/src/test/java/kafka/test/ClusterInstance.java:
##
@@ -159,9 +158,7 @@ default Set<GroupProtocol> supportedGroupProtocols() {
Set<GroupProtocol> supportedGroupProtocols = new HashSet<>();
 supportedGroupProtocols.add(CLASSIC);
 
-// KafkaConfig#isNewGroupCoordinatorEnabled check both 
NEW_GROUP_COORDINATOR_ENABLE_CONFIG and 
GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG
-if (serverProperties.getOrDefault(NEW_GROUP_COORDINATOR_ENABLE_CONFIG, 
"").equals("true") ||

Review Comment:
   Mostly confused because we don't check for the config in kafka apis anymore.






Re: [PR] KAFKA-16860; [1/2] Introduce group.version feature flag [kafka]

2024-05-30 Thread via GitHub


jolshan commented on code in PR #16120:
URL: https://github.com/apache/kafka/pull/16120#discussion_r1621496853


##
metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java:
##
@@ -74,7 +74,7 @@ public void testDefaultFeatureMapWithUnstable() {
 for (Features feature : Features.PRODUCTION_FEATURES) {
 expectedFeatures.put(feature.featureName(), VersionRange.of(
 0,
-feature.defaultValue(MetadataVersion.LATEST_PRODUCTION)
+feature.defaultValue(MetadataVersion.latestTesting())

Review Comment:
   Hmm was this just a bug in the test...






Re: [PR] KAFKA-15713: KRaft support in AclCommandTest [kafka]

2024-05-30 Thread via GitHub


pasharik commented on code in PR #15830:
URL: https://github.com/apache/kafka/pull/15830#discussion_r1621488481


##
core/src/main/scala/kafka/admin/AclCommand.scala:
##
@@ -115,8 +115,6 @@ object AclCommand extends Logging {
   val aclBindings = acls.map(acl => new AclBinding(resource, 
acl)).asJavaCollection
   adminClient.createAcls(aclBindings).all().get()
 }
-
-listAcls(adminClient)

Review Comment:
   - I've moved the KRaft tests into a new `AclCommandIntegrationTest.java`.
   - I left the old ZooKeeper tests in `AclCommandTest.scala`. As I understand 
it, we are going to delete this test file entirely once we have fully moved to 
KRaft, am I right? Do you think it's worth migrating those tests to Java at 
this stage if they are going to be deleted anyway?
   - The race condition described above is still reproducible on the new 
infrastructure :cry: So if there are no objections, we can probably remove the 
`Current ACLs` console output.






Re: [PR] KAFKA-16860; [1/2] Introduce group.version feature flag [kafka]

2024-05-30 Thread via GitHub


jolshan commented on code in PR #16120:
URL: https://github.com/apache/kafka/pull/16120#discussion_r1621487824


##
core/src/test/java/kafka/test/ClusterInstance.java:
##
@@ -159,9 +158,7 @@ default Set<GroupProtocol> supportedGroupProtocols() {
Set<GroupProtocol> supportedGroupProtocols = new HashSet<>();
 supportedGroupProtocols.add(CLASSIC);
 
-// KafkaConfig#isNewGroupCoordinatorEnabled check both 
NEW_GROUP_COORDINATOR_ENABLE_CONFIG and 
GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG
-if (serverProperties.getOrDefault(NEW_GROUP_COORDINATOR_ENABLE_CONFIG, 
"").equals("true") ||

Review Comment:
   do we plan to remove this config value from GroupCoordinatorConfig? I see it 
was removed from a lot of files, but there are still a few where it is used.






Re: [PR] KAFKA-16757: Fix broker re-registration issues around MV 3.7-IV2 [kafka]

2024-05-30 Thread via GitHub


soarez commented on PR #15945:
URL: https://github.com/apache/kafka/pull/15945#issuecomment-2140953114

   There are some conflicts that need addressing, and the JDK 21 pipeline 
didn't run. 





Re: [PR] KAFKA-16757: Fix broker re-registration issues around MV 3.7-IV2 [kafka]

2024-05-30 Thread via GitHub


soarez commented on code in PR #15945:
URL: https://github.com/apache/kafka/pull/15945#discussion_r1621485662


##
metadata/src/main/java/org/apache/kafka/image/publisher/BrokerRegistrationTracker.java:
##
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.publisher;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.loader.LoaderManifest;
+import org.apache.kafka.metadata.BrokerRegistration;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.slf4j.Logger;
+
+import java.util.List;
+
+/**
+ * Tracks the registration of a specific broker, and executes a callback if it 
should be refreshed.
+ *
+ * This tracker handles cases where we might want to re-register the broker. 
The only such case
+ * right now is during the transition from non-JBOD mode, to JBOD mode. In 
other words, the
+ * transition from a MetadataVersion less than 3.7-IV2, to one greater than or 
equal to 3.7-IV2.
+ * In this case, the broker registration will start out containing no 
directories, and we need to
+ * resend the BrokerRegistrationRequest to fix that.
+ *
+ * As much as possible, the goal here is to keep things simple. We just 
compare the desired state
+ * with the actual state, and try to make changes only if necessary.
+ */
+public class BrokerRegistrationTracker implements MetadataPublisher {
+private final Logger log;
+private final int id;
+private final Runnable refreshRegistrationCallback;
+
+/**
+ * Create the tracker.
+ *
+ * @param idThe ID of this broker.
+ * @param targetDirectories The directories managed by this 
broker.
+ * @param refreshRegistrationCallback   Callback to run if we need to 
refresh the registration.
+ */
+public BrokerRegistrationTracker(
+int id,
+List targetDirectories,
+Runnable refreshRegistrationCallback
+) {
+this.log = new LogContext("[BrokerRegistrationTracker id=" + id + "] 
").
+logger(BrokerRegistrationTracker.class);
+this.id = id;
+this.refreshRegistrationCallback = refreshRegistrationCallback;
+}
+
+@Override
+public String name() {
+return "BrokerRegistrationTracker(id=" + id + ")";
+}
+
+@Override
+public void onMetadataUpdate(
+MetadataDelta delta,
+MetadataImage newImage,
+LoaderManifest manifest
+) {
+boolean checkBrokerRegistration = false;
+if (delta.featuresDelta() != null) {
+if (delta.metadataVersionChanged().isPresent()) {
+if (log.isTraceEnabled()) {
+log.trace("Metadata version change is present: {}",
+delta.metadataVersionChanged());
+}
+checkBrokerRegistration = true;
+}
+}
+if (delta.clusterDelta() != null) {
+if (delta.clusterDelta().changedBrokers().get(id) != null) {
+if (log.isTraceEnabled()) {
+log.trace("Broker change is present: {}",
+delta.clusterDelta().changedBrokers().get(id));
+}
+checkBrokerRegistration = true;
+}
+}
+if (checkBrokerRegistration) {
+if 
(brokerRegistrationNeedsRefresh(newImage.features().metadataVersion(),
+delta.clusterDelta().broker(id))) {
+refreshRegistrationCallback.run();
+}
+}
+}
+
+/**
+ * Check if the current broker registration needs to be refreshed.
+ *
+ * @param registration  The current broker registration, or null if there 
is none.
+ * @return  True only if we should refresh.
+ */
+boolean brokerRegistrationNeedsRefresh(
+MetadataVersion metadataVersion,
+BrokerRegistration registration
+) {
+// If there is no existing registration, the BrokerLifecycleManager 
must still be sending it.
+// So we don't 

Re: [PR] KAFKA-15045: (KIP-924 pt. 15) Implement #defaultStandbyTaskAssignment and finish rack-aware standby optimization [kafka]

2024-05-30 Thread via GitHub


apourchet commented on code in PR #16129:
URL: https://github.com/apache/kafka/pull/16129#discussion_r1621479244


##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##
@@ -244,18 +271,112 @@ public static Map 
optimizeRackAwareStandbyTas
 );
 LOG.info("Assignment before standby task optimization has cost {}", 
initialCost);
 
-throw new UnsupportedOperationException("Not yet Implemented.");
+final MoveStandbyTaskPredicate moveablePredicate = 
getStandbyTaskMovePredicate(applicationState);
+final BiFunction> getMovableTasks = (source, destination) -> {
+return source.tasks().values().stream()
+.filter(task -> task.type() == AssignedTask.Type.STANDBY)
+.filter(task -> !destination.tasks().containsKey(task.id()))
+.filter(task -> {
+final KafkaStreamsState sourceState = 
kafkaStreamsStates.get(source.processId());
+final KafkaStreamsState destinationState = 
kafkaStreamsStates.get(destination.processId());
+return moveablePredicate.canMoveStandbyTask(sourceState, 
destinationState, task.id(), kafkaStreamsAssignments);
+})
+.map(AssignedTask::id)
+.sorted()
+.collect(Collectors.toList());
+};
+
+final long startTime = System.currentTimeMillis();
+boolean taskMoved = true;
+int round = 0;
+final RackAwareGraphConstructor 
graphConstructor = RackAwareGraphConstructorFactory.create(
+
applicationState.assignmentConfigs().rackAwareAssignmentStrategy(), taskIds);
+while (taskMoved && round < STANDBY_OPTIMIZER_MAX_ITERATION) {
+taskMoved = false;
+round++;
+for (int i = 0; i < kafkaStreamsAssignments.size(); i++) {
+final UUID clientId1 = clientIds.get(i);
+final KafkaStreamsAssignment clientState1 = 
kafkaStreamsAssignments.get(new ProcessId(clientId1));
+for (int j = i + 1; j < kafkaStreamsAssignments.size(); j++) {
+final UUID clientId2 = clientIds.get(j);
+final KafkaStreamsAssignment clientState2 = 
kafkaStreamsAssignments.get(new ProcessId(clientId2));
+
+final String rack1 = 
clientRacks.get(clientState1.processId().id()).get();
+final String rack2 = 
clientRacks.get(clientState2.processId().id()).get();
+// Cross rack traffic can not be reduced if racks are the 
same
+if (rack1.equals(rack2)) {
+continue;
+}
+
+final List movable1 = 
getMovableTasks.apply(clientState1, clientState2);
+final List movable2 = 
getMovableTasks.apply(clientState2, clientState1);
+
+// There's no need to optimize if one is empty because the optimization
+// can only swap tasks to keep the client's load balanced
+if (movable1.isEmpty() || movable2.isEmpty()) {
+continue;
+}
+
+final List taskIdList = 
Stream.concat(movable1.stream(), movable2.stream())
+.sorted()
+.collect(Collectors.toList());
+final List clients = Stream.of(clientId1, 
clientId2).sorted().collect(Collectors.toList());
+
+final AssignmentGraph assignmentGraph = buildTaskGraph(
+assignmentsByUuid,
+clientRacks,
+taskIdList,
+clients,
+topicPartitionsByTaskId,
+crossRackTrafficCost,
+nonOverlapCost,
+false,
+false,

Review Comment:
   you're right, good catch!






[PR] KAFKA-15045: (KIP-924 pt. 16) TaskAssignor.onAssignmentComputed handling [kafka]

2024-05-30 Thread via GitHub


apourchet opened a new pull request, #16147:
URL: https://github.com/apache/kafka/pull/16147

   This PR takes care of making the callback to 
`TaskAssignor.onAssignmentComputed`.
   
   It also contains a change to the public AssignmentConfigs API, as well as 
some simplifications of the StickyTaskAssignor.
   
   This PR also changes the rack information fetching to happen lazily in the 
case where the TaskAssignor makes its decisions without said rack information.
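
   To illustrate the "lazy" part, a rough, self-contained sketch (editor's 
illustration with invented names, not the PR's code) of deferring an expensive 
lookup until an assignor actually asks for it:

```java
import java.util.function.Supplier;

// Memoizing wrapper: the costly rack-information fetch runs at most once,
// and only if some task assignor actually requests the value.
final class Lazy<T> {
    private final Supplier<T> loader;
    private T value;

    Lazy(Supplier<T> loader) { this.loader = loader; }

    synchronized T get() {
        if (value == null) {
            value = loader.get();
        }
        return value;
    }
}
```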
   





Re: [PR] KAFKA-15045: (KIP-924 pt. 15) Implement #defaultStandbyTaskAssignment and finish rack-aware standby optimization [kafka]

2024-05-30 Thread via GitHub


ableegoldman merged PR #16129:
URL: https://github.com/apache/kafka/pull/16129





Re: [PR] KAFKA-15045: (KIP-924 pt. 15) More TaskAssignmentUtils implementation [kafka]

2024-05-30 Thread via GitHub


ableegoldman commented on PR #16129:
URL: https://github.com/apache/kafka/pull/16129#issuecomment-2140934597

   Test failures are unrelated. Merging to trunk.





Re: [PR] KAFKA-15045: (KIP-924 pt. 15) More TaskAssignmentUtils implementation [kafka]

2024-05-30 Thread via GitHub


ableegoldman commented on code in PR #16129:
URL: https://github.com/apache/kafka/pull/16129#discussion_r1621467009


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##
@@ -555,18 +556,21 @@ private ApplicationState buildApplicationState(final 
TopologyMetadata topologyMe
 
 RackUtils.annotateTopicPartitionsWithRackInfo(cluster, 
internalTopicManager, allTopicPartitions);
 
-final Set logicalTasks = logicalTaskIds.stream().map(taskId 
-> {
-final Set stateStoreNames = topologyMetadata
-.stateStoreNameToSourceTopicsForTopology(taskId.topologyName())
-.keySet();
-final Set topicPartitions = 
topicPartitionsForTask.get(taskId);
-return new DefaultTaskInfo(
-taskId,
-!stateStoreNames.isEmpty(),
-stateStoreNames,
-topicPartitions
-);
-}).collect(Collectors.toSet());
+final Map logicalTasks = 
logicalTaskIds.stream().collect(Collectors.toMap(
+Function.identity(),
+taskId -> {
+final Set stateStoreNames = topologyMetadata
+
.stateStoreNameToSourceTopicsForTopology(taskId.topologyName())

Review Comment:
   Ah somehow I missed this before -- this is actually returning _all_ the 
state stores for this topology, it's not specific to the taskId. This was an 
existing issue so we don't need to fix it in this PR, it can be addressed in a 
followup. It might be a bit complicated so I'll take a look at how we can get 
this info
   
   Would've caught this during testing since we definitely want tests with 
mixed stateless-and-stateful tasks, but still good to fix ASAP



##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##
@@ -72,6 +80,27 @@ public static Map 
identityAssignment(final Ap
 return assignments;
 }
 
+/**
+ * Assign standby tasks to KafkaStreams clients according to the default 
logic.
+ * 
+ * If rack-aware client tags are configured, the rack-aware standby task 
assignor will be used
+ *
+ * @param applicationStatethe metadata and other info describing 
the current application state
+ * @param kafkaStreamsAssignments the current assignment of tasks to 
KafkaStreams clients
+ *
+ * @return a new map containing the mappings from KafkaStreamsAssignments 
updated with the default standby assignment
+ */
+public static Map<ProcessId, KafkaStreamsAssignment> defaultStandbyTaskAssignment(final ApplicationState applicationState,
+   final Map<ProcessId, KafkaStreamsAssignment> kafkaStreamsAssignments) {
+if 
(!applicationState.assignmentConfigs().rackAwareAssignmentTags().isEmpty()) {
+return tagBasedStandbyTaskAssignment(applicationState, 
kafkaStreamsAssignments);
+} else if (canPerformRackAwareOptimization(applicationState, 
AssignedTask.Type.STANDBY)) {
+return tagBasedStandbyTaskAssignment(applicationState, 
kafkaStreamsAssignments);

Review Comment:
   Address in a followup:
   
   We should just remove this case entirely right? basically it's "if 
hasRackAwareTags then do tag-based standby task assignment, if doesNotHaveTags 
then do default standby task assignment"
   
   Note that the tag-based rack aware assignment has nothing to do with the 
rack ids. So `canPerformRackAwareOptimization` is kind of irrelevant to the 
question here
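
   A sketch of the simplified branching described here (hypothetical helper 
names, not the final code):

```java
// If rack-aware client tags are configured, use tag-based standby assignment;
// otherwise fall back to the plain load-based default. No rack-id check needed.
if (!applicationState.assignmentConfigs().rackAwareAssignmentTags().isEmpty()) {
    return tagBasedStandbyTaskAssignment(applicationState, kafkaStreamsAssignments);
}
// defaultLoadBasedStandbyTaskAssignment is a hypothetical name for the non-tag path.
return defaultLoadBasedStandbyTaskAssignment(applicationState, kafkaStreamsAssignments);
```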



##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##
@@ -407,4 +543,345 @@ private static boolean hasValidRackInformation(final 
TaskInfo task,
 }
 return true;
 }
+
+private static Map 
tagBasedStandbyTaskAssignment(final ApplicationState applicationState,
+   
 final Map kafkaStreamsAssignments) {
+final int numStandbyReplicas = 
applicationState.assignmentConfigs().numStandbyReplicas();
+final Map tasksToRemainingStandbys = 
applicationState.allTasks().values().stream()
+.collect(Collectors.toMap(TaskInfo::id, taskInfo -> 
numStandbyReplicas));
+final Map streamStates = 
applicationState.kafkaStreamsStates(false);
+
+final Set rackAwareAssignmentTags = new 
HashSet<>(getRackAwareAssignmentTags(applicationState));
+final TagStatistics tagStatistics = new 
TagStatistics(applicationState);
+
+final ConstrainedPrioritySet standbyTaskClientsByTaskLoad = 
standbyTaskPriorityListByLoad(streamStates, kafkaStreamsAssignments);
+
+final Set statefulTaskIds = 
applicationState.allTasks().values().stream()
+.filter(TaskInfo::isStateful)
+.map(TaskInfo::id)
+.collect(Collectors.toSet());
+final Map clientsByUuid = 

[jira] [Commented] (KAFKA-16863) Consider removing `default.` prefix for exception handler config

2024-05-30 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17850869#comment-17850869
 ] 

Matthias J. Sax commented on KAFKA-16863:
-

I think `traffic_cost` was on purpose... But I really don't feel strongly about 
it at all.

In general, I am always in favor of cleaning things up; we also just did KIP-1020.

Not sure if we should do a single KIP though. It can become very convoluted 
quickly. I would rather do multiple smaller KIPs?

Not sure what other issues there might be? Do you have a list?

> Consider removing `default.` prefix for exception handler config
> 
>
> Key: KAFKA-16863
> URL: https://issues.apache.org/jira/browse/KAFKA-16863
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Trivial
>  Labels: need-kip
>
> Kafka Streams has a set of configs with `default.` prefix. The intent for the 
> default-prefix is to make a distinction between, well the default, and 
> in-place overwrites in the code. Eg, users can specify ts-extractors on a 
> per-topic basis.
> However, for the deserialization- and production-exception handlers, no such 
> overwrites are possible, and thus, `default.` does not really make sense, 
> because there is just one handler overall. Via KIP-1033 we added a new 
> processing-exception handler w/o a default-prefix, too.
> Thus, we should consider deprecating the two existing config names and adding 
> them back w/o the `default.` prefix.
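
For illustration, a short example of the distinction the description draws, 
using public Kafka Streams APIs (the topic name "orders" is made up):

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;

public class DefaultPrefixExample {
    public static void main(String[] args) {
        // The `default.` prefix makes sense here: this only sets the default...
        Properties props = new Properties();
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
                  WallclockTimestampExtractor.class);

        // ...which can be overridden on a per-topic basis in the code:
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("orders",
            Consumed.with(Serdes.String(), Serdes.String())
                    .withTimestampExtractor(new LogAndSkipOnInvalidTimestamp()));

        // The exception handlers have no such per-topic override, so their
        // `default.` prefix is misleading, which is the point of this ticket.
    }
}
```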





Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]

2024-05-30 Thread via GitHub


junrao commented on PR #15625:
URL: https://github.com/apache/kafka/pull/15625#issuecomment-2140919323

   @jolshan : Thanks for pointing this out. Sorry that I didn't look at the 
test results carefully before merging.





Re: [PR] KAFKA-16308 [2/N]: Allow unstable feature versions and rename unstable metadata config [kafka]

2024-05-30 Thread via GitHub


jolshan merged PR #16130:
URL: https://github.com/apache/kafka/pull/16130





Re: [PR] KAFKA-16308 [2/N]: Allow unstable feature versions and rename unstable metadata config [kafka]

2024-05-30 Thread via GitHub


jolshan commented on code in PR #16130:
URL: https://github.com/apache/kafka/pull/16130#discussion_r1621463208


##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -449,8 +449,8 @@ object KafkaConfig {
   /** Internal Configurations **/
   // This indicates whether unreleased APIs should be advertised by this 
node.
   .defineInternal(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, 
BOOLEAN, false, HIGH)
-  // This indicates whether unreleased MetadataVersions should be enabled 
on this node.
-  .defineInternal(ServerConfigs.UNSTABLE_METADATA_VERSIONS_ENABLE_CONFIG, 
BOOLEAN, false, HIGH)
+  // This indicates whether unreleased MetadataVersions or other feature 
versions should be enabled on this node.
+  .defineInternal(ServerConfigs.UNSTABLE_FEATURE_VERSIONS_ENABLE_CONFIG, 
BOOLEAN, false, HIGH)

Review Comment:
   Yup -- this is the text in the KIP:
   > Add INTERNAL configuration unstable.feature.versions.enable to allow for 
non production ready features to be used (for testing)
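
   For example (a hypothetical test snippet, assuming the internal config is 
settable like any other broker property):

```java
import java.util.Properties;

public class UnstableFeaturesExample {
    public static void main(String[] args) {
        // Internal flag from the KIP text above; intended for testing only,
        // never for production clusters.
        Properties brokerProps = new Properties();
        brokerProps.put("unstable.feature.versions.enable", "true");
        System.out.println(brokerProps);
    }
}
```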






Re: [PR] KAFKA-16308 [2/N]: Allow unstable feature versions and rename unstable metadata config [kafka]

2024-05-30 Thread via GitHub


jolshan commented on PR #16130:
URL: https://github.com/apache/kafka/pull/16130#issuecomment-2140914372

   I filed https://issues.apache.org/jira/browse/KAFKA-16866 for the one 
failure and that is getting fixed separately. 
   
   As for the others, looks like they are frequent flakes. I will go ahead and 
merge. 





[jira] [Comment Edited] (KAFKA-16792) Enable consumer unit tests that fail to fetch offsets only for new consumer with poll(0)

2024-05-30 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17850215#comment-17850215
 ] 

Kirk True edited comment on KAFKA-16792 at 5/30/24 9:41 PM:


These tests work with KAFKA-16200:
 - testResetUsingAutoResetPolicy
 - testFetchStableOffsetThrowInCommitted (wasn't in the above list)

The following still don't work:
 - testCurrentLag: this is expecting {{poll()}} to call the {{LIST_OFFSETS}} 
RPC. The new consumer doesn't do this, but instead sends {{FETCH_OFFSETS}}
 - testFetchStableOffsetThrowInPoll
 - testListOffsetShouldUpdateSubscriptions
 - testPollReturnsRecords: uses the {{CLASSIC}} rebalance RPCs
 - testResetToCommittedOffset


was (Author: kirktrue):
These tests work with KAFKA-16200:
 - testResetUsingAutoResetPolicy
 - testFetchStableOffsetThrowInCommitted (wasn't in the above list)

The following still don't work:
 - testCurrentLag: this is expecting {{poll()}} to call the {{LIST_OFFSETS}} 
RPC. The new consumer doesn't do this, but instead sends {{FETCH_OFFESTS}}
 - testFetchStableOffsetThrowInPoll
 - testListOffsetShouldUpdateSubscriptions
 - testPollReturnsRecords: uses the {{CLASSIC}} rebalance RPCs
 - testResetToCommittedOffset

> Enable consumer unit tests that fail to fetch offsets only for new consumer 
> with poll(0)
> 
>
> Key: KAFKA-16792
> URL: https://issues.apache.org/jira/browse/KAFKA-16792
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Kirk True
>Priority: Blocker
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> Enable the following unit tests for the new async consumer in 
> KafkaConsumerTest:
>  - testFetchStableOffsetThrowInPoll
>  - testCurrentLag
>  - testListOffsetShouldUpdateSubscriptions
>  - testResetToCommittedOffset
>  - testResetUsingAutoResetPolicy
>  - testPollReturnsRecords
>  





[jira] [Comment Edited] (KAFKA-16792) Enable consumer unit tests that fail to fetch offsets only for new consumer with poll(0)

2024-05-30 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17850215#comment-17850215
 ] 

Kirk True edited comment on KAFKA-16792 at 5/30/24 9:40 PM:


These tests work with KAFKA-16200:
 - testResetUsingAutoResetPolicy
 - testFetchStableOffsetThrowInCommitted (wasn't in the above list)

The following still don't work:
 - testCurrentLag: this is expecting {{poll()}} to call the {{LIST_OFFSETS}} 
RPC. The new consumer doesn't do this, but instead sends {{FETCH_OFFESTS}}
 - testFetchStableOffsetThrowInPoll
 - testListOffsetShouldUpdateSubscriptions
 - testPollReturnsRecords: uses the {{CLASSIC}} rebalance RPCs
 - testResetToCommittedOffset


was (Author: kirktrue):
These tests work with KAFKA-16200:
 - testResetUsingAutoResetPolicy
 - testFetchStableOffsetThrowInCommitted (wasn't in the above list)

The following still don't work:
 - testCurrentLag
 - testFetchStableOffsetThrowInPoll
 - testListOffsetShouldUpdateSubscriptions
 - testPollReturnsRecords: uses the {{CLASSIC}} rebalance RPCs
 - testResetToCommittedOffset

> Enable consumer unit tests that fail to fetch offsets only for new consumer 
> with poll(0)
> 
>
> Key: KAFKA-16792
> URL: https://issues.apache.org/jira/browse/KAFKA-16792
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Kirk True
>Priority: Blocker
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> Enable the following unit tests for the new async consumer in 
> KafkaConsumerTest:
>  - testFetchStableOffsetThrowInPoll
>  - testCurrentLag
>  - testListOffsetShouldUpdateSubscriptions
>  - testResetToCommittedOffset
>  - testResetUsingAutoResetPolicy
>  - testPollReturnsRecords
>  





[jira] [Comment Edited] (KAFKA-16792) Enable consumer unit tests that fail to fetch offsets only for new consumer with poll(0)

2024-05-30 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17850215#comment-17850215
 ] 

Kirk True edited comment on KAFKA-16792 at 5/30/24 9:34 PM:


These tests work with KAFKA-16200:
 - testResetUsingAutoResetPolicy
 - testFetchStableOffsetThrowInCommitted (wasn't in the above list)

The following still don't work:
 - testCurrentLag
 - testFetchStableOffsetThrowInPoll
 - testListOffsetShouldUpdateSubscriptions
 - testPollReturnsRecords: uses the {{CLASSIC}} rebalance RPCs
 - testResetToCommittedOffset


was (Author: kirktrue):
These tests work with KAFKA-16200:
 - testResetUsingAutoResetPolicy
 - testFetchStableOffsetThrowInCommitted (wasn't in the above list)

The following still don't work:
 - testCurrentLag
 - testFetchStableOffsetThrowInPoll
 - testListOffsetShouldUpdateSubscriptions
 - testPollReturnsRecords
 - testResetToCommittedOffset

> Enable consumer unit tests that fail to fetch offsets only for new consumer 
> with poll(0)
> 
>
> Key: KAFKA-16792
> URL: https://issues.apache.org/jira/browse/KAFKA-16792
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Kirk True
>Priority: Blocker
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> Enable the following unit tests for the new async consumer in 
> KafkaConsumerTest:
>  - testFetchStableOffsetThrowInPoll
>  - testCurrentLag
>  - testListOffsetShouldUpdateSubscriptions
>  - testResetToCommittedOffset
>  - testResetUsingAutoResetPolicy
>  - testPollReturnsRecords
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]

2024-05-30 Thread via GitHub


jolshan commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1621445974


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##
@@ -144,216 +133,137 @@ protected GroupAssignment buildAssignment() throws 
PartitionAssignorException {
 }
 }
 
-// The minimum required quota that each member needs to meet for a 
balanced assignment.
-// This is the same for all members.
-final int numberOfMembers = groupSpec.members().size();
-final int minQuota = totalPartitionsCount / numberOfMembers;
+// Compute the minimum required quota per member and the number of 
members
+// who should receive an extra partition.
+int numberOfMembers = groupSpec.members().size();
+minimumMemberQuota = totalPartitionsCount / numberOfMembers;
 remainingMembersToGetAnExtraPartition = totalPartitionsCount % 
numberOfMembers;
 
-groupSpec.members().keySet().forEach(memberId ->
-targetAssignment.put(memberId, new MemberAssignment(new 
HashMap<>())
-));
-
-potentiallyUnfilledMembers = assignStickyPartitions(minQuota);
-
-unassignedPartitionsRoundRobinAssignment();
+// Revoke the partitions which are either not part of the 
subscriptions or above
+// the maximum quota.
+maybeRevokePartitions();
 
-if (!unassignedPartitions.isEmpty()) {
-throw new PartitionAssignorException("Partitions were left 
unassigned");
-}
+// Assign the unassigned partitions to the members with space.
+assignRemainingPartitions();
 
 return new GroupAssignment(targetAssignment);
 }
 
-/**
- * Retains a set of partitions from the existing assignment and includes 
them in the target assignment.
- * Only relevant partitions that exist in the current topic metadata and 
subscriptions are considered.
- *
- *  For each member:
- * 
- *  Find the valid current assignment considering topic 
subscriptions and metadata
- *  If the current assignment exists, retain partitions up to the 
minimum quota.
- *  If the current assignment size is greater than the minimum 
quota and
- *  there are members that could get an extra partition, assign 
the next partition as well.
- *  Finally, if the member's current assignment size is less than 
the minimum quota,
- *  add them to the potentially unfilled members map and track the 
number of remaining
- *  partitions required to meet the quota.
- * 
- * 
- *
- * @return  Members mapped to the remaining number of partitions needed to 
meet the minimum quota,
- *  including members that are eligible to receive an extra 
partition.
- */
-private Map<String, Integer> assignStickyPartitions(int minQuota) {
-Map<String, Integer> potentiallyUnfilledMembers = new HashMap<>();
-
-groupSpec.members().forEach((memberId, assignmentMemberSpec) -> {
-List<TopicIdPartition> validCurrentMemberAssignment = 
validCurrentMemberAssignment(
-assignmentMemberSpec.assignedPartitions()
-);
-
-int currentAssignmentSize = validCurrentMemberAssignment.size();
-// Number of partitions required to meet the minimum quota.
-int remaining = minQuota - currentAssignmentSize;
-
-if (currentAssignmentSize > 0) {
-int retainedPartitionsCount = min(currentAssignmentSize, 
minQuota);
-IntStream.range(0, retainedPartitionsCount).forEach(i -> {
-TopicIdPartition topicIdPartition = 
validCurrentMemberAssignment.get(i);
-addPartitionToAssignment(
-targetAssignment,
-memberId,
-topicIdPartition.topicId(),
-topicIdPartition.partitionId()
-);
-});
-
-if (remaining < 0) {
-// The extra partition is located at the last index from 
the previous step.
-if (remainingMembersToGetAnExtraPartition > 0) {
-TopicIdPartition topicIdPartition = 
validCurrentMemberAssignment.get(retainedPartitionsCount++);
-addPartitionToAssignment(
-targetAssignment,
-memberId,
-topicIdPartition.topicId(),
-topicIdPartition.partitionId()
-);
-remainingMembersToGetAnExtraPartition--;
+private void maybeRevokePartitions() {
+for (Map.Entry<String, AssignmentMemberSpec> entry : 
groupSpec.members().entrySet()) {
+String memberId = entry.getKey();
+AssignmentMemberSpec assignmentMemberSpec = entry.getValue();
+Map<Uuid, Set<Integer>> oldAssignment = 

Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]

2024-05-30 Thread via GitHub


jolshan commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1621442335


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformAssignor.java:
##
@@ -66,21 +66,19 @@ public GroupAssignment assign(
 GroupSpec groupSpec,
 SubscribedTopicDescriber subscribedTopicDescriber
 ) throws PartitionAssignorException {
-AbstractUniformAssignmentBuilder assignmentBuilder;
-
 if (groupSpec.members().isEmpty())
 return new GroupAssignment(Collections.emptyMap());
 
 if (groupSpec.subscriptionType().equals(HOMOGENEOUS)) {
 LOG.debug("Detected that all members are subscribed to the same 
set of topics, invoking the "
 + "optimized assignment algorithm");
-assignmentBuilder = new 
OptimizedUniformAssignmentBuilder(groupSpec, subscribedTopicDescriber);
+return new OptimizedUniformAssignmentBuilder(groupSpec, 
subscribedTopicDescriber)
+.build();

Review Comment:
   Any reason why we changed the name to not match the general assignor? Or was 
this also changed in the original PR that renamed the files?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16858) Flatten SMT throws NPE

2024-05-30 Thread Adam Strickland (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Adam Strickland updated KAFKA-16858:

Attachment: proto.proto

> Flatten SMT throws NPE
> --
>
> Key: KAFKA-16858
> URL: https://issues.apache.org/jira/browse/KAFKA-16858
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 3.6.0
> Environment: Kafka 3.6 by way of CP 7.6.0
>Reporter: Adam Strickland
>Priority: Major
> Attachments: FlattenTest.java, proto.proto
>
>
> {{ConnectSchema.expectedClassesFor}} sometimes will throw an NPE as part of a 
> call to an SMT chain.  Stack trace snippet:
> {{at 
> com.github.momenttechnology.kafka.connect.transforms.MomentFlatten.apply(MomentFlatten.java:84)}}
> {{at 
> com.github.momenttechnology.kafka.connect.transforms.MomentFlatten.applyWithSchema(MomentFlatten.java:173)}}
> {{at 
> com.github.momenttechnology.kafka.connect.transforms.MomentFlatten.buildWithSchema(MomentFlatten.java:280)}}
> {{at 
> com.github.momenttechnology.kafka.connect.transforms.MomentFlatten.buildWithSchema(MomentFlatten.java:280)}}
> {{at 
> com.github.momenttechnology.kafka.connect.transforms.MomentFlatten.buildWithSchema(MomentFlatten.java:280)}}
> {{at 
> com.github.momenttechnology.kafka.connect.transforms.MomentFlatten.buildWithSchema(MomentFlatten.java:274)}}
> {{at org.apache.kafka.connect.data.Struct.put(Struct.java:203)}}
> {{at org.apache.kafka.connect.data.Struct.put(Struct.java:216)}}
> {{at 
> org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:255)}}
> {{at 
> org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:213)}}
> {{at 
> org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:224)}}
> {{at 
> org.apache.kafka.connect.data.ConnectSchema.expectedClassesFor(ConnectSchema.java:268)}}
> (the above transform is a sub-class of 
> {{{}o.a.k.connect.transforms.Flatten{}}}; have confirmed that the error 
> occurs regardless).
> The field being transformed is an array of structs. If the call to 
> {{Schema#valueSchema()}} (o.a.k.connect.data.ConnectSchema.java:255) returns 
> {{{}null{}}}, the subsequent call to {{Schema#name()}} at 
> o.a.k.connect.data.ConnectSchema:268 throws an NPE.
> The strange thing that we have observed is that this doesn't always happen; 
> *sometimes* the struct's schema is found and sometimes it is not. We have 
> been unable to determine the root cause, but have constructed a test that 
> replicates the problem as observed (see attachment).
> In our case we have worked around the issue with the aforementioned sub-class 
> of {{{}Flatten{}}}, catching and logging the NPE on that specific use-case.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16858) Flatten SMT throws NPE

2024-05-30 Thread Adam Strickland (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17850866#comment-17850866
 ] 

Adam Strickland commented on KAFKA-16858:
-

* 
{quote}Are you able to provide an anonymized form of your schema directly, 
rather than just a high-level "Array of Structs"? I'm wondering if your schema 
is capable of triggering the use of the mutable SchemaWrapper 
[https://github.com/confluentinc/schema-registry/blob/7b886f309c83041d4f2a5b41b5910f3b8002413a/protobuf-converter/src/main/java/io/confluent/connect/protobuf/ProtobufData.java#L1779]
 inside the ProtobufConverter.{quote}

Yes; see [^proto.proto]
 * 
{quote}w.r.t. the variable validateValue depth: Are you saying that in _error 
cases_ the recursion depth is unpredictable, or in general? The validateValue 
should be called at every or almost every location in the tree of values, so I 
would expect to see lots of different recursion depths. Maybe you can share 
some more stacktraces as examples.{quote}

We were only looking at error cases. I specifically recall seeing the same 
message break on the 3rd recursion and on the 7th; the array for the message in 
question contained 2 Structs.
 * 
{quote}So far in this investigation, I'm trying to find the source of the null 
in hopes that we can prevent it, and get well-formed data to the Flatten SMT. 
Regardless of the result of that investigation, I think we can consider this 
input malformed, and throw an intentional DataException instead of 
NullPointerException. Would that be an acceptable solution for you, or does 
this data need to make it all the way through the pipeline?{quote}

Throwing a DataException would be preferable to the NPE. I'm not sure I 
understand what you mean by "need to make it all the way through the 
pipeline"... As it stands the particular problematic attribute is not something 
we care about right now, which is why we can simply squash the Exception.
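
For illustration, the DataException approach under discussion might look roughly like this around the failing lookup (a sketch of the proposed behavior, not the actual ConnectSchema patch; the message text is invented):

{code:java}
// Sketch: fail fast with a descriptive DataException instead of an NPE when
// an array field's value schema is missing.
Schema valueSchema = schema.valueSchema();
if (valueSchema == null) {
    throw new DataException("Array schema has no value schema; treating input as malformed");
}
// ...otherwise continue using valueSchema.name() as before
{code}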

> Flatten SMT throws NPE
> --
>
> Key: KAFKA-16858
> URL: https://issues.apache.org/jira/browse/KAFKA-16858
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 3.6.0
> Environment: Kafka 3.6 by way of CP 7.6.0
>Reporter: Adam Strickland
>Priority: Major
> Attachments: FlattenTest.java, proto.proto
>
>
> {{ConnectSchema.expectedClassesFor}} sometimes will throw an NPE as part of a 
> call to an SMT chain.  Stack trace snippet:
> {{at 
> com.github.momenttechnology.kafka.connect.transforms.MomentFlatten.apply(MomentFlatten.java:84)}}
> {{at 
> com.github.momenttechnology.kafka.connect.transforms.MomentFlatten.applyWithSchema(MomentFlatten.java:173)}}
> {{at 
> com.github.momenttechnology.kafka.connect.transforms.MomentFlatten.buildWithSchema(MomentFlatten.java:280)}}
> {{at 
> com.github.momenttechnology.kafka.connect.transforms.MomentFlatten.buildWithSchema(MomentFlatten.java:280)}}
> {{at 
> com.github.momenttechnology.kafka.connect.transforms.MomentFlatten.buildWithSchema(MomentFlatten.java:280)}}
> {{at 
> com.github.momenttechnology.kafka.connect.transforms.MomentFlatten.buildWithSchema(MomentFlatten.java:274)}}
> {{at org.apache.kafka.connect.data.Struct.put(Struct.java:203)}}
> {{at org.apache.kafka.connect.data.Struct.put(Struct.java:216)}}
> {{at 
> org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:255)}}
> {{at 
> org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:213)}}
> {{at 
> org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:224)}}
> {{at 
> org.apache.kafka.connect.data.ConnectSchema.expectedClassesFor(ConnectSchema.java:268)}}
> (the above transform is a sub-class of 
> {{{}o.a.k.connect.transforms.Flatten{}}}; have confirmed that the error 
> occurs regardless).
> The field being transformed is an array of structs. If the call to 
> {{Schema#valueSchema()}} (o.a.k.connect.data.ConnectSchema.java:255) returns 
> {{{}null{}}}, the subsequent call to {{Schema#name()}} at 
> o.a.k.connect.data.ConnectSchema:268 throws an NPE.
> The strange thing that we have observed is that this doesn't always happen; 
> *sometimes* the struct's schema is found and sometimes it is not. We have 
> been unable to determine the root cause, but have constructed a test that 
> replicates the problem as observed (see attachment).
> In our case we have worked around the issue with the aforementioned sub-class 
> of {{{}Flatten{}}}, catching and logging the NPE on that specific use-case.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16047) Source connector with EOS enabled have some InitProducerId requests timing out, effectively failing all the tasks & the whole connector

2024-05-30 Thread Edoardo Comar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17850864#comment-17850864
 ] 

Edoardo Comar commented on KAFKA-16047:
---

[~gharris1727]

I see that for a common produce request,

ReplicaManager.appendRecords uses the timeout set as 
ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG,

i.e. CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG.

Would it not make sense to always use that timeout when appending to a log?


BTW, MirrorMaker2 is unusable in a connect cluster set up with exactly once, 
when the broker server.properties
are changed from the settings used in development testing and present in the 
config properties files:
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
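
For reference, the production-style values for those two settings (the broker defaults) would be along these lines, shown only to contrast with the single-node development values above:

{code}
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
{code}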
 
[~akaltsikis] I see that you could not progress your PR. Are you happy to hand 
over this bugfix?

> Source connector with EOS enabled have some InitProducerId requests timing 
> out, effectively failing all the tasks & the whole connector
> ---
>
> Key: KAFKA-16047
> URL: https://issues.apache.org/jira/browse/KAFKA-16047
> Project: Kafka
>  Issue Type: Bug
>  Components: connect, mirrormaker
>Affects Versions: 3.3.0, 3.4.0, 3.3.1, 3.3.2, 3.4.1, 3.6.0, 3.5.1, 3.5.2, 
> 3.6.1
>Reporter: Angelos Kaltsikis
>Assignee: Edoardo Comar
>Priority: Major
>
> Source Connectors with 'exactly.once.support = required' may have some of 
> their tasks' InitProducerId requests (issued from the admin client) time out. 
> In the case of MirrorSourceConnector, which was the source connector in which I 
> found the bug, this was effectively making all the tasks (in this specific 
> case) become "FAILED". As soon as one of the tasks got FAILED due to the 
> 'COORDINATOR_NOT_AVAILABLE' messages (due to timeouts), no matter how many 
> restarts I did to the connector/tasks, I couldn't get the 
> MirrorSourceConnector in a healthy RUNNING state again.
> Due to the low timeout that has been [hard-coded in the 
> code|https://github.com/apache/kafka/blob/3.6.1/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java#L87]
>  (1ms), there is a chance that the `InitProducerId` requests time out in case 
> of "slower-than-expected" Kafka brokers (that do not process & respond to the 
> above request in <= 1ms). (feel free to read more information about the issue 
> in the "More Context" section below)
> [~ChrisEgerton] I would appreciate it if you could respond to the following 
> questions
> - How and why was the 1ms magic number for the transaction timeout chosen?
> - Is there any specific reason it can be guaranteed that the 
> `InitProducerId` request can be processed in such a small time window? 
> - I have tried the above in multiple different Kafka clusters that are hosted 
> in different underlying datacenter hosts, and I don't believe that those 
> brokers are "slow" for some reason. If you feel that the brokers are slower 
> than expected, I would appreciate any pointers on how I could find out what 
> the bottleneck is.
> h3. Temporary Mitigation
> I have increased the timeout to 1000ms (randomly picked this number, just 
> wanted to give enough time to brokers to always complete those types of 
> requests). The fix can be found in my fork 
> https://github.com/akaltsikis/kafka/commit/8a47992e7dc63954f9d9ac54e8ed1f5a7737c97f
>  
> h3. Final solution
> The temporary mitigation is not ideal, as it still randomly picks a timeout 
> for such an operation, which may be high enough, but it's not ensured that it will 
> always be high enough. Shall we introduce something client-configurable?
> At the same time, I was thinking whether it makes sense to introduce some 
> tests that simulate brokers slower than the "blazing" fast mocked ones that exist 
> in unit tests, so as to be able to catch the kind of low timeout that 
> potentially makes some software features unusable.
> h3. What is affected
> The above bug exists in MirrorSourceConnector tasks running in a distributed 
> Kafka Connect cluster or MirrorMaker 2 jobs that run with distributed mode 
> enabled (a prerequisite for exactly.once.support to work). I believe this 
> should be true for other SourceConnectors as well (as the code-path that was 
> the one to blame is Connect specific & not MirrorMaker specific).
> h3. More context & logs
> *Connector Logs*
> {code:java}
> Caused by: java.util.concurrent.CompletionException: 
> org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node 
> assignment. Call: 

[PR] KAFKA-16866 RemoteLogManagerTest.testCopyQuotaManagerConfig failing [kafka]

2024-05-30 Thread via GitHub


chia7712 opened a new pull request, #16146:
URL: https://github.com/apache/kafka/pull/16146

   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-16866) RemoteLogManagerTest.testCopyQuotaManagerConfig failing

2024-05-30 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16866?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-16866:
--

Assignee: Chia-Ping Tsai

> RemoteLogManagerTest.testCopyQuotaManagerConfig failing
> ---
>
> Key: KAFKA-16866
> URL: https://issues.apache.org/jira/browse/KAFKA-16866
> Project: Kafka
>  Issue Type: Test
>Affects Versions: 3.8.0
>Reporter: Justine Olshan
>Assignee: Chia-Ping Tsai
>Priority: Major
>
> Seems like this test introduced in 
> [https://github.com/apache/kafka/pull/15625] is failing consistently.
> org.opentest4j.AssertionFailedError: 
> Expected :61
> Actual   :11



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]

2024-05-30 Thread via GitHub


chia7712 commented on PR #15625:
URL: https://github.com/apache/kafka/pull/15625#issuecomment-2140866041

   @jolshan I file https://github.com/apache/kafka/pull/16146 to fix it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]

2024-05-30 Thread via GitHub


jolshan commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1621424229


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##
@@ -71,63 +69,54 @@ public class OptimizedUniformAssignmentBuilder extends 
AbstractUniformAssignment
  */
 private final Set<Uuid> subscribedTopicIds;
 
-/**
- * The number of members to receive an extra partition beyond the minimum 
quota.
- * Minimum Quota = Total Partitions / Total Members
- * Example: If there are 11 partitions to be distributed among 3 members,
- *  each member gets 3 (11 / 3) [minQuota] partitions and 2 (11 % 
3) members get an extra partition.
- */
-private int remainingMembersToGetAnExtraPartition;
-
 /**
  * Members mapped to the remaining number of partitions needed to meet the 
minimum quota.
- * Minimum quota = total partitions / total members.
  */
-private Map<String, Integer> potentiallyUnfilledMembers;
+private final List potentiallyUnfilledMembers;

Review Comment:
   why do we call this potentiallyUnfilledMembers rather than unfilledMembers?
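   
   For intuition, the quota arithmetic that the removed javadoc above describes boils down to a few lines; a standalone sketch using the diff's own names (illustrative only, not the assignor's actual code):
   
   ```
   // 11 partitions across 3 members: each member gets minQuota = 3,
   // and 11 % 3 = 2 members receive one extra partition.
   int totalPartitionsCount = 11;
   int numberOfMembers = 3;
   int minimumMemberQuota = totalPartitionsCount / numberOfMembers;                    // 3
   int remainingMembersToGetAnExtraPartition = totalPartitionsCount % numberOfMembers; // 2
   // A member's target size is minimumMemberQuota, plus one more while
   // remainingMembersToGetAnExtraPartition > 0.
   ```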



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-16833) Cluster missing topicIds from equals and hashCode, PartitionInfo missing equals and hashCode

2024-05-30 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16833?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-16833:
--

Assignee: Alyssa Huang

> Cluster missing topicIds from equals and hashCode, PartitionInfo missing 
> equals and hashCode
> 
>
> Key: KAFKA-16833
> URL: https://issues.apache.org/jira/browse/KAFKA-16833
> Project: Kafka
>  Issue Type: Bug
>Reporter: Alyssa Huang
>Assignee: Alyssa Huang
>Priority: Major
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16833) Cluster missing topicIds from equals and hashCode, PartitionInfo missing equals and hashCode

2024-05-30 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16833?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-16833.

Fix Version/s: 3.8.0
   Resolution: Fixed

> Cluster missing topicIds from equals and hashCode, PartitionInfo missing 
> equals and hashCode
> 
>
> Key: KAFKA-16833
> URL: https://issues.apache.org/jira/browse/KAFKA-16833
> Project: Kafka
>  Issue Type: Bug
>Reporter: Alyssa Huang
>Priority: Major
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16833: Fixing PartitionInfo and Cluster equals and hashCode [kafka]

2024-05-30 Thread via GitHub


chia7712 merged PR #16062:
URL: https://github.com/apache/kafka/pull/16062


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16833: Fixing PartitionInfo and Cluster equals and hashCode [kafka]

2024-05-30 Thread via GitHub


chia7712 commented on PR #16062:
URL: https://github.com/apache/kafka/pull/16062#issuecomment-2140856964

   The failed test is traced by 
https://issues.apache.org/jira/browse/KAFKA-16866


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]

2024-05-30 Thread via GitHub


jolshan commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1621422471


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##
@@ -144,216 +133,137 @@ protected GroupAssignment buildAssignment() throws 
PartitionAssignorException {
 }
 }
 
-// The minimum required quota that each member needs to meet for a 
balanced assignment.
-// This is the same for all members.
-final int numberOfMembers = groupSpec.members().size();
-final int minQuota = totalPartitionsCount / numberOfMembers;
+// Compute the minimum required quota per member and the number of 
members
+// who should receive an extra partition.
+int numberOfMembers = groupSpec.members().size();
+minimumMemberQuota = totalPartitionsCount / numberOfMembers;
 remainingMembersToGetAnExtraPartition = totalPartitionsCount % 
numberOfMembers;
 
-groupSpec.members().keySet().forEach(memberId ->
-targetAssignment.put(memberId, new MemberAssignment(new 
HashMap<>())
-));
-
-potentiallyUnfilledMembers = assignStickyPartitions(minQuota);
-
-unassignedPartitionsRoundRobinAssignment();
+// Revoke the partitions which are either not part of the 
subscriptions or above
+// the maximum quota.
+maybeRevokePartitions();
 
-if (!unassignedPartitions.isEmpty()) {
-throw new PartitionAssignorException("Partitions were left 
unassigned");
-}
+// Assign the unassigned partitions to the members with space.
+assignRemainingPartitions();
 
 return new GroupAssignment(targetAssignment);
 }
 
-/**
- * Retains a set of partitions from the existing assignment and includes 
them in the target assignment.
- * Only relevant partitions that exist in the current topic metadata and 
subscriptions are considered.
- *
- *  For each member:
- * 
- *  Find the valid current assignment considering topic 
subscriptions and metadata
- *  If the current assignment exists, retain partitions up to the 
minimum quota.
- *  If the current assignment size is greater than the minimum 
quota and
- *  there are members that could get an extra partition, assign 
the next partition as well.
- *  Finally, if the member's current assignment size is less than 
the minimum quota,
- *  add them to the potentially unfilled members map and track the 
number of remaining
- *  partitions required to meet the quota.
- * 
- * 
- *
- * @return  Members mapped to the remaining number of partitions needed to 
meet the minimum quota,
- *  including members that are eligible to receive an extra 
partition.
- */
-private Map<String, Integer> assignStickyPartitions(int minQuota) {
-Map<String, Integer> potentiallyUnfilledMembers = new HashMap<>();
-
-groupSpec.members().forEach((memberId, assignmentMemberSpec) -> {
-List<TopicIdPartition> validCurrentMemberAssignment = 
validCurrentMemberAssignment(
-assignmentMemberSpec.assignedPartitions()
-);
-
-int currentAssignmentSize = validCurrentMemberAssignment.size();
-// Number of partitions required to meet the minimum quota.
-int remaining = minQuota - currentAssignmentSize;
-
-if (currentAssignmentSize > 0) {
-int retainedPartitionsCount = min(currentAssignmentSize, 
minQuota);
-IntStream.range(0, retainedPartitionsCount).forEach(i -> {
-TopicIdPartition topicIdPartition = 
validCurrentMemberAssignment.get(i);
-addPartitionToAssignment(
-targetAssignment,
-memberId,
-topicIdPartition.topicId(),
-topicIdPartition.partitionId()
-);
-});
-
-if (remaining < 0) {
-// The extra partition is located at the last index from 
the previous step.
-if (remainingMembersToGetAnExtraPartition > 0) {
-TopicIdPartition topicIdPartition = 
validCurrentMemberAssignment.get(retainedPartitionsCount++);
-addPartitionToAssignment(
-targetAssignment,
-memberId,
-topicIdPartition.topicId(),
-topicIdPartition.partitionId()
-);
-remainingMembersToGetAnExtraPartition--;
+private void maybeRevokePartitions() {
+for (Map.Entry<String, AssignmentMemberSpec> entry : 
groupSpec.members().entrySet()) {
+String memberId = entry.getKey();
+AssignmentMemberSpec assignmentMemberSpec = entry.getValue();
+Map<Uuid, Set<Integer>> oldAssignment = 

Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]

2024-05-30 Thread via GitHub


jolshan commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1621421194


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##
@@ -144,216 +133,137 @@ protected GroupAssignment buildAssignment() throws 
PartitionAssignorException {
 }
 }
 
-// The minimum required quota that each member needs to meet for a 
balanced assignment.
-// This is the same for all members.
-final int numberOfMembers = groupSpec.members().size();
-final int minQuota = totalPartitionsCount / numberOfMembers;
+// Compute the minimum required quota per member and the number of 
members
+// who should receive an extra partition.
+int numberOfMembers = groupSpec.members().size();
+minimumMemberQuota = totalPartitionsCount / numberOfMembers;
 remainingMembersToGetAnExtraPartition = totalPartitionsCount % 
numberOfMembers;
 
-groupSpec.members().keySet().forEach(memberId ->
-targetAssignment.put(memberId, new MemberAssignment(new 
HashMap<>())
-));
-
-potentiallyUnfilledMembers = assignStickyPartitions(minQuota);
-
-unassignedPartitionsRoundRobinAssignment();
+// Revoke the partitions which are either not part of the 
subscriptions or above
+// the maximum quota.
+maybeRevokePartitions();
 
-if (!unassignedPartitions.isEmpty()) {
-throw new PartitionAssignorException("Partitions were left 
unassigned");
-}
+// Assign the unassigned partitions to the members with space.
+assignRemainingPartitions();
 
 return new GroupAssignment(targetAssignment);
 }
 
-/**
- * Retains a set of partitions from the existing assignment and includes 
them in the target assignment.
- * Only relevant partitions that exist in the current topic metadata and 
subscriptions are considered.
- *
- *  For each member:
- * 
- *  Find the valid current assignment considering topic 
subscriptions and metadata
- *  If the current assignment exists, retain partitions up to the 
minimum quota.
- *  If the current assignment size is greater than the minimum 
quota and
- *  there are members that could get an extra partition, assign 
the next partition as well.
- *  Finally, if the member's current assignment size is less than 
the minimum quota,
- *  add them to the potentially unfilled members map and track the 
number of remaining
- *  partitions required to meet the quota.
- * 
- * 
- *
- * @return  Members mapped to the remaining number of partitions needed to 
meet the minimum quota,
- *  including members that are eligible to receive an extra 
partition.
- */
-private Map<String, Integer> assignStickyPartitions(int minQuota) {
-Map<String, Integer> potentiallyUnfilledMembers = new HashMap<>();
-
-groupSpec.members().forEach((memberId, assignmentMemberSpec) -> {
-List<TopicIdPartition> validCurrentMemberAssignment = 
validCurrentMemberAssignment(
-assignmentMemberSpec.assignedPartitions()
-);
-
-int currentAssignmentSize = validCurrentMemberAssignment.size();
-// Number of partitions required to meet the minimum quota.
-int remaining = minQuota - currentAssignmentSize;
-
-if (currentAssignmentSize > 0) {
-int retainedPartitionsCount = min(currentAssignmentSize, 
minQuota);
-IntStream.range(0, retainedPartitionsCount).forEach(i -> {
-TopicIdPartition topicIdPartition = 
validCurrentMemberAssignment.get(i);
-addPartitionToAssignment(
-targetAssignment,
-memberId,
-topicIdPartition.topicId(),
-topicIdPartition.partitionId()
-);
-});
-
-if (remaining < 0) {
-// The extra partition is located at the last index from 
the previous step.
-if (remainingMembersToGetAnExtraPartition > 0) {
-TopicIdPartition topicIdPartition = 
validCurrentMemberAssignment.get(retainedPartitionsCount++);
-addPartitionToAssignment(
-targetAssignment,
-memberId,
-topicIdPartition.topicId(),
-topicIdPartition.partitionId()
-);
-remainingMembersToGetAnExtraPartition--;
+private void maybeRevokePartitions() {
+for (Map.Entry<String, AssignmentMemberSpec> entry : 
groupSpec.members().entrySet()) {
+String memberId = entry.getKey();
+AssignmentMemberSpec assignmentMemberSpec = entry.getValue();
+Map<Uuid, Set<Integer>> oldAssignment = 

Re: [PR] [MINOR] Code Cleanup - Connect Module [kafka]

2024-05-30 Thread via GitHub


chia7712 merged PR #16066:
URL: https://github.com/apache/kafka/pull/16066


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-16047) Source connector with EOS enabled have some InitProducerId requests timing out, effectively failing all the tasks & the whole connector

2024-05-30 Thread Edoardo Comar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16047?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Edoardo Comar reassigned KAFKA-16047:
-

Assignee: Edoardo Comar

> Source connector with EOS enabled have some InitProducerId requests timing 
> out, effectively failing all the tasks & the whole connector
> ---
>
> Key: KAFKA-16047
> URL: https://issues.apache.org/jira/browse/KAFKA-16047
> Project: Kafka
>  Issue Type: Bug
>  Components: connect, mirrormaker
>Affects Versions: 3.3.0, 3.4.0, 3.3.1, 3.3.2, 3.4.1, 3.6.0, 3.5.1, 3.5.2, 
> 3.6.1
>Reporter: Angelos Kaltsikis
>Assignee: Edoardo Comar
>Priority: Major
>
> Source Connectors with 'exactly.once.support = required' may have some of 
> their tasks' InitProducerId requests (issued from the admin client) time out. 
> In the case of MirrorSourceConnector, which was the source connector in which I 
> found the bug, this was effectively making all the tasks (in this specific 
> case) become "FAILED". As soon as one of the tasks got FAILED due to the 
> 'COORDINATOR_NOT_AVAILABLE' messages (due to timeouts), no matter how many 
> restarts I did to the connector/tasks, I couldn't get the 
> MirrorSourceConnector in a healthy RUNNING state again.
> Due to the low timeout that has been [hard-coded in the 
> code|https://github.com/apache/kafka/blob/3.6.1/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java#L87]
>  (1ms), there is a chance that the `InitProducerId` requests time out in case 
> of "slower-than-expected" Kafka brokers (that do not process & respond to the 
> above request in <= 1ms). (feel free to read more information about the issue 
> in the "More Context" section below)
> [~ChrisEgerton] I would appreciate it if you could respond to the following 
> questions
> - How and why was the 1ms magic number for the transaction timeout chosen?
> - Is there any specific reason it can be guaranteed that the 
> `InitProducerId` request can be processed in such a small time window? 
> - I have tried the above in multiple different Kafka clusters that are hosted 
> in different underlying datacenter hosts, and I don't believe that those 
> brokers are "slow" for some reason. If you feel that the brokers are slower 
> than expected, I would appreciate any pointers on how I could find out what 
> the bottleneck is.
> h3. Temporary Mitigation
> I have increased the timeout to 1000ms (randomly picked this number, just 
> wanted to give enough time to brokers to always complete those types of 
> requests). The fix can be found in my fork 
> https://github.com/akaltsikis/kafka/commit/8a47992e7dc63954f9d9ac54e8ed1f5a7737c97f
>  
> h3. Final solution
> The temporary mitigation is not ideal, as it still randomly picks a timeout 
> for such an operation, which may be high enough, but it's not ensured that it will 
> always be high enough. Shall we introduce something client-configurable?
> At the same time, I was thinking whether it makes sense to introduce some 
> tests that simulate brokers slower than the "blazing" fast mocked ones that exist 
> in unit tests, so as to be able to catch the kind of low timeout that 
> potentially makes some software features unusable.
> h3. What is affected
> The above bug exists in MirrorSourceConnector tasks running in a distributed 
> Kafka Connect cluster or MirrorMaker 2 jobs that run with distributed mode 
> enabled (a prerequisite for exactly.once.support to work). I believe this 
> should be true for other SourceConnectors as well (as the code-path that was 
> the one to blame is Connect specific & not MirrorMaker specific).
> h3. More context & logs
> *Connector Logs*
> {code:java}
> Caused by: java.util.concurrent.CompletionException: 
> org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node 
> assignment. Call: fenceProducer(api=INIT_PRODUCER_ID)
> {code}
> *Broker Logs*
> {code:java}
> [2023-12-12 14:28:18,030] INFO [TransactionCoordinator id=] Returning 
> COORDINATOR_NOT_AVAILABLE error code to client for 
> kafka-connect-uat-mm2-msc-20th-7's InitProducerId request 
> (kafka.coordinator.transaction.TransactionCoordinator)
> [2023-12-12 14:28:18,030] INFO [Transaction State Manager 1001]: 
> TransactionalId kafka-connect-uat-mm2-msc-20th-7 append transaction log for 
> TxnTransitMetadata(producerId=61137, lastProducerId=61137, producerEpoch=2, 
> lastProducerEpoch=-1, txnTimeoutMs=1, txnState=Empty, topicPartitions=Set(), 
> txnStartTimestamp=-1, txnLastUpdateTimestamp=1702391298028) transition failed 
> due to COORDINATOR_NOT_AVAILABLE, resetting pending state from Some(Empty), 
> aborting state transition and returning COORDINATOR_NOT_AVAILABLE in the 
> 

Re: [PR] KAFKA-15853: Move ZKConfigs related static method out of core and into ZKConfigs [kafka]

2024-05-30 Thread via GitHub


chia7712 commented on PR #16109:
URL: https://github.com/apache/kafka/pull/16109#issuecomment-2140831052

   @OmniaGM nice idea but we need to fix conflicts first :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Adjust validateOffsetCommit in ConsumerGroup to ensure compatibility with classic protocol members [kafka]

2024-05-30 Thread via GitHub


dongnuo123 commented on code in PR #16145:
URL: https://github.com/apache/kafka/pull/16145#discussion_r1621405276


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -325,24 +325,12 @@ private Group validateOffsetCommit(
 }

Review Comment:
   I kind of forgot why we wanted to check `GroupIdNotFoundException`. I feel 
the current implementation does support classic protocol members.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Adjust validateOffsetCommit in ConsumerGroup to ensure compatibility with classic protocol members [kafka]

2024-05-30 Thread via GitHub


dongnuo123 commented on code in PR #16145:
URL: https://github.com/apache/kafka/pull/16145#discussion_r1621403173


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java:
##
@@ -857,6 +857,7 @@ public void validateOffsetCommit(
 throw Errors.UNKNOWN_MEMBER_ID.exception();
 }
 
+// TODO: A temp marker. Will remove it when the pr is open.
 if (!isTransactional && isInState(COMPLETING_REBALANCE)) {

Review Comment:
   Not sure why we only check `COMPLETING_REBALANCE` but not 
`PREPARING_REBALANCE`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Adds a test case to test that an exception is thrown in invalid ports [kafka]

2024-05-30 Thread via GitHub


chia7712 merged PR #16112:
URL: https://github.com/apache/kafka/pull/16112


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-05-30 Thread via GitHub


kirktrue closed pull request #16031: KAFKA-16200: Enforce that RequestManager 
implementations respect user-provided timeout
URL: https://github.com/apache/kafka/pull/16031


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: Adjust validateOffsetCommit in ConsumerGroup to ensure compatibility with classic protocol members [kafka]

2024-05-30 Thread via GitHub


dongnuo123 opened a new pull request, #16145:
URL: https://github.com/apache/kafka/pull/16145

   During online migration, there could be a ConsumerGroup that has members that 
use the classic protocol. In the current implementation, `STALE_MEMBER_EPOCH` 
could be thrown during ConsumerGroup offset fetch/commit validation, but it's not 
supported by the classic protocol. Thus this patch changes 
`ConsumerGroup#validateOffsetCommit` to ensure compatibility.
   
   There's no need to change `ConsumerGroup#validateOffsetFetch` because the 
member id and member epoch are always empty and -1 in the classic protocol, so 
the offset fetch request is always valid.
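   
   A rough sketch of the compatibility guard described above (member and method names here are assumptions for illustration, not the actual change):
   
   ```
   // Sketch: classic-protocol members commit with memberEpoch == -1 and cannot
   // handle STALE_MEMBER_EPOCH, so skip the epoch check for them.
   if (member.useClassicProtocol()) {
       return; // validated via the classic generation/member-id path instead
   }
   if (memberEpoch != member.memberEpoch()) {
       throw Errors.STALE_MEMBER_EPOCH.exception();
   }
   ```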
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Change KStreamKstreamOuterJoinTest to use distinct left and right types [kafka]

2024-05-30 Thread via GitHub


gharris1727 commented on code in PR #15513:
URL: https://github.com/apache/kafka/pull/15513#discussion_r1621400369


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java:
##
@@ -737,12 +739,12 @@ public void runOuterJoin(final StreamJoined streamJoine
 inputTopic1.pipeInput(expectedKey, "C" + expectedKey);
 }
 processor.checkAndClearProcessResult(
-new KeyValueTimestamp<>(0, "C0+a0", 0L),
-new KeyValueTimestamp<>(0, "C0+b0", 0L),
-new KeyValueTimestamp<>(1, "C1+a1", 0L),
-new KeyValueTimestamp<>(1, "C1+b1", 0L),
-new KeyValueTimestamp<>(2, "C2+b2", 0L),
-new KeyValueTimestamp<>(3, "C3+b3", 0L)
+new KeyValueTimestamp<>(0, "C0+0", 0L),
+new KeyValueTimestamp<>(0, "C0+0", 0L),
+new KeyValueTimestamp<>(1, "C1+1", 0L),
+new KeyValueTimestamp<>(1, "C1+1", 0L),

Review Comment:
   You're right, I didn't notice this. I did a search-and-replace renaming, and 
reverted the stuff which didn't make sense.
   
   I did have to manually renumber stuff like "a0-0", and some places where 
capital letters "A0" were used on the inputStream2 to fit the pattern better. 
PTAL, thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-05-30 Thread via GitHub


brenden20 commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1621395136


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -327,8 +339,11 @@ void testEnsureEventsAreCompleted() {
 assertTrue(applicationEventsQueue.isEmpty());
 }
 
+// Look into this one

Review Comment:
   My mistake leaving that there, that was a comment for myself that I forgot 
to remove



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16866) RemoteLogManagerTest.testCopyQuotaManagerConfig failing

2024-05-30 Thread Justine Olshan (Jira)
Justine Olshan created KAFKA-16866:
--

 Summary: RemoteLogManagerTest.testCopyQuotaManagerConfig failing
 Key: KAFKA-16866
 URL: https://issues.apache.org/jira/browse/KAFKA-16866
 Project: Kafka
  Issue Type: Test
Affects Versions: 3.8.0
Reporter: Justine Olshan


Seems like this test introduced in [https://github.com/apache/kafka/pull/15625] 
is failing consistently.

org.opentest4j.AssertionFailedError: 
Expected :61
Actual   :11



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-05-30 Thread via GitHub


brenden20 commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1621386306


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -149,20 +157,28 @@ public void testStartupAndTearDown() throws 
InterruptedException {
 "The consumer network thread did not stop within " + 
DEFAULT_MAX_WAIT_MS + " ms");
 }
 
+@Test
+void testRequestManagersArePolledOnce() {
+consumerNetworkThread.runOnce();
+requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> 
verify(rm, times(1)).poll(anyLong())));
+requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> 
verify(rm, times(1)).maximumTimeToWait(anyLong())));
+verify(networkClientDelegate).poll(anyLong(), anyLong());
+}
+
 @Test
 public void testApplicationEvent() {
 ApplicationEvent e = new PollEvent(100);
 applicationEventsQueue.add(e);
 consumerNetworkThread.runOnce();
-verify(applicationEventProcessor, times(1)).process(e);
+verify(applicationEventProcessor).process(e);

Review Comment:
   I checked Mockito documentation and adding times(1) is redundant, so not 
really a big deal either way to keep it or remove it. Do let me know though if 
there is a stylistic preference.
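   
   For reference, the two forms assert exactly the same thing, since times(1) is Mockito's default verification mode:
   
   ```
   // Equivalent verifications in Mockito:
   verify(applicationEventProcessor).process(e);           // implicit times(1)
   verify(applicationEventProcessor, times(1)).process(e); // explicit, redundant
   ```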



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]

2024-05-30 Thread via GitHub


jolshan commented on PR #15625:
URL: https://github.com/apache/kafka/pull/15625#issuecomment-2140804438

   Can we look at testCopyQuotaManagerConfig() – 
kafka.log.remote.RemoteLogManagerTest? It seems like it is failing pretty 
consistently. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16558 Implemented HeartbeatRequestState toStringBase() and added a test for it [kafka]

2024-05-30 Thread via GitHub


brenden20 commented on PR #16124:
URL: https://github.com/apache/kafka/pull/16124#issuecomment-2140800759

   @kirktrue thank you for the suggestions; I have implemented and pushed them. 
Let me know if everything looks good!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Adds a test case to test that an exception is thrown in invalid ports [kafka]

2024-05-30 Thread via GitHub


chia7712 commented on PR #16112:
URL: https://github.com/apache/kafka/pull/16112#issuecomment-2140786920

   @ahmedryasser Thanks for your contribution. Could you please add "MINOR: " 
to your title?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16757: Fix broker re-registration issues around MV 3.7-IV2 [kafka]

2024-05-30 Thread via GitHub


cmccabe commented on code in PR #15945:
URL: https://github.com/apache/kafka/pull/15945#discussion_r1621372420


##
metadata/src/main/java/org/apache/kafka/image/publisher/BrokerRegistrationTracker.java:
##
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.publisher;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.loader.LoaderManifest;
+import org.apache.kafka.metadata.BrokerRegistration;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.slf4j.Logger;
+
+import java.util.List;
+
+/**
+ * Tracks the registration of a specific broker, and executes a callback if it 
should be refreshed.
+ *
+ * This tracker handles cases where we might want to re-register the broker. 
The only such case
+ * right now is during the transition from non-JBOD mode, to JBOD mode. In 
other words, the
+ * transition from a MetadataVersion less than 3.7-IV2, to one greater than or 
equal to 3.7-IV2.
+ * In this case, the broker registration will start out containing no 
directories, and we need to
+ * resend the BrokerRegistrationRequest to fix that.
+ *
+ * As much as possible, the goal here is to keep things simple. We just 
compare the desired state
+ * with the actual state, and try to make changes only if necessary.
+ */
+public class BrokerRegistrationTracker implements MetadataPublisher {
+private final Logger log;
+private final int id;
+private final Runnable refreshRegistrationCallback;
+
+/**
+ * Create the tracker.
+ *
+ * @param id The ID of this broker.
+ * @param targetDirectories The directories managed by this 
broker.
+ * @param refreshRegistrationCallback   Callback to run if we need to 
refresh the registration.
+ */
+public BrokerRegistrationTracker(
+int id,
+List<Uuid> targetDirectories,
+Runnable refreshRegistrationCallback
+) {
+this.log = new LogContext("[BrokerRegistrationTracker id=" + id + "] 
").
+logger(BrokerRegistrationTracker.class);
+this.id = id;
+this.refreshRegistrationCallback = refreshRegistrationCallback;
+}
+
+@Override
+public String name() {
+return "BrokerRegistrationTracker(id=" + id + ")";
+}
+
+@Override
+public void onMetadataUpdate(
+MetadataDelta delta,
+MetadataImage newImage,
+LoaderManifest manifest
+) {
+boolean checkBrokerRegistration = false;
+if (delta.featuresDelta() != null) {
+if (delta.metadataVersionChanged().isPresent()) {
+if (log.isTraceEnabled()) {
+log.trace("Metadata version change is present: {}",
+delta.metadataVersionChanged());
+}
+checkBrokerRegistration = true;
+}
+}
+if (delta.clusterDelta() != null) {
+if (delta.clusterDelta().changedBrokers().get(id) != null) {
+if (log.isTraceEnabled()) {
+log.trace("Broker change is present: {}",
+delta.clusterDelta().changedBrokers().get(id));
+}
+checkBrokerRegistration = true;
+}
+}
+if (checkBrokerRegistration) {
+if 
(brokerRegistrationNeedsRefresh(newImage.features().metadataVersion(),
+delta.clusterDelta().broker(id))) {
+refreshRegistrationCallback.run();
+}
+}
+}
+
+/**
+ * Check if the current broker registration needs to be refreshed.
+ *
+ * @param registration  The current broker registration, or null if there 
is none.
+ * @return  True only if we should refresh.
+ */
+boolean brokerRegistrationNeedsRefresh(
+MetadataVersion metadataVersion,
+BrokerRegistration registration
+) {
+// If there is no existing registration, the BrokerLifecycleManager 
must still be sending it.
+// So we don't 

Re: [PR] KAFKA-15853: Move configDef out of core [kafka]

2024-05-30 Thread via GitHub


chia7712 commented on PR #16116:
URL: https://github.com/apache/kafka/pull/16116#issuecomment-2140785638

   @OmniaGM Sorry, please fix the conflicts again :(


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-10787: Add import ordering checkstyle rule and configure an automatic formatter [kafka]

2024-05-30 Thread via GitHub


chia7712 commented on code in PR #16097:
URL: https://github.com/apache/kafka/pull/16097#discussion_r1621368550


##
build.gradle:
##
@@ -787,6 +800,12 @@ subprojects {
 skipProjects = [ ":jmh-benchmarks", ":trogdor" ]
 skipConfigurations = [ "zinc" ]
   }
+
+  afterEvaluate {

Review Comment:
   Maybe we can set spotless directly. For example:
   ```
 if (project.name in spotlessApplyModules) {
   apply plugin: 'com.diffplug.spotless'
   spotless {
 java {
   importOrder('kafka', 'org.apache.kafka', 'com', 'net', 'org', 
'java', 'javax', '', '\\#')
   removeUnusedImports()
 }
   }
 }
   ```
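   
   With that in place, `./gradlew spotlessCheck` and `./gradlew spotlessApply` would only cover the opted-in modules, since the plugin is applied per-project.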



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16788 - Fix resource leakage during connector start() failure [kafka]

2024-05-30 Thread via GitHub


gharris1727 commented on code in PR #16095:
URL: https://github.com/apache/kafka/pull/16095#discussion_r1621349710


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java:
##
@@ -231,6 +231,12 @@ private synchronized void onFailure(Throwable t) {
 if (this.state == State.FAILED)
 return;
 
+// Call stop() on the connector to release its resources. Connector
+// could fail in the start() method, which is why we call stop() on
+// INIT state as well.
+if (this.state == State.STARTED || this.state == State.INIT)
+connector.stop();

Review Comment:
   This is a potentially blocking call to the connector, and I don't think 
that's a good fit for this onFailure handler. This call would delay the 
statusListener call, which delays notifying the REST API of the FAILED status 
and updating the metrics. If it blocks indefinitely, the status and metrics are 
never updated.
   
   There is a connector.stop() call in doShutdown that could be changed to 
execute for the INIT and FAILED states. That would leave the resources 
allocated while the connector is waiting in the FAILED state, but would at 
least ensure they don't leak long-term.
   
   We may also change the control flow to make the transition to the FAILED 
state trigger doShutdown early, rather than having it wait() with all the 
resources still allocated.
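
   To make that concrete, a minimal, self-contained sketch of the
doShutdown-based alternative; all names below are stand-ins for this
discussion, not Connect's actual WorkerConnector API:

   ```java
// Minimal sketch, assuming hypothetical names: onFailure() stays non-blocking
// so status/metrics updates are never delayed, while the potentially blocking
// stop() runs on the shutdown path and also covers the INIT and FAILED states.
public class FailureHandlingSketch {
    enum State { INIT, STARTED, FAILED, STOPPED }

    private volatile State state = State.INIT;
    private final Runnable blockingStop; // stand-in for connector.stop()

    FailureHandlingSketch(Runnable blockingStop) {
        this.blockingStop = blockingStop;
    }

    synchronized void onFailure(Throwable t) {
        if (state == State.FAILED)
            return;
        state = State.FAILED; // listeners/metrics would be notified here, cheaply
    }

    void doShutdown() {
        // Covers INIT and FAILED as well as STARTED, so resources allocated
        // by a failed start() cannot leak long-term.
        if (state == State.STARTED || state == State.INIT || state == State.FAILED)
            blockingStop.run();
        state = State.STOPPED;
    }
}
   ```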






Re: [PR] KAFKA-10787: Add import ordering checkstyle rule and configure an automatic formatter [kafka]

2024-05-30 Thread via GitHub


chia7712 commented on code in PR #16097:
URL: https://github.com/apache/kafka/pull/16097#discussion_r1621358471


##
checkstyle/suppressions.xml:
##
@@ -361,4 +361,7 @@
 
 
+
+

Review Comment:
   Why do we need this change?



##
build.gradle:
##
@@ -1007,7 +1026,7 @@ project(':core') {
 testImplementation libs.junitJupiter
 testImplementation libs.slf4jlog4j
 testImplementation libs.caffeine
-

Review Comment:
   please revert this change



##
build.gradle:
##
@@ -47,7 +47,7 @@ plugins {
   // Updating the shadow plugin version to 8.1.1 causes issue with signing and 
publishing the shadowed
   // artifacts - see https://github.com/johnrengelman/shadow/issues/901
   id 'com.github.johnrengelman.shadow' version '8.1.0' apply false
-  id 'com.diffplug.spotless' version '6.14.0' apply false // 6.14.1 and newer 
require Java 11 at compile time, so we can't upgrade until AK 4.0
+  id 'com.diffplug.spotless' version "${spotlessVersion}" apply false

Review Comment:
   Do we need this variable? Also, why not use the latest version, `6.25.0`?






[jira] [Commented] (KAFKA-16518) Storage tool changes for KIP-853

2024-05-30 Thread Muralidhar Basani (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16518?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17850851#comment-17850851
 ] 

Muralidhar Basani commented on KAFKA-16518:
---

[~jsancio] I have one short question in the PR about the constructors of VoterSet 
and VoterNode.

> Storage tool changes for KIP-853
> 
>
> Key: KAFKA-16518
> URL: https://issues.apache.org/jira/browse/KAFKA-16518
> Project: Kafka
>  Issue Type: Sub-task
>  Components: tools
>Reporter: José Armando García Sancio
>Assignee: Muralidhar Basani
>Priority: Major
> Fix For: 3.8.0
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-853%3A+KRaft+Controller+Membership+Changes#KIP853:KRaftControllerMembershipChanges-kafka-storage





Re: [PR] MINOR: Refactor DynamicConfig [kafka]

2024-05-30 Thread via GitHub


chia7712 commented on code in PR #16133:
URL: https://github.com/apache/kafka/pull/16133#discussion_r1621350440


##
core/src/main/scala/kafka/server/DynamicBrokerConfig.scala:
##
@@ -675,7 +677,7 @@ object DynamicLogConfig {
   // Exclude message.format.version for now since we need to check that the 
version
   // is supported on all brokers in the cluster.
   @nowarn("cat=deprecation")
-  val ExcludedConfigs = Set(ServerLogConfigs.LOG_MESSAGE_FORMAT_VERSION_CONFIG)

Review Comment:
   Do we need this variable? Maybe we can remove it with the following change:
   ```scala
 // Exclude message.format.version for now since we need to check that the 
version
 // is supported on all brokers in the cluster.
 @nowarn("cat=deprecation")
 val ReconfigurableConfigs = 
ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.values.asScala.toSet - 
ServerLogConfigs.LOG_MESSAGE_FORMAT_VERSION_CONFIG
   ```



##
core/src/main/scala/kafka/server/DynamicBrokerConfig.scala:
##
@@ -319,7 +321,7 @@ class DynamicBrokerConfig(private val kafkaConfig: 
KafkaConfig) extends Logging
   }
 
   private def verifyReconfigurableConfigs(configNames: Set[String]): Unit = 
CoreUtils.inWriteLock(lock) {
-val nonDynamic = 
configNames.filter(DynamicConfig.Broker.nonDynamicProps.contains)
+val nonDynamic = configNames.filter(nonDynamicProps.contains)

Review Comment:
   How about `configNames.intersect(nonDynamicProps)`?






Re: [PR] KAFKA-16860; [1/2] Introduce group.version feature flag [kafka]

2024-05-30 Thread via GitHub


dajac commented on PR #16120:
URL: https://github.com/apache/kafka/pull/16120#issuecomment-2140752725

   The build does not seem to start… I am not sure why.





Re: [PR] KAFKA-16557 Implemented OffsetFetchRequestState toStringBase and added a test for it in CommitRequestManagerTest [kafka]

2024-05-30 Thread via GitHub


kirktrue commented on PR #16115:
URL: https://github.com/apache/kafka/pull/16115#issuecomment-2140747685

   @brenden20, as mentioned on another one of your PRs, there's a checkstyle 
violation here. You can run this command locally to avoid waiting for the CI 
infrastructure to catch it:
   
   ```
   ./gradlew check -x test
   ```





Re: [PR] KAFKA-16860; [1/2] Introduce group.version feature flag [kafka]

2024-05-30 Thread via GitHub


dajac commented on code in PR #16120:
URL: https://github.com/apache/kafka/pull/16120#discussion_r1621344907


##
server-common/src/main/java/org/apache/kafka/server/common/GroupVersion.java:
##
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import java.util.Collections;
+import java.util.Map;
+
+public enum GroupVersion implements FeatureVersion {
+
+// Version 1 enables the classic rebalance protocol. This is the default
+// behavior even if the feature flag is not set.
+GV_1(1, MetadataVersion.IBP_3_8_IV0, Collections.emptyMap()),

Review Comment:
   Updated to use the version 0 approach.






Re: [PR] KAFKA-16558 Implemented HeartbeatRequestState toStringBase() and added a test for it [kafka]

2024-05-30 Thread via GitHub


kirktrue commented on PR #16124:
URL: https://github.com/apache/kafka/pull/16124#issuecomment-2140745523

   The test failures are unrelated, FWIW.





Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-05-30 Thread via GitHub


kirktrue commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1621342452


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -53,89 +39,111 @@
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Optional;
+import java.util.*;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
 
-import static 
org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_HEARTBEAT_INTERVAL_MS;
-import static 
org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_REQUEST_TIMEOUT_MS;
-import static 
org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.createDefaultGroupInformation;
 import static 
org.apache.kafka.clients.consumer.internals.events.CompletableEvent.calculateDeadlineMs;
 import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
-import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
+import static org.junit.jupiter.api.Assertions.*;

Review Comment:
   ```suggestion
   import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
   import static org.junit.jupiter.api.Assertions.assertEquals;
   import static org.junit.jupiter.api.Assertions.assertFalse;
   import static org.junit.jupiter.api.Assertions.assertTrue;
   import static org.mockito.ArgumentMatchers.any;
   ```



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -53,89 +39,111 @@
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Optional;
+import java.util.*;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
 
-import static 
org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_HEARTBEAT_INTERVAL_MS;
-import static 
org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_REQUEST_TIMEOUT_MS;
-import static 
org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.createDefaultGroupInformation;
 import static 
org.apache.kafka.clients.consumer.internals.events.CompletableEvent.calculateDeadlineMs;
 import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
-import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
+import static org.junit.jupiter.api.Assertions.*;
 import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;

Review Comment:
   Just bring these explicit imports back to make checkstyle stop complaining
   
   ```suggestion
   import static org.mockito.ArgumentMatchers.eq;
   import static org.mockito.Mockito.doAnswer;
   import static org.mockito.Mockito.doThrow;
   import static org.mockito.Mockito.mock;
   import static org.mockito.Mockito.times;
   import static org.mockito.Mockito.verify;
   import static org.mockito.Mockito.when;
   ```






Re: [PR] MINOR: Enable transaction verification with new group coordinator in TransactionsTest [kafka]

2024-05-30 Thread via GitHub


dajac merged PR #16139:
URL: https://github.com/apache/kafka/pull/16139





Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-05-30 Thread via GitHub


kirktrue commented on PR #16140:
URL: https://github.com/apache/kafka/pull/16140#issuecomment-2140741298

   Looks like there are some checkstyle failures due to the use of wildcard 
imports.





Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-05-30 Thread via GitHub


kirktrue commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1621334851


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -1290,4 +1290,4 @@ static class MemberInfo {
 this.memberEpoch = Optional.empty();
 }
 }
-}
+}

Review Comment:
   We should revert/fix this change as it's whitespace only.



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -53,89 +39,111 @@
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Optional;
+import java.util.*;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
 
-import static 
org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_HEARTBEAT_INTERVAL_MS;
-import static 
org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_REQUEST_TIMEOUT_MS;
-import static 
org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.createDefaultGroupInformation;
 import static 
org.apache.kafka.clients.consumer.internals.events.CompletableEvent.calculateDeadlineMs;
 import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
-import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
+import static org.junit.jupiter.api.Assertions.*;
 import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
 
 public class ConsumerNetworkThreadTest {
+static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000;
+static final int DEFAULT_REQUEST_TIMEOUT_MS = 500;
+
+private final Time time;
+private final ConsumerMetadata metadata;
+private final BlockingQueue applicationEventsQueue;
+private final ApplicationEventProcessor applicationEventProcessor;
+private final OffsetsRequestManager offsetsRequestManager;
+private final HeartbeatRequestManager heartbeatRequestManager;
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final ConsumerNetworkThread consumerNetworkThread;
+private final MockClient client;
+private final NetworkClientDelegate networkClientDelegate;
+private final NetworkClientDelegate networkClient;
+private final RequestManagers requestManagers;
+private final CompletableEventReaper applicationEventReaper;
+
+ConsumerNetworkThreadTest() {
+LogContext logContext = new LogContext();
+ConsumerConfig config = mock(ConsumerConfig.class);
+this.time = new MockTime();
+this.networkClientDelegate = mock(NetworkClientDelegate.class);
+this.requestManagers = mock(RequestManagers.class);
+this.offsetsRequestManager = mock(OffsetsRequestManager.class);
+this.heartbeatRequestManager = mock(HeartbeatRequestManager.class);
+this.coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+this.applicationEventsQueue = new LinkedBlockingQueue<>();
+this.metadata = mock(ConsumerMetadata.class);
+this.applicationEventProcessor = mock(ApplicationEventProcessor.class);
+this.applicationEventReaper = mock(CompletableEventReaper.class);
+this.client = new MockClient(time);
+
+this.networkClient = new NetworkClientDelegate(
+time,
+config,
+logContext,
+client
+);
 
-private ConsumerTestBuilder testBuilder;
-private Time time;
-private ConsumerMetadata metadata;
-private NetworkClientDelegate networkClient;
-private BlockingQueue applicationEventsQueue;
-private ApplicationEventProcessor applicationEventProcessor;
-private OffsetsRequestManager offsetsRequestManager;
-private CommitRequestManager commitRequestManager;
-private CoordinatorRequestManager coordinatorRequestManager;
-private ConsumerNetworkThread consumerNetworkThread;
-private final CompletableEventReaper applicationEventReaper = 
mock(CompletableEventReaper.class);
-private MockClient client;
-
-@BeforeEach
-public void setup() {
-testBuilder = new 

Re: [PR] KAFKA-16223: Replace EasyMock/PowerMock with Mockito for KafkaConfigBackingStoreTest [kafka]

2024-05-30 Thread via GitHub


chia7712 commented on code in PR #15989:
URL: https://github.com/apache/kafka/pull/15989#discussion_r1621334504


##
connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreMockitoTest.java:
##
@@ -1184,6 +1185,141 @@ public void 
testRestoreRestartRequestInconsistentState() {
 verify(configLog).stop();
 }
 
+@Test
+public void testPutTaskConfigsZeroTasks() throws Exception {
+when(configLog.partitionCount()).thenReturn(1);
+
+configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+verifyConfigure();
+configStorage.start();
+
+// Bootstrap as if we had already added the connector, but no tasks 
had been added yet
+whiteBoxAddConnector(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0), 
Collections.emptyList());
+
+// Null before writing
+ClusterConfigState configState = configStorage.snapshot();
+assertEquals(-1, configState.offset());
+
+// Task configs should read to end, write to the log, read to end, 
write root.
+
doAnswer(expectReadToEnd(Collections.emptyMap())).when(configLog).readToEnd();
+
+expectConvertWriteRead(
+COMMIT_TASKS_CONFIG_KEYS.get(0), 
KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(0),
+"tasks", 0); // We have 0 tasks
+
+configStorage.putTaskConfigs("connector1", Collections.emptyList());
+
+// As soon as root is rewritten, we should see a callback notifying us 
that we reconfigured some tasks
+configUpdateListener.onTaskConfigUpdate(Collections.emptyList());

Review Comment:
   We need to use `verify` to make sure this method is called as expected.
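
   For example, a self-contained sketch of the `verify`-style assertion (the
listener interface below is a stand-in, not the actual Connect type):

   ```java
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

import java.util.Collections;
import java.util.List;

// Sketch: instead of invoking the listener from the test, assert that the
// code under test invoked it.
public class VerifySketch {
    interface ConfigUpdateListener {
        void onTaskConfigUpdate(List<String> taskIds);
    }

    public static void main(String[] args) {
        ConfigUpdateListener listener = mock(ConfigUpdateListener.class);
        listener.onTaskConfigUpdate(Collections.emptyList()); // done by the code under test
        verify(listener).onTaskConfigUpdate(Collections.emptyList());
    }
}
   ```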






Re: [PR] KAFKA-16558 Implemented HeartbeatRequestState toStringBase() and added a test for it [kafka]

2024-05-30 Thread via GitHub


kirktrue commented on code in PR #16124:
URL: https://github.com/apache/kafka/pull/16124#discussion_r1621332051


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -478,6 +478,23 @@ public void resetTimer() {
 this.heartbeatTimer.reset(heartbeatIntervalMs);
 }
 
+@Override
+public String toStringBase() {
+return super.toStringBase() +
+", heartbeatTimer=" + heartbeatTimer +
+", heartbeatIntervalMs=" + heartbeatIntervalMs;
+}
+
+// Visible for testing
+protected Timer heartbeatTimer() {
+return this.heartbeatTimer;
+}
+
+// Visible for testing
+protected long heartbeatIntervalMs() {
+return this.heartbeatIntervalMs;

Review Comment:
   ```suggestion
   return heartbeatIntervalMs;
   ```



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -152,6 +152,34 @@ public void cleanup() {
 }
 }
 
+@Test
+public void testHeartBeatRequestStateToStringBase() {
+final long retryBackoffMs = 100;
+final long retryBackoffMaxMs = 1000;
+LogContext logContext = new LogContext();
+HeartbeatRequestState heartbeatRequestState1 = new 
HeartbeatRequestState(

Review Comment:
   Super nit: we can drop the `1` at the end of the variable name, right?
   
   ```suggestion
   HeartbeatRequestState heartbeatRequestState = new 
HeartbeatRequestState(
   ```



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -478,6 +478,23 @@ public void resetTimer() {
 this.heartbeatTimer.reset(heartbeatIntervalMs);
 }
 
+@Override
+public String toStringBase() {
+return super.toStringBase() +
+", heartbeatTimer=" + heartbeatTimer +
+", heartbeatIntervalMs=" + heartbeatIntervalMs;
+}
+
+// Visible for testing
+protected Timer heartbeatTimer() {
+return this.heartbeatTimer;

Review Comment:
   ```suggestion
   return heartbeatTimer;
   ```



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -152,6 +152,34 @@ public void cleanup() {
 }
 }
 
+@Test
+public void testHeartBeatRequestStateToStringBase() {
+final long retryBackoffMs = 100;
+final long retryBackoffMaxMs = 1000;
+LogContext logContext = new LogContext();
+HeartbeatRequestState heartbeatRequestState1 = new 
HeartbeatRequestState(
+logContext,
+time,
+10,
+retryBackoffMs,
+retryBackoffMaxMs,
+.2
+);
+
+RequestState requestState = new RequestState(
+logContext,
+
"org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager$HeartbeatRequestState",

Review Comment:
   Perhaps we could make `HeartbeatRequestState` package-protected, then we 
could do this:
   
   ```suggestion
   
HeartbeatRequestManager.HeartbeatRequestState.class.getName(),
   ```
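
   For context, `getName()` on a nested class yields the `$`-qualified binary
name that the hard-coded string spells out; a tiny self-contained illustration
(names are stand-ins):

   ```java
// In the default package this prints "GetNameSketch$Inner", mirroring the
// "...HeartbeatRequestManager$HeartbeatRequestState" literal above.
public class GetNameSketch {
    static class Inner {
    }

    public static void main(String[] args) {
        System.out.println(Inner.class.getName());
    }
}
   ```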






[jira] [Resolved] (KAFKA-16802) Move build.gradle java version information inside of a java block

2024-05-30 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16802?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris resolved KAFKA-16802.
-
Fix Version/s: 3.8.0
   Resolution: Fixed

> Move build.gradle java version information inside of a java block
> -
>
> Key: KAFKA-16802
> URL: https://issues.apache.org/jira/browse/KAFKA-16802
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Greg Harris
>Assignee: Muralidhar Basani
>Priority: Major
>  Labels: newbie
> Fix For: 3.8.0
>
>
> The org.gradle.api.plugins.JavaPluginConvention type has been deprecated.    
> This is scheduled to be removed in Gradle 9.0.    
> [Documentation|https://docs.gradle.org/8.7/userguide/upgrading_version_8.html#java_convention_deprecation]
>   
> [https://github.com/apache/kafka/blob/95adb7bfbfc69b3e9f3538cc5d6f7c6a577d30ee/build.gradle#L292-L295]
>  





Re: [PR] KAFKA-16802 : Moving java versions inside java block [kafka]

2024-05-30 Thread via GitHub


gharris1727 merged PR #16135:
URL: https://github.com/apache/kafka/pull/16135





Re: [PR] KAFKA-16807: DescribeLogDirsResponseData#results#topics have unexpected topics having empty partitions [kafka]

2024-05-30 Thread via GitHub


chia7712 commented on PR #16042:
URL: https://github.com/apache/kafka/pull/16042#issuecomment-2140664027

   > The test names are testDescribeLogDirsWithoutAnyPartitionTopic and 
testDescribeLogDirs.
   
   It seems to me the method needs to come with the guarantee that 
`DescribeLogDirsTopic` should not have empty `partitions`.
   
   Hence, could you please add the following asserts to `testDescribeLogDirs`:
   
   ```scala
 responses.foreach { response =>
   assertEquals(Errors.NONE.code, response.errorCode)
   assertTrue(response.totalBytes > 0)
   assertTrue(response.usableBytes >= 0)
   assertFalse(response.topics().isEmpty)
   response.topics().forEach(t => assertFalse(t.partitions().isEmpty))
 }
   ```





[jira] [Commented] (KAFKA-15305) The background thread should try to process the remaining task until the shutdown timer is expired

2024-05-30 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17850834#comment-17850834
 ] 

Chia-Ping Tsai commented on KAFKA-15305:


{quote}
So seems we also need to make sure that the HB manager is actually polled when 
closing the consumer (the HBManager.pollOnClose you had suggested at some point)
{quote}

yep, I prefer to return the HB for leaving the group via `pollOnClose`, and that 
is the description I wrote in KAFKA-16639. Also, I agree with the solution in the 
KAFKA-16639 PR, as in the current flow the HB for leaving the group can be generated.

thread of closing consumer

1. prepareShutdown 
(https://github.com/apache/kafka/blob/32b2b73f673ecd41d17c03e99db3746c517990c4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1268)
2. waiting for `LeaveOnCloseEvent` 
(https://github.com/apache/kafka/blob/32b2b73f673ecd41d17c03e99db3746c517990c4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1279)
3. call ConsumerNetworkThread#close to stop the loop of 
ConsumerNetworkThread#run() 
(https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java#L281)

network thread (ConsumerNetworkThread)

1.  processApplicationEvents 
(https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java#L138).
 This method must be executed to handle `LeaveOnCloseEvent`
2. After `processApplicationEvents` (LeaveOnCloseEvent), 
`membershipManager.leaveGroup` is executed so ConsumerNetworkThread is aware of 
"leaving"
3. After `processApplicationEvents`, ConsumerNetworkThread will call HB manager 
poll sequentially 
(https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java#L144)

In short, ConsumerNetworkThread always calls the HB manager's poll after handling 
LeaveOnCloseEvent.
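
To make the ordering concrete, a minimal sketch with illustrative names only (the 
real ConsumerNetworkThread is linked above):

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.LongConsumer;

// Sketch of the run-loop ordering: application events (e.g. LeaveOnCloseEvent)
// are drained first, so the heartbeat manager's subsequent poll sees the
// "leaving" state and can emit the leave-group heartbeat.
public class RunOnceSketch {
    private final Queue<Runnable> applicationEvents = new ConcurrentLinkedQueue<>();

    void runOnce(List<LongConsumer> requestManagers, long currentTimeMs) {
        Runnable event;
        while ((event = applicationEvents.poll()) != null)
            event.run(); // e.g. membershipManager.leaveGroup()
        for (LongConsumer manager : requestManagers)
            manager.accept(currentTimeMs); // HB manager polled after events
    }
}
```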

{quote}
With that, at this point we would actually generate the HB to leave, add then 
we would hit the logic you mentioned above with a request to send (not empty), 
here. Makes sense?
{quote}

Both ways (poll/pollOnClose) can resolve KAFKA-16639. I feel your comment 
(https://github.com/apache/kafka/pull/16017#discussion_r1612039771) is a simple 
solution, and so I did not add a comment saying "we must create the HB request 
in pollOnClose".



> The background thread should try to process the remaining task until the 
> shutdown timer is expired
> --
>
> Key: KAFKA-15305
> URL: https://issues.apache.org/jira/browse/KAFKA-15305
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Chia-Ping Tsai
>Priority: Major
>  Labels: consumer-threading-refactor, timeout
> Fix For: 3.8.0
>
>
> While working on https://issues.apache.org/jira/browse/KAFKA-15304
> close() API supplies a timeout parameter so that the consumer can have a 
> grace period to process things before shutting down.  The background thread 
> currently doesn't do that, when close() is initiated, it will immediately 
> close all of its dependencies.
>  
> This might not be desirable because there could be remaining tasks to be 
> processed before closing.  Maybe the correct things to do is to first stop 
> accepting API request, second, let the runOnce() continue to run before the 
> shutdown timer expires, then we can force closing all of its dependencies.





Re: [PR] MINOR: make public the consumer group migration policy config [kafka]

2024-05-30 Thread via GitHub


dajac merged PR #16128:
URL: https://github.com/apache/kafka/pull/16128





Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-05-30 Thread via GitHub


kirktrue commented on PR #16031:
URL: https://github.com/apache/kafka/pull/16031#issuecomment-2140558404

I'm going to close and reopen to force another build.





Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-05-30 Thread via GitHub


kirktrue closed pull request #16031: KAFKA-16200: Enforce that RequestManager 
implementations respect user-provided timeout
URL: https://github.com/apache/kafka/pull/16031





Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-05-30 Thread via GitHub


kirktrue commented on PR #16031:
URL: https://github.com/apache/kafka/pull/16031#issuecomment-2140558063

   @lianetm—relevant test failures have been addressed. There are three 
unrelated test failures from flaky tests.





Re: [PR] KAFKA-16821: Member Subscription Spec Interface [kafka]

2024-05-30 Thread via GitHub


rreddy-22 commented on code in PR #16068:
URL: https://github.com/apache/kafka/pull/16068#discussion_r1621207749


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/MemberSubscriptionSpecImpl.java:
##
@@ -18,105 +18,63 @@
 
 import org.apache.kafka.common.Uuid;
 
-import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 
 /**
- * The assignment specification for a consumer group member.
+ * Implementation of the {@link MemberSubscriptionSpec} interface.
  */
-public class AssignmentMemberSpec {
-/**
- * The instance ID if provided.
- */
-private final Optional instanceId;
-
-/**
- * The rack ID if provided.
- */
+public class MemberSubscriptionSpecImpl implements MemberSubscriptionSpec {
 private final Optional rackId;
-
-/**
- * Topics Ids that the member is subscribed to.
- */
 private final Set subscribedTopicIds;
 
 /**
- * Partitions assigned keyed by topicId.
- */
-private final Map> assignedPartitions;
-
-/**
- * @return The instance ID as an Optional.
+ * Constructs a new {@code MemberSubscriptionSpecImpl}.
+ *
+ * @param rackIdThe rack Id.
+ * @param subscribedTopicIdsThe set of subscribed topic Ids.
  */
-public Optional instanceId() {
-return instanceId;
+public MemberSubscriptionSpecImpl(
+Optional rackId,
+Set subscribedTopicIds
+) {
+Objects.requireNonNull(rackId);
+Objects.requireNonNull(subscribedTopicIds);
+this.rackId = rackId;
+this.subscribedTopicIds = subscribedTopicIds;

Review Comment:
   I was just following the format I saw in TargetAssignmentResult and a few 
other places; I wasn't sure which one to use.
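
   For reference, a compact sketch of the fold-into-assignment idiom that
appears elsewhere in the codebase; field types mirror the diff, and the class
name is a stand-in:

   ```java
import java.util.Objects;
import java.util.Optional;
import java.util.Set;

// Null check folded into the assignment, instead of separate
// Objects.requireNonNull(...) statements before the assignments.
final class SubscriptionSketch {
    private final Optional<String> rackId;
    private final Set<String> subscribedTopicIds;

    SubscriptionSketch(Optional<String> rackId, Set<String> subscribedTopicIds) {
        this.rackId = Objects.requireNonNull(rackId);
        this.subscribedTopicIds = Objects.requireNonNull(subscribedTopicIds);
    }
}
   ```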






Re: [PR] KAFKA-16821: Member Subscription Spec Interface [kafka]

2024-05-30 Thread via GitHub


rreddy-22 commented on code in PR #16068:
URL: https://github.com/apache/kafka/pull/16068#discussion_r1621196989


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpec.java:
##
@@ -39,4 +41,20 @@ public interface GroupSpec {
  * False, otherwise.
  */
 boolean isPartitionAssigned(Uuid topicId, int partitionId);
+
+/**
+ * Gets the member subscription specification for a member.
+ *
+ * @param memberId  The member Id.
+ * @return The member's subscription metadata.
+ */
+MemberSubscriptionSpec memberSubscriptionSpec(String memberId);

Review Comment:
   I actually wanted to ask whether we want to return null or return an empty 
object. I returned a new `MemberSubscriptionSpecImpl` object in the implementation.
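
   Both options can be sketched briefly; everything below is illustrative, not
the PR's actual API:

   ```java
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Two ways to handle an unknown memberId: an Optional forces callers to
// handle absence explicitly; a shared "empty" null-object keeps call sites
// branch-free. Interface and names are stand-ins.
public class LookupSketch {
    interface Spec {
        Set<String> subscribedTopicIds();
    }

    private static final Spec EMPTY = Collections::emptySet;

    static Optional<Spec> lookup(Map<String, Spec> specs, String memberId) {
        return Optional.ofNullable(specs.get(memberId));
    }

    static Spec lookupOrEmpty(Map<String, Spec> specs, String memberId) {
        return specs.getOrDefault(memberId, EMPTY);
    }
}
   ```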






Re: [PR] KAFKA-16821: Member Subscription Spec Interface [kafka]

2024-05-30 Thread via GitHub


rreddy-22 commented on code in PR #16068:
URL: https://github.com/apache/kafka/pull/16068#discussion_r1621186636


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpec.java:
##
@@ -39,4 +41,20 @@ public interface GroupSpec {
  * False, otherwise.
  */
 boolean isPartitionAssigned(Uuid topicId, int partitionId);
+
+/**
+ * Gets the member subscription specification for a member.
+ *
+ * @param memberId  The member Id.
+ * @return The member's subscription metadata.
+ */
+MemberSubscriptionSpec memberSubscriptionSpec(String memberId);

Review Comment:
   thanks for the comment! I wanted to say that during our design discussions, 
we had agreed to keep it as memberSubscriptionSpec to maintain consistency and 
streamline the review process so we don't go back and forth. If there are new 
considerations or changes that we might not have anticipated, I would love to 
understand them better so we can make this more efficient going forward.








Re: [PR] KAFKA-16308 [2/N]: Allow unstable feature versions and rename unstable metadata config [kafka]

2024-05-30 Thread via GitHub


jolshan commented on code in PR #16130:
URL: https://github.com/apache/kafka/pull/16130#discussion_r1621089819


##
core/src/test/scala/unit/kafka/tools/StorageToolTest.scala:
##
@@ -354,6 +354,7 @@ Found problem:
 MetadataVersion.LATEST_PRODUCTION,
 Map(TestFeatureVersion.FEATURE_NAME -> featureLevel),
 allFeatures,
+false,

Review Comment:
   With `if (featureLevel <= Features.TEST_VERSION.defaultValue(MetadataVersion.LATEST_PRODUCTION))`, 
we skip version 2.
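
   To spell out the arithmetic, assuming for illustration that TEST_VERSION's
default at LATEST_PRODUCTION is 1 and the highest test level is 2:

   ```java
// With defaultValue = 1, the guard admits levels 0 and 1 and skips level 2.
public class FeatureLevelSketch {
    public static void main(String[] args) {
        int defaultValue = 1; // assumed TEST_VERSION default at LATEST_PRODUCTION
        for (int featureLevel = 0; featureLevel <= 2; featureLevel++) {
            if (featureLevel <= defaultValue)
                System.out.println("exercised: " + featureLevel); // prints 0 and 1
        }
    }
}
   ```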






Re: [PR] KAFKA-16827: Integrate kafka native-image with system tests [kafka]

2024-05-30 Thread via GitHub


omkreddy merged PR #16046:
URL: https://github.com/apache/kafka/pull/16046





Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]

2024-05-30 Thread via GitHub


yashmayya commented on PR #13375:
URL: https://github.com/apache/kafka/pull/13375#issuecomment-2140247839

   @mdedetrich apologies for the late response; oddly enough, I didn't get 
notified about your comment. Please feel free to take over, thanks!




