[GitHub] [kafka] cadonna commented on pull request #12809: KAFKA-14324: Upgrade RocksDB to 7.1.2
cadonna commented on PR #12809: URL: https://github.com/apache/kafka/pull/12809#issuecomment-1539552693 @vepo The memory leaks are fixed in the following PRs: https://github.com/apache/kafka/pull/12935 https://github.com/apache/kafka/pull/12959 Without those fixes we would have reverted this PR. There should not be any memory leaks caused by Kafka Streams anymore. If there are any, please open a ticket and/or provide a fix. A popular root cause of memory leaks is not closing iterators or other RocksDB objects. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] urbandan commented on pull request #13690: KAFKA-14978 ExactlyOnceWorkerSourceTask should remove parent metrics
urbandan commented on PR #13690: URL: https://github.com/apache/kafka/pull/13690#issuecomment-1539486889 @viktorsomogyi can you please take a look at this small fix? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] urbandan opened a new pull request, #13690: KAFKA-14978 ExactlyOnceWorkerSourceTask should remove parent metrics
urbandan opened a new pull request, #13690: URL: https://github.com/apache/kafka/pull/13690 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-14978) ExactlyOnceWorkerSourceTask does not remove parent metrics
Daniel Urban created KAFKA-14978: Summary: ExactlyOnceWorkerSourceTask does not remove parent metrics Key: KAFKA-14978 URL: https://issues.apache.org/jira/browse/KAFKA-14978 Project: Kafka Issue Type: Bug Components: KafkaConnect Reporter: Daniel Urban Assignee: Daniel Urban ExactlyOnceWorkerSourceTask removeMetrics does not invoke super.removeMetrics, meaning that only the transactional metrics are removed, and common source task metrics are not. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] philipnee commented on pull request #13664: KAFKA-14961: harden DefaultBackgroundThreadTest.testStartupAndTearDown test
philipnee commented on PR #13664: URL: https://github.com/apache/kafka/pull/13664#issuecomment-1539356020 Thank you @vvcephei - https://issues.apache.org/jira/browse/KAFKA-14977 is filed 💘 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-14977) testDescribeStateOfExistingGroupWithRoundRobinAssignor is flaky
Philip Nee created KAFKA-14977: -- Summary: testDescribeStateOfExistingGroupWithRoundRobinAssignor is flaky Key: KAFKA-14977 URL: https://issues.apache.org/jira/browse/KAFKA-14977 Project: Kafka Issue Type: Bug Components: unit tests Reporter: Philip Nee Attachments: failed_test.log Relevent ticket: - KAFKA-8110 Flaky Test DescribeConsumerGroupTest#testDescribeMembersWithConsumersWithoutAssignedPartitions - KAFKA-7969 Flaky Test DescribeConsumerGroupTest#testDescribeOffsetsOfExistingGroupWithNoMembers - KAFKA-8068 Flaky Test DescribeConsumerGroupTest#testDescribeMembersOfExistingGroup - KAFKA-8706 Kafka 2.3.0 Transient Unit Test Failures on Oracle Linux - See attached for details See the attachment for failure -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-8170) To add kafka data at rest encryption
[ https://issues.apache.org/jira/browse/KAFKA-8170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17720759#comment-17720759 ] kaiyangzhang edited comment on KAFKA-8170 at 5/9/23 3:28 AM: - [~sliebau] Hello, do you have any new progress in[ KIP-317], or do you have any information for reference? We have data encryption requirements here. Do you have any recommended solutions? was (Author: kaiyangzhang): [~sliebau] Hello, do you have any new progress in[ KIP-317], or do you have any information for reference? > To add kafka data at rest encryption > > > Key: KAFKA-8170 > URL: https://issues.apache.org/jira/browse/KAFKA-8170 > Project: Kafka > Issue Type: New Feature > Components: log >Reporter: Akash >Priority: Minor > Labels: features, security > > Kafka have mechanism for wire encryption of data. > But the kafka data at rest which exist in /- > is still unencrypted. > This directories now have log files with actual messages embedded metadata, > but unauthorised user can still recover messages from this files > Addiding encryption for this data would be valuable for preventing message > protection from disk theft, unauthorised user access on servers. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-8170) To add kafka data at rest encryption
[ https://issues.apache.org/jira/browse/KAFKA-8170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17720759#comment-17720759 ] kaiyangzhang commented on KAFKA-8170: - [~sliebau] Hello, do you have any new progress in[ KIP-317], or do you have any information for reference? > To add kafka data at rest encryption > > > Key: KAFKA-8170 > URL: https://issues.apache.org/jira/browse/KAFKA-8170 > Project: Kafka > Issue Type: New Feature > Components: log >Reporter: Akash >Priority: Minor > Labels: features, security > > Kafka have mechanism for wire encryption of data. > But the kafka data at rest which exist in /- > is still unencrypted. > This directories now have log files with actual messages embedded metadata, > but unauthorised user can still recover messages from this files > Addiding encryption for this data would be valuable for preventing message > protection from disk theft, unauthorised user access on servers. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] showuon merged pull request #13684: MINOR: fix compilation failure
showuon merged PR #13684: URL: https://github.com/apache/kafka/pull/13684 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on a diff in pull request #13584: MINOR: Add log segment unit tests, If the maximum offset beyond index, appen…
showuon commented on code in PR #13584: URL: https://github.com/apache/kafka/pull/13584#discussion_r1188050289 ## core/src/test/scala/unit/kafka/log/LogSegmentTest.scala: ## @@ -65,6 +68,29 @@ class LogSegmentTest { Utils.delete(logDir) } + /** + * If the maximum offset beyond index, appended to the log section, it throws LogSegmentOffsetOverflowException Review Comment: This comment is not clear enough (ex: what is maximum offset mean in this test? what index mean here?). Maybe: ``` LogSegmentOffsetOverflowException should be thrown while appending the logs if: 1. largestOffset - baseOffset < 0 2. largestOffset - baseOffset > Integer.MAX_VALUE ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on a diff in pull request #13584: MINOR: Add log segment unit tests, If the maximum offset beyond index, appen…
showuon commented on code in PR #13584: URL: https://github.com/apache/kafka/pull/13584#discussion_r1188050289 ## core/src/test/scala/unit/kafka/log/LogSegmentTest.scala: ## @@ -65,6 +68,29 @@ class LogSegmentTest { Utils.delete(logDir) } + /** + * If the maximum offset beyond index, appended to the log section, it throws LogSegmentOffsetOverflowException Review Comment: This comment is not clear enough (ex: what is maximum offset mean in this test?). Maybe: ``` LogSegmentOffsetOverflowException should be thrown while appending the logs if: 1. largestOffset - baseOffset < 0 2. largestOffset - baseOffset > Integer.MAX_VALUE ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] yashmayya commented on pull request #13688: KAFKA-14974: Restore backward compatibility in KafkaBasedLog
yashmayya commented on PR #13688: URL: https://github.com/apache/kafka/pull/13688#issuecomment-1539275733 @rhauch looks like there are 4 test failures - none of them are new failures or related to this change. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] satishd commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.
satishd commented on code in PR #13535: URL: https://github.com/apache/kafka/pull/13535#discussion_r1187999169 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -1083,48 +1095,100 @@ class ReplicaManager(val config: KafkaConfig, fetchPartitionStatus += (topicIdPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData)) }) } - val delayedFetch = new DelayedFetch( -params = params, -fetchPartitionStatus = fetchPartitionStatus, -replicaManager = this, -quota = quota, -responseCallback = responseCallback - ) - - // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation - val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => TopicPartitionOperationKey(tp) } - - // try to complete the request immediately, otherwise put it into the purgatory; - // this is because while the delayed fetch operation is being created, new requests - // may arrive and hence make this operation completable. - delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys) + + if (remoteFetchInfo.isPresent) { Review Comment: Sure, that check was missed while pulling the changes. Good catch. Updated it with the latest commit. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] satishd commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.
satishd commented on code in PR #13535: URL: https://github.com/apache/kafka/pull/13535#discussion_r1187999169 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -1083,48 +1095,100 @@ class ReplicaManager(val config: KafkaConfig, fetchPartitionStatus += (topicIdPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData)) }) } - val delayedFetch = new DelayedFetch( -params = params, -fetchPartitionStatus = fetchPartitionStatus, -replicaManager = this, -quota = quota, -responseCallback = responseCallback - ) - - // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation - val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => TopicPartitionOperationKey(tp) } - - // try to complete the request immediately, otherwise put it into the purgatory; - // this is because while the delayed fetch operation is being created, new requests - // may arrive and hence make this operation completable. - delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys) + + if (remoteFetchInfo.isPresent) { Review Comment: Sure, that check was missed. Good catch. Updated it with the latest commit. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup
jeffkbkim commented on code in PR #13639: URL: https://github.com/apache/kafka/pull/13639#discussion_r1187976451 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -0,0 +1,865 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.FencedMemberEpochException; +import org.apache.kafka.common.errors.GroupIdNotFoundException; +import org.apache.kafka.common.errors.GroupMaxSizeReachedException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.NotCoordinatorException; +import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.common.errors.UnsupportedAssignorException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException; +import org.apache.kafka.coordinator.group.consumer.Assignment; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroup; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember; +import org.apache.kafka.coordinator.group.consumer.CurrentAssignmentBuilder; +import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; +import org.apache.kafka.image.TopicsImage; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineHashMap; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentRecord; +import static org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentTombstoneRecord; +import static org.apache.kafka.coordinator.group.RecordHelpers.newGroupEpochRecord; +import static org.apache.kafka.coordinator.group.RecordHelpers.newGroupSubscriptionMetadataRecord; +import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionRecord; +import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionTombstoneRecord; +import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentTombstoneRecord; + +/** + * The GroupMetadataManager manages the metadata of all generic and consumer groups. It holds + * the hard and the soft state of the groups. This class has two kinds of methods: + * 1) The request handlers which handle the requests and generate a respo
[GitHub] [kafka] mjsax commented on a diff in pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…
mjsax commented on code in PR #11433: URL: https://github.com/apache/kafka/pull/11433#discussion_r1180626928 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -342,7 +342,18 @@ public void handleAssignment(final Map> activeTasks, maybeThrowTaskExceptions(taskCloseExceptions); -createNewTasks(activeTasksToCreate, standbyTasksToCreate); +final Collection newActiveTasks = createNewTasks(activeTasksToCreate, standbyTasksToCreate); +// If there are any transactions in flight and there are newly created active tasks, commit the tasks +// to avoid potential long restoration times. +if (processingMode == EXACTLY_ONCE_V2 && threadProducer().transactionInFlight() && !newActiveTasks.isEmpty()) { +log.info("New active tasks were added and there is an inflight transaction. Attempting to commit tasks."); +final int numCommitted = commitTasksAndMaybeUpdateCommittableOffsets(newActiveTasks, new HashMap<>()); +if (numCommitted == -1) { Review Comment: Seems we should not mess with the control flow here. Me might end up with spagetti-code. I would advocate to the move the commit logic into the restore code path if possible as mentioned further above -- this way, the existing control flow is not changed and we avoid all these compilations. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a diff in pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…
mjsax commented on code in PR #11433: URL: https://github.com/apache/kafka/pull/11433#discussion_r1188018045 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -342,7 +342,18 @@ public void handleAssignment(final Map> activeTasks, maybeThrowTaskExceptions(taskCloseExceptions); -createNewTasks(activeTasksToCreate, standbyTasksToCreate); +final Collection newActiveTasks = createNewTasks(activeTasksToCreate, standbyTasksToCreate); +// If there are any transactions in flight and there are newly created active tasks, commit the tasks +// to avoid potential long restoration times. +if (processingMode == EXACTLY_ONCE_V2 && threadProducer().transactionInFlight() && !newActiveTasks.isEmpty()) { +log.info("New active tasks were added and there is an inflight transaction. Attempting to commit tasks."); Review Comment: Ok. Not sure how important it is for users to see by default, but I agree it should not be too frequent as it should only happen during a rebalance. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)
rreddy-22 commented on code in PR #13443: URL: https://github.com/apache/kafka/pull/13443#discussion_r1188009762 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java: ## @@ -0,0 +1,574 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class RangeAssignorTest { +private final RangeAssignor assignor = new RangeAssignor(); +private final Uuid topic1Uuid = Uuid.randomUuid(); +private final Uuid topic2Uuid = Uuid.randomUuid(); +private final Uuid topic3Uuid = Uuid.randomUuid(); +private final String consumerA = "A"; +private final String consumerB = "B"; +private final String consumerC = "C"; + +@Test +public void testOneConsumerNoTopic() { +Map topics = Collections.singletonMap(topic1Uuid, new AssignmentTopicMetadata(3)); +Map members = Collections.singletonMap( +consumerA, +new AssignmentMemberSpec( +Optional.empty(), +Optional.empty(), +Collections.emptyList(), +Collections.emptyMap()) +); + +AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics); +GroupAssignment groupAssignment = assignor.assign(assignmentSpec); + +assertTrue(groupAssignment.members().isEmpty()); +} + +@Test +public void testOneConsumerNonExistentTopic() { +Map topics = Collections.singletonMap(topic1Uuid, new AssignmentTopicMetadata(3)); +Map members = Collections.singletonMap( +consumerA, +new AssignmentMemberSpec( +Optional.empty(), +Optional.empty(), +Collections.singletonList(topic2Uuid), +Collections.emptyMap()) +); + +AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics); +GroupAssignment groupAssignment = assignor.assign(assignmentSpec); + +assertTrue(groupAssignment.members().isEmpty()); +} + +@Test +public void testFirstAssignmentTwoConsumersTwoTopicsSameSubscriptions() { +Map topics = new HashMap<>(); +topics.put(topic1Uuid, new AssignmentTopicMetadata(3)); +topics.put(topic3Uuid, new AssignmentTopicMetadata(2)); + +Map members = new HashMap<>(); +// Initial Subscriptions are: A -> T1, T3 | B -> T1, T3 + +members.put(consumerA, new AssignmentMemberSpec( +Optional.empty(), +Optional.empty(), +Arrays.asList(topic1Uuid, topic3Uuid), +Collections.emptyMap()) +); + +members.put(consumerB, new AssignmentMemberSpec( +Optional.empty(), +Optional.empty(), +Arrays.asList(topic1Uuid, topic3Uuid), +Collections.emptyMap()) +); + +AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics); +GroupAssignment computedAssignment = assignor.assign(assignmentSpec); + +Map>> expectedAssignment = new HashMap<>(); +// Topic 1 Partitions Assignment +mkAssignment(expectedAssignment, topic1Uuid, Arrays.asList(0, 1)); +mkAssignment(expectedAssignment, topic1Uuid, Collections.singleton(2)); +// Topic 3 Partitions Assignment +mkAssignment(expectedAssignment, topic3Uuid, Collections.singleton(0)); +mkAssignment(expectedAssignment, topic3Uuid, Collections.singleton(1)); + +assertAssignment(expectedAssignment, computedAssignment); +} + +@Test +public void testFirstAssignmentThreeConsumersThreeTopicsDifferentSubscriptions() { +Map topics = new HashMap<>();
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Range Assignor on the Server (KIP-848)
rreddy-22 commented on code in PR #13443: URL: https://github.com/apache/kafka/pull/13443#discussion_r1188009097 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java: ## @@ -0,0 +1,574 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class RangeAssignorTest { +private final RangeAssignor assignor = new RangeAssignor(); +private final Uuid topic1Uuid = Uuid.randomUuid(); +private final Uuid topic2Uuid = Uuid.randomUuid(); +private final Uuid topic3Uuid = Uuid.randomUuid(); +private final String consumerA = "A"; +private final String consumerB = "B"; +private final String consumerC = "C"; + +@Test +public void testOneConsumerNoTopic() { +Map topics = Collections.singletonMap(topic1Uuid, new AssignmentTopicMetadata(3)); +Map members = Collections.singletonMap( +consumerA, +new AssignmentMemberSpec( +Optional.empty(), +Optional.empty(), +Collections.emptyList(), +Collections.emptyMap()) +); + +AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics); +GroupAssignment groupAssignment = assignor.assign(assignmentSpec); + +assertTrue(groupAssignment.members().isEmpty()); +} + +@Test +public void testOneConsumerNonExistentTopic() { +Map topics = Collections.singletonMap(topic1Uuid, new AssignmentTopicMetadata(3)); +Map members = Collections.singletonMap( +consumerA, +new AssignmentMemberSpec( +Optional.empty(), +Optional.empty(), +Collections.singletonList(topic2Uuid), +Collections.emptyMap()) +); Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mehbey commented on pull request #13681: KAFKA-14133: Migrate ActiveTaskCreator mock in TaskManagerTest to Mockito
mehbey commented on PR #13681: URL: https://github.com/apache/kafka/pull/13681#issuecomment-1539231000 Verified that failing Testing are not related to this change ``` [2023-05-08T22:22:09.340Z] 1: Task failed with an exception. [2023-05-08T22:22:09.340Z] --- [2023-05-08T22:22:09.340Z] * What went wrong: [2023-05-08T22:22:09.340Z] Execution failed for task ':streams:upgrade-system-tests-0102:integrationTest'. [2023-05-08T22:22:09.340Z] > Process 'Gradle Test Executor 151' finished with non-zero exit value 1 [2023-05-08T22:22:09.340Z] This problem might be caused by incorrect test process configuration. [2023-05-08T22:22:09.340Z] Please refer to the test execution section in the User Manual at https://docs.gradle.org/8.1.1/userguide/java_testing.html#sec:test_execution [2023-05-08T22:22:09.340Z] [2023-05-08T22:22:09.340Z] * Try: [2023-05-08T22:22:09.340Z] > Run with --stacktrace option to get the stack trace. [2023-05-08T22:22:09.340Z] > Run with --info or --debug option to get more log output. [2023-05-08T22:22:09.340Z] > Run with --scan to get full insights. [2023-05-08T22:22:09.340Z] == [2023-05-08T22:22:09.340Z] [2023-05-08T22:22:09.340Z] 2: Task failed with an exception. [2023-05-08T22:22:09.340Z] --- [2023-05-08T22:22:09.340Z] * What went wrong: [2023-05-08T22:22:09.340Z] Execution failed for task ':connect:mirror:integrationTest'. [2023-05-08T22:22:09.340Z] > Process 'Gradle Test Executor 130' finished with non-zero exit value 137 [2023-05-08T22:22:09.340Z] This problem might be caused by incorrect test process configuration. [2023-05-08T22:22:09.340Z] Please refer to the test execution section in the User Manual at https://docs.gradle.org/8.1.1/userguide/java_testing.html#sec:test_execution [2023-05-08T22:22:09.340Z] [2023-05-08T22:22:09.340Z] * Try: [2023-05-08T22:22:09.340Z] > Run with --stacktrace option to get the stack trace. [2023-05-08T22:22:09.340Z] > Run with --info or --debug option to get more log output. [2023-05-08T22:22:09.340Z] > Run with --scan to get full insights. ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (KAFKA-14561) Improve transactions experience for older clients by ensuring ongoing transaction
[ https://issues.apache.org/jira/browse/KAFKA-14561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17720717#comment-17720717 ] Jun Rao edited comment on KAFKA-14561 at 5/9/23 12:24 AM: -- The PR was reverted in the 3.5 branch. Updated the fix version. was (Author: junrao): The PR was reverted in 3.5 branch. Updated the fix version. > Improve transactions experience for older clients by ensuring ongoing > transaction > - > > Key: KAFKA-14561 > URL: https://issues.apache.org/jira/browse/KAFKA-14561 > Project: Kafka > Issue Type: Sub-task >Reporter: Justine Olshan >Assignee: Justine Olshan >Priority: Major > Fix For: 3.6.0 > > > This is part 3 of KIP-890: > 3. *To cover older clients, we will ensure a transaction is ongoing before we > write to a transaction. We can do this by querying the transaction > coordinator and caching the result.* > See KIP-890 for more details: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14561) Improve transactions experience for older clients by ensuring ongoing transaction
[ https://issues.apache.org/jira/browse/KAFKA-14561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-14561: Fix Version/s: 3.6.0 (was: 3.5.0) The PR was reverted in 3.5 branch. Updated the fix version. > Improve transactions experience for older clients by ensuring ongoing > transaction > - > > Key: KAFKA-14561 > URL: https://issues.apache.org/jira/browse/KAFKA-14561 > Project: Kafka > Issue Type: Sub-task >Reporter: Justine Olshan >Assignee: Justine Olshan >Priority: Major > Fix For: 3.6.0 > > > This is part 3 of KIP-890: > 3. *To cover older clients, we will ensure a transaction is ongoing before we > write to a transaction. We can do this by querying the transaction > coordinator and caching the result.* > See KIP-890 for more details: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] junrao commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.
junrao commented on code in PR #13535: URL: https://github.com/apache/kafka/pull/13535#discussion_r1187994355 ## core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala: ## @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server + +import kafka.cluster.Partition +import org.apache.kafka.common.errors.NotLeaderOrFollowerException +import org.apache.kafka.common.protocol.{ApiKeys, Errors} +import org.apache.kafka.common.record.MemoryRecords +import org.apache.kafka.common.requests.FetchRequest +import org.apache.kafka.common.{TopicIdPartition, Uuid} +import org.apache.kafka.storage.internals.log._ +import org.junit.jupiter.api.Assertions._ +import org.junit.jupiter.api.Test +import org.mockito.Mockito.{mock, when} + +import java.util.Optional +import java.util.concurrent.CompletableFuture + +import scala.collection._ + +class DelayedRemoteFetchTest { + private val maxBytes = 1024 + private val replicaManager: ReplicaManager = mock(classOf[ReplicaManager]) + private val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "topic") + private val fetchOffset = 500L + private val logStartOffset = 0L + private val currentLeaderEpoch = Optional.of[Integer](10) + private val replicaId = 1 + + private val fetchStatus = FetchPartitionStatus( +startOffsetMetadata = new LogOffsetMetadata(fetchOffset), +fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch)) + private val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500) + + @Test + def testFetchWithFencedEpoch(): Unit = { Review Comment: Hmm, where is logic to simulate a fenced epoch in this test? ## core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java: ## @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.log.remote; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.OffsetOutOfRangeException; +import org.apache.kafka.common.record.Records; +import org.apache.kafka.server.log.remote.storage.RemoteStorageException; +import org.apache.kafka.storage.internals.log.FetchDataInfo; +import org.apache.kafka.storage.internals.log.LogOffsetMetadata; +import org.apache.kafka.storage.internals.log.RemoteLogReadResult; +import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; + +import java.io.IOException; +import java.util.function.Consumer; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class RemoteLogReaderTest { +RemoteLogManager mockRLM = mock(RemoteLogManager.class); +LogOffsetMetadata logOffsetMetadata = new LogOffsetMetadata(100); +Records records = mock(Records.class); + + Review Comment: extra new line. ## core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala: ## @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the N
[GitHub] [kafka] satishd commented on pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.
satishd commented on PR #13535: URL: https://github.com/apache/kafka/pull/13535#issuecomment-1539215636 Thanks @junrao , updated the [comment](https://github.com/apache/kafka/pull/13535#discussion_r1184274133) with the latest commit. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] satishd commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.
satishd commented on code in PR #13535: URL: https://github.com/apache/kafka/pull/13535#discussion_r1187999169 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -1083,48 +1095,100 @@ class ReplicaManager(val config: KafkaConfig, fetchPartitionStatus += (topicIdPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData)) }) } - val delayedFetch = new DelayedFetch( -params = params, -fetchPartitionStatus = fetchPartitionStatus, -replicaManager = this, -quota = quota, -responseCallback = responseCallback - ) - - // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation - val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => TopicPartitionOperationKey(tp) } - - // try to complete the request immediately, otherwise put it into the purgatory; - // this is because while the delayed fetch operation is being created, new requests - // may arrive and hence make this operation completable. - delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys) + + if (remoteFetchInfo.isPresent) { Review Comment: Sure, updated it with the latest commit. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on pull request #13664: KAFKA-14961: harden DefaultBackgroundThreadTest.testStartupAndTearDown test
vvcephei commented on PR #13664: URL: https://github.com/apache/kafka/pull/13664#issuecomment-1539151873 Thanks, @philipnee ! I agree it's probably flaky, but I don't see a ticket filed for it. We haven't been very strict recently about this, but I'm reluctant to just forge ahead at this point, given all the mailing list discussions about recent merges breaking the build because they didn't block on failing tests. Do you mind filing a ticket for this test, and then we can merge it? (see https://issues.apache.org/jira/browse/KAFKA-8706?jql=project%20%3D%20KAFKA%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened%2C%20%22Patch%20Available%22)%20AND%20text%20~%20%22DescribeConsumerGroupTest%22 ) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on pull request #13654: HOTFIX: fix broken Streams upgrade system test
mjsax commented on PR #13654: URL: https://github.com/apache/kafka/pull/13654#issuecomment-1539079411 Merged to `trunk` and cherry-picked to `3.5` branch. \cc @mimaison Seems some other streams system tests are still flaky (well, I hope it's not a real issue). I hope to find time to dig into it this week. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax merged pull request #13654: HOTFIX: fix broken Streams upgrade system test
mjsax merged PR #13654: URL: https://github.com/apache/kafka/pull/13654 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] junrao commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.
junrao commented on code in PR #13535: URL: https://github.com/apache/kafka/pull/13535#discussion_r1187906662 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -1083,48 +1095,100 @@ class ReplicaManager(val config: KafkaConfig, fetchPartitionStatus += (topicIdPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData)) }) } - val delayedFetch = new DelayedFetch( -params = params, -fetchPartitionStatus = fetchPartitionStatus, -replicaManager = this, -quota = quota, -responseCallback = responseCallback - ) - - // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation - val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => TopicPartitionOperationKey(tp) } - - // try to complete the request immediately, otherwise put it into the purgatory; - // this is because while the delayed fetch operation is being created, new requests - // may arrive and hence make this operation completable. - delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys) + + if (remoteFetchInfo.isPresent) { Review Comment: Yes -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a diff in pull request #13686: MINOR: Create the MetadataNode classes to introspect MetadataImage
cmccabe commented on code in PR #13686: URL: https://github.com/apache/kafka/pull/13686#discussion_r1187901148 ## shell/src/main/java/org/apache/kafka/shell/glob/GlobVisitor.java: ## @@ -93,32 +96,33 @@ public String toString() { } @Override -public void accept(MetadataNodeManager.Data data) { +public void accept(MetadataShellState state) { String fullGlob = glob.startsWith("/") ? glob : -data.workingDirectory() + "/" + glob; +state.workingDirectory() + "/" + glob; List globComponents = CommandUtils.stripDotPathComponents(CommandUtils.splitPath(fullGlob)); -if (!accept(globComponents, 0, data.root(), new String[0])) { +if (!accept(globComponents, 0, state.root(), new String[0])) { handler.accept(Optional.empty()); } } -private boolean accept(List globComponents, - int componentIndex, - MetadataNode node, - String[] path) { +private boolean accept( +List globComponents, +int componentIndex, +MetadataNode node, +String[] path +) { Review Comment: Like above, I'll add a check and helpful exception. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a diff in pull request #13686: MINOR: Create the MetadataNode classes to introspect MetadataImage
cmccabe commented on code in PR #13686: URL: https://github.com/apache/kafka/pull/13686#discussion_r1187899780 ## shell/src/main/java/org/apache/kafka/shell/command/FindCommandHandler.java: ## @@ -80,28 +84,28 @@ public FindCommandHandler(List paths) { } @Override -public void run(Optional shell, -PrintWriter writer, -MetadataNodeManager manager) throws Exception { +public void run( +Optional shell, +PrintWriter writer, +MetadataShellState state +) throws Exception { for (String path : CommandUtils.getEffectivePaths(paths)) { -manager.visit(new GlobVisitor(path, entryOption -> { +new GlobVisitor(path, entryOption -> { if (entryOption.isPresent()) { find(writer, path, entryOption.get().node()); } else { writer.println("find: " + path + ": no such file or directory."); } -})); +}).accept(state); } } private void find(PrintWriter writer, String path, MetadataNode node) { writer.println(path); -if (node instanceof DirectoryNode) { -DirectoryNode directory = (DirectoryNode) node; -for (Entry entry : directory.children().entrySet()) { -String nextPath = path.equals("/") ? -path + entry.getKey() : path + "/" + entry.getKey(); -find(writer, nextPath, entry.getValue()); +if (node.isDirectory()) { Review Comment: That would be a logic error. I will add a check and RuntimeException to make it clearer. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a diff in pull request #13686: MINOR: Create the MetadataNode classes to introspect MetadataImage
cmccabe commented on code in PR #13686: URL: https://github.com/apache/kafka/pull/13686#discussion_r1187898816 ## shell/src/main/java/org/apache/kafka/shell/state/MetadataShellState.java: ## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.shell.state; + +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.image.node.MetadataNode; +import org.apache.kafka.shell.node.RootShellNode; + +import java.util.function.Consumer; + +/** + * The Kafka metadata shell. Review Comment: good point, fixed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a diff in pull request #13686: MINOR: Create the MetadataNode classes to introspect MetadataImage
cmccabe commented on code in PR #13686: URL: https://github.com/apache/kafka/pull/13686#discussion_r1187898447 ## metadata/src/main/java/org/apache/kafka/image/node/printer/MetadataNodeRedactionCriteria.java: ## @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.image.node.printer; + +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.metadata.KafkaConfigSchema; + + +public interface MetadataNodeRedactionCriteria { +/** + * Returns true if SCRAM data should be redacted. + */ +boolean shouldRedactScram(); + +/** + * Returns true if a configuration should be redacted. + * + * @param type The configuration type. + * @param key The configuration key. + * + * @return True if the configuration should be redacted. + */ +boolean shouldRedactConfig(ConfigResource.Type type, String key); + +class Strict implements MetadataNodeRedactionCriteria { +public static final Strict INSTANCE = new Strict(); + +@Override +public boolean shouldRedactScram() { +return true; +} + +@Override +public boolean shouldRedactConfig(ConfigResource.Type type, String key) { +return true; +} +} + +class Normal implements MetadataNodeRedactionCriteria { Review Comment: right, this one redacts only what needs to be redacted. annoyingly, we can't use it for toString() since that isn't called in a context that has access to it. So we'll just have to be strict with toString -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a diff in pull request #13686: MINOR: Create the MetadataNode classes to introspect MetadataImage
cmccabe commented on code in PR #13686: URL: https://github.com/apache/kafka/pull/13686#discussion_r1187897908 ## shell/src/main/java/org/apache/kafka/shell/node/printer/ShellNodePrinter.java: ## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.shell.node.printer; + +import org.apache.kafka.image.node.printer.MetadataNodePrinter; +import org.apache.kafka.image.node.printer.MetadataNodeRedactionCriteria; + +import java.io.PrintWriter; + + +/** + * The Kafka metadata shell. + */ +public class ShellNodePrinter implements MetadataNodePrinter { +private final PrintWriter writer; +private int indentationLevel; + +public ShellNodePrinter(PrintWriter writer) { +this.writer = writer; +} + +String indentationString() { +StringBuilder bld = new StringBuilder(); +for (int i = 0; i < indentationLevel; i++) { +for (int j = 0; j < 2; j++) { +bld.append(" "); +} +} +return bld.toString(); +} + +@Override +public MetadataNodeRedactionCriteria redactionCriteria() { +return MetadataNodeRedactionCriteria.Disabled.INSTANCE; Review Comment: yes, I think that makes sense for the shell since it is targetted at operators after all. the main concern in other cases is the controller / broker daemon sending stuff to log4j that shouldn't be there. very different than the shell use case. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a diff in pull request #13682: MINOR: improved exception/warn logging for stream-stream join store settings
mjsax commented on code in PR #13682: URL: https://github.com/apache/kafka/pull/13682#discussion_r1187897510 ## streams/src/main/java/org/apache/kafka/streams/state/internals/VersionedKeyValueToBytesStoreAdapter.java: ## @@ -115,7 +115,7 @@ public boolean persistent() { @Override public boolean isOpen() { -return inner.persistent(); +return inner.isOpen(); Review Comment: Can we extract this and cherry-pick to `3.5` before the release goes out? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-14918) KRaft controller sending ZK controller RPCs to KRaft brokers
[ https://issues.apache.org/jira/browse/KAFKA-14918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17720672#comment-17720672 ] Colin McCabe commented on KAFKA-14918: -- This is fixed, sorry for the delay in resolving. > KRaft controller sending ZK controller RPCs to KRaft brokers > > > Key: KAFKA-14918 > URL: https://issues.apache.org/jira/browse/KAFKA-14918 > Project: Kafka > Issue Type: Sub-task >Reporter: David Arthur >Assignee: David Arthur >Priority: Critical > Fix For: 3.5.0 > > > During the migration, when upgrading a ZK broker to KRaft, the controller is > incorrectly sending UpdateMetadata requests to the KRaft controller. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-14918) KRaft controller sending ZK controller RPCs to KRaft brokers
[ https://issues.apache.org/jira/browse/KAFKA-14918?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe resolved KAFKA-14918. -- Resolution: Fixed > KRaft controller sending ZK controller RPCs to KRaft brokers > > > Key: KAFKA-14918 > URL: https://issues.apache.org/jira/browse/KAFKA-14918 > Project: Kafka > Issue Type: Sub-task >Reporter: David Arthur >Assignee: David Arthur >Priority: Critical > Fix For: 3.5.0 > > > During the migration, when upgrading a ZK broker to KRaft, the controller is > incorrectly sending UpdateMetadata requests to the KRaft controller. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-14698) Received request api key LEADER_AND_ISR which is not enabled
[ https://issues.apache.org/jira/browse/KAFKA-14698?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe resolved KAFKA-14698. -- Fix Version/s: (was: 3.4.1) Resolution: Duplicate > Received request api key LEADER_AND_ISR which is not enabled > > > Key: KAFKA-14698 > URL: https://issues.apache.org/jira/browse/KAFKA-14698 > Project: Kafka > Issue Type: Bug > Components: kraft >Affects Versions: 3.4.0 >Reporter: Mickael Maison >Assignee: Akhilesh Chaganti >Priority: Major > Fix For: 3.5.0 > > Attachments: broker0.log, controller.log, test_online_migration.tar.gz > > > I started from a Kafka cluster (with ZooKeeper) with 2 brokers. There's a > single topic "test" with 2 partitions and 2 replicas and the internal > __consumer_offsets topics. > While following the ZooKeeper to KRaft migration steps from > [https://kafka.apache.org/documentation/#kraft_zk_migration], I'm hitting > issues at the Migrating brokers to KRaft step. > When I restart a broker as KRaft, it repetitively prints the following error: > {code:java} > org.apache.kafka.common.errors.InvalidRequestException: Received request api > key LEADER_AND_ISR which is not enabled > [2023-02-09 16:14:30,334] ERROR Closing socket for > 192.168.1.11:9092-192.168.1.11:63737-371 because of error > (kafka.network.Processor) > {code} > The controller repetitively prints the following error: > {code:java} > [2023-02-09 16:12:27,456] WARN [Controller id=1000, targetBrokerId=0] > Connection to node 0 (mmaison-mac.home/192.168.1.11:9092) could not be > established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > [2023-02-09 16:12:27,456] INFO [Controller id=1000, targetBrokerId=0] Client > requested connection close from node 0 > (org.apache.kafka.clients.NetworkClient) > [2023-02-09 16:12:27,560] INFO [Controller id=1000, targetBrokerId=0] Node 0 > disconnected. (org.apache.kafka.clients.NetworkClient) > {code} > Attached the controller logs and logs from broker-0 > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] mumrah commented on a diff in pull request #13686: MINOR: Create the MetadataNode classes to introspect MetadataImage
mumrah commented on code in PR #13686: URL: https://github.com/apache/kafka/pull/13686#discussion_r1187879810 ## shell/src/main/java/org/apache/kafka/shell/node/printer/ShellNodePrinter.java: ## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.shell.node.printer; + +import org.apache.kafka.image.node.printer.MetadataNodePrinter; +import org.apache.kafka.image.node.printer.MetadataNodeRedactionCriteria; + +import java.io.PrintWriter; + + +/** + * The Kafka metadata shell. + */ +public class ShellNodePrinter implements MetadataNodePrinter { +private final PrintWriter writer; +private int indentationLevel; + +public ShellNodePrinter(PrintWriter writer) { +this.writer = writer; +} + +String indentationString() { +StringBuilder bld = new StringBuilder(); +for (int i = 0; i < indentationLevel; i++) { +for (int j = 0; j < 2; j++) { +bld.append(" "); +} +} +return bld.toString(); +} + +@Override +public MetadataNodeRedactionCriteria redactionCriteria() { +return MetadataNodeRedactionCriteria.Disabled.INSTANCE; Review Comment: Ok, so while using the shell, we won't redact anything? Is the this true for both interactive and non-interactive usages? ## metadata/src/main/java/org/apache/kafka/image/node/printer/MetadataNodeRedactionCriteria.java: ## @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.image.node.printer; + +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.metadata.KafkaConfigSchema; + + +public interface MetadataNodeRedactionCriteria { +/** + * Returns true if SCRAM data should be redacted. + */ +boolean shouldRedactScram(); + +/** + * Returns true if a configuration should be redacted. + * + * @param type The configuration type. + * @param key The configuration key. + * + * @return True if the configuration should be redacted. + */ +boolean shouldRedactConfig(ConfigResource.Type type, String key); + +class Strict implements MetadataNodeRedactionCriteria { +public static final Strict INSTANCE = new Strict(); + +@Override +public boolean shouldRedactScram() { +return true; +} + +@Override +public boolean shouldRedactConfig(ConfigResource.Type type, String key) { +return true; +} +} + +class Normal implements MetadataNodeRedactionCriteria { Review Comment: Since this one requires KafkaConfigSchema, I'm guessing it's for use by QuorumController? ## shell/src/main/java/org/apache/kafka/shell/command/FindCommandHandler.java: ## @@ -80,28 +84,28 @@ public FindCommandHandler(List paths) { } @Override -public void run(Optional shell, -PrintWriter writer, -MetadataNodeManager manager) throws Exception { +public void run( +Optional shell, +PrintWriter writer, +MetadataShellState state +) throws Exception { for (String path : CommandUtils.getEffectivePaths(paths)) { -manager.visit(new GlobVisitor(path, entryOption -> { +new GlobVisitor(path, entryOption -> { if (entryOption.isPresent()) { find(w
[jira] [Created] (KAFKA-14976) Left/outer stream-stream joins create KV stores that aren't customizable
A. Sophie Blee-Goldman created KAFKA-14976: -- Summary: Left/outer stream-stream joins create KV stores that aren't customizable Key: KAFKA-14976 URL: https://issues.apache.org/jira/browse/KAFKA-14976 Project: Kafka Issue Type: Bug Components: streams Reporter: A. Sophie Blee-Goldman It appears that we only give the illusion of full customizability when it comes to the state stores of a windowed join. This arose due to an [optimization|https://github.com/apache/kafka/pull/11252] for the performance of the spurious results fix, and means that these joins now come with one additional, and possibly unexpected, state store: {code:java} final StoreBuilder, LeftOrRightValue>> builder = new ListValueStoreBuilder<>( |--[ persistent ? this--> | Stores.persistentKeyValueStore(storeName) : |--[ Stores.inMemoryKeyValueStore(storeName), timestampedKeyAndJoinSideSerde, leftOrRightValueSerde, Time.SYSTEM ); {code} where persistent is defined above that as {code:java} final boolean persistent = streamJoinedInternal.thisStoreSupplier() == null || streamJoinedInternal.thisStoreSupplier().get().persistent(); {code} This means regardless of whether a custom state store implementation was passed in to the join, we will still insert one of our RocksDB or InMemory state stores. Which might be very surprising since the API makes it seem like the underlying stores are fully configurable. I'm adding a warning line for this in PR [#13682|https://github.com/apache/kafka/pull/13682/files#diff-9ce43046fdef1233ab762e728abd1d3d44d7c270b28dcf6b63aa31a93a30af07R334-R336] but we should really make this hidden state store fully configurable like the window stores currently are (which will require a KIP) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14976) Left/outer stream-stream joins create KV stores that aren't customizable
[ https://issues.apache.org/jira/browse/KAFKA-14976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-14976: --- Labels: needs-kip (was: ) > Left/outer stream-stream joins create KV stores that aren't customizable > > > Key: KAFKA-14976 > URL: https://issues.apache.org/jira/browse/KAFKA-14976 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: A. Sophie Blee-Goldman >Priority: Major > Labels: needs-kip > > It appears that we only give the illusion of full customizability when it > comes to the state stores of a windowed join. This arose due to an > [optimization|https://github.com/apache/kafka/pull/11252] for the performance > of the spurious results fix, and means that these joins now come with one > additional, and possibly unexpected, state store: > > {code:java} > final StoreBuilder, > LeftOrRightValue>> builder = > new ListValueStoreBuilder<>( > |--[ persistent ? > this--> | Stores.persistentKeyValueStore(storeName) : > |--[ Stores.inMemoryKeyValueStore(storeName), > timestampedKeyAndJoinSideSerde, > leftOrRightValueSerde, > Time.SYSTEM > ); {code} > > where persistent is defined above that as > {code:java} > final boolean persistent = streamJoinedInternal.thisStoreSupplier() == null > || streamJoinedInternal.thisStoreSupplier().get().persistent(); {code} > > This means regardless of whether a custom state store implementation was > passed in to the join, we will still insert one of our RocksDB or InMemory > state stores. Which might be very surprising since the API makes it seem like > the underlying stores are fully configurable. > I'm adding a warning line for this in PR > [#13682|https://github.com/apache/kafka/pull/13682/files#diff-9ce43046fdef1233ab762e728abd1d3d44d7c270b28dcf6b63aa31a93a30af07R334-R336] > but we should really make this hidden state store fully configurable like > the window stores currently are (which will require a KIP) > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] mehbey commented on pull request #13681: KAFKA-14133: Migrate ActiveTaskCreator mock in TaskManagerTest to Mockito
mehbey commented on PR #13681: URL: https://github.com/apache/kafka/pull/13681#issuecomment-1538991259 Addressed Divij's comment and re-based with the latest changes. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vepo commented on pull request #12809: KAFKA-14324: Upgrade RocksDB to 7.1.2
vepo commented on PR #12809: URL: https://github.com/apache/kafka/pull/12809#issuecomment-1538926727 Is there any fix for this memory leak? How can we reproduce this memory leak for testing? Some of our environment is facing a memory leak but we cannot reproduce it, as we use **_org.rocksdb:rocksdbjni_** 7.1.2, I suppose this is the root cause. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac merged pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder
dajac merged PR #13638: URL: https://github.com/apache/kafka/pull/13638 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] Scanteianu commented on pull request #13455: KAFKA-14841 Handle callbacks to ConsumerRebalanceListener in MockConsumer
Scanteianu commented on PR #13455: URL: https://github.com/apache/kafka/pull/13455#issuecomment-1538836327 > Thanks @vvcephei - Think this is the failure. ZK related so I think the PR is good to go. > > `Build / JDK 17 and Scala 2.13 / [1] Type=ZK, Name=testDualWrite, MetadataVersion=3.4-IV0, Security=PLAINTEXT – kafka.zk.ZkMigrationIntegrationTest 1m 51s ` Thanks @vvcephei @philipnee! The ci failures for the various builds have all seemed independent of this change -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mdedetrich commented on a diff in pull request #13689: KAFKA-14975: Wait for TopicBasedRemoteLogMetadataManager.ensureInitializedAndNotClosed to initialize rather than throwing
mdedetrich commented on code in PR #13689: URL: https://github.com/apache/kafka/pull/13689#discussion_r1187745268 ## storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java: ## @@ -76,6 +78,11 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana private final boolean startConsumerThread; private Thread initializationThread; + +private static final long SHUTDOWN_TIMEOUT_SECONDS = 60L; + +private CountDownLatch initializeLatch; Review Comment: Thanks for the tip! Definitely can do this, I was just mimicking the style in the rest of the Kafka codebase. Will see what the reviewers respond with. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append
jolshan commented on PR #13608: URL: https://github.com/apache/kafka/pull/13608#issuecomment-1538787228 Thanks @hachikuji! Working on the tests as well, so I will push those when they are ready. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append
jolshan commented on code in PR #13608: URL: https://github.com/apache/kafka/pull/13608#discussion_r1187719600 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -579,9 +579,33 @@ class UnifiedLog(@volatile var logStartOffset: Long, result } - def hasOngoingTransaction(producerId: Long): Boolean = lock synchronized { -val entry = producerStateManager.activeProducers.get(producerId) -entry != null && entry.currentTxnFirstOffset.isPresent + def transactionNeedsVerifying(producerId: Long, producerEpoch: Short, baseSequence: Int): Boolean = lock synchronized { +val entry = producerStateManager.entryForVerification(producerId, producerEpoch, baseSequence) +(!entry.currentTxnFirstOffset.isPresent) && + (entry.compareAndSetVerificationState(producerEpoch, ProducerStateEntry.VerificationState.EMPTY, ProducerStateEntry.VerificationState.VERIFYING) || +entry.verificationState() == ProducerStateEntry.VerificationState.VERIFYING) + } + + def compareAndSetVerificationState(producerId: Long, + producerEpoch: Short, + baseSequence: Int, + expectedVerificationState: ProducerStateEntry.VerificationState, + newVerificationState: ProducerStateEntry.VerificationState): Unit = { lock synchronized { Review Comment: That's the method braces. Should I just make a new line so it is clearer? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kirktrue closed pull request #13640: KAFKA-14937: Refactoring for client code to reduce boilerplate
kirktrue closed pull request #13640: KAFKA-14937: Refactoring for client code to reduce boilerplate URL: https://github.com/apache/kafka/pull/13640 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kirktrue commented on pull request #13640: KAFKA-14937: Refactoring for client code to reduce boilerplate
kirktrue commented on PR #13640: URL: https://github.com/apache/kafka/pull/13640#issuecomment-1538766507 Closing pull request and moving to a fork where we're doing integration. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a diff in pull request #13608: KAFKA-14884: Include check transaction is still ongoing right before append
hachikuji commented on code in PR #13608: URL: https://github.com/apache/kafka/pull/13608#discussion_r1187662801 ## storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java: ## @@ -103,28 +121,44 @@ public boolean maybeUpdateProducerEpoch(short producerEpoch) { return false; } } + +public void maybeUpdateTentaitiveSequence(int sequence) { Review Comment: typo: Tentaitive ## storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java: ## @@ -103,28 +121,44 @@ public boolean maybeUpdateProducerEpoch(short producerEpoch) { return false; } } + +public void maybeUpdateTentaitiveSequence(int sequence) { +if (batchMetadata.isEmpty() && (!this.tentativeSequence.isPresent() || this.tentativeSequence.getAsInt() > sequence)) +this.tentativeSequence = OptionalInt.of(sequence); +} private void addBatchMetadata(BatchMetadata batch) { +// When appending a batch, we no longer need tentative sequence. +this.tentativeSequence = OptionalInt.empty(); if (batchMetadata.size() == ProducerStateEntry.NUM_BATCHES_TO_RETAIN) batchMetadata.removeFirst(); batchMetadata.add(batch); } + +public boolean compareAndSetVerificationState(short expectedProducerEpoch, VerificationState expectedVerificationState, VerificationState newVerificationState) { +if (expectedProducerEpoch == this.producerEpoch && verificationState == expectedVerificationState) { +this.verificationState = newVerificationState; +return true; +} +return false; +} public void update(ProducerStateEntry nextEntry) { -update(nextEntry.producerEpoch, nextEntry.coordinatorEpoch, nextEntry.lastTimestamp, nextEntry.batchMetadata, nextEntry.currentTxnFirstOffset); +update(nextEntry.producerEpoch, nextEntry.coordinatorEpoch, nextEntry.lastTimestamp, nextEntry.batchMetadata, nextEntry.currentTxnFirstOffset, nextEntry.verificationState); } public void update(short producerEpoch, int coordinatorEpoch, long lastTimestamp) { -update(producerEpoch, coordinatorEpoch, lastTimestamp, new ArrayDeque<>(0), OptionalLong.empty()); +update(producerEpoch, coordinatorEpoch, lastTimestamp, new ArrayDeque<>(0), OptionalLong.empty(), VerificationState.EMPTY); } private void update(short producerEpoch, int coordinatorEpoch, long lastTimestamp, Deque batchMetadata, -OptionalLong currentTxnFirstOffset) { +OptionalLong currentTxnFirstOffset, VerificationState verificationState) { Review Comment: nit: when arg lists go above 2 or 3, it's helpful to start putting each argument on a separate line: ```java private void update( short producerEpoch, int coordinatorEpoch, long lastTimestamp, ... ) { ## storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java: ## @@ -41,18 +42,35 @@ public class ProducerStateEntry { private int coordinatorEpoch; private long lastTimestamp; private OptionalLong currentTxnFirstOffset; + +private VerificationState verificationState; + +// Before any batches are associated with the entry, the tentative sequence represents the lowest sequence seen. +private OptionalInt tentativeSequence; + +public enum VerificationState { +EMPTY, +VERIFYING, +VERIFIED +} public static ProducerStateEntry empty(long producerId) { -return new ProducerStateEntry(producerId, RecordBatch.NO_PRODUCER_EPOCH, -1, RecordBatch.NO_TIMESTAMP, OptionalLong.empty(), Optional.empty()); +return new ProducerStateEntry(producerId, RecordBatch.NO_PRODUCER_EPOCH, -1, RecordBatch.NO_TIMESTAMP, OptionalLong.empty(), Optional.empty(), VerificationState.EMPTY, OptionalInt.empty()); +} + +public static ProducerStateEntry forVerification(long producerId, short producerEpoch, long milliseconds) { Review Comment: nit: `milliseconds` -> `lastTimestamp`? ## storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java: ## @@ -183,6 +184,19 @@ private void clearProducerIds() { producers.clear(); producerIdCount = 0; } + +public ProducerStateEntry entryForVerification(long producerId, short producerEpoch, int firstSequence) { +ProducerStateEntry entry; +if (producers.containsKey(producerId)) { Review Comment: nit: usually we would call `get` and check for null (saves one hash lookup) ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -579,9 +579,33 @@ class UnifiedLog(@volatile var logStartOffset: Long, result } - def hasOngoingTransaction(producerId: Long): Boolean = lock synchronized { -
[GitHub] [kafka] jolshan commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder
jolshan commented on code in PR #13638: URL: https://github.com/apache/kafka/pull/13638#discussion_r1187640544 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -0,0 +1,445 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.BiFunction; + +/** + * The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the + * consumer group protocol. Given the current state of a member and a desired or target + * assignment state, the state machine takes the necessary steps to converge them. + * + * The member state has the following properties: + * - Current Epoch: + * The current epoch of the member. + * + * - Next Epoch: + * The desired epoch of the member. It corresponds to the epoch of the target/desired assignment. + * The member transitions to this epoch when it has revoked the partitions that it does not own + * or if it does not have to revoke any. + * + * - Previous Epoch: + * The epoch of the member when the state was last updated. + * + * - Assigned Partitions: + * The set of partitions currently assigned to the member. This represents what the member should have. + * + * - Partitions Pending Revocation: + * The set of partitions that the member should revoke before it can transition to the next state. + * + * - Partitions Pending Assignment: + * The set of partitions that the member will eventually receive. The partitions in this set are + * still owned by other members in the group. + * + * The state machine has four states: + * - NEW_TARGET_ASSIGNMENT: Review Comment: makes sense thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee commented on pull request #13664: KAFKA-14961: harden DefaultBackgroundThreadTest.testStartupAndTearDown test
philipnee commented on PR #13664: URL: https://github.com/apache/kafka/pull/13664#issuecomment-1538666147 Thanks @vvcephei - I think this is a flaky test? `Build / JDK 17 and Scala 2.13 / testDescribeStateOfExistingGroupWithRoundRobinAssignor() – kafka.admin.DescribeConsumerGroupTest 2s Skipped - 123` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee commented on pull request #13455: KAFKA-14841 Handle callbacks to ConsumerRebalanceListener in MockConsumer
philipnee commented on PR #13455: URL: https://github.com/apache/kafka/pull/13455#issuecomment-1538664591 Thanks @vvcephei - Think this is the failure. ZK related so I think the PR is good to go. `Build / JDK 17 and Scala 2.13 / [1] Type=ZK, Name=testDualWrite, MetadataVersion=3.4-IV0, Security=PLAINTEXT – kafka.zk.ZkMigrationIntegrationTest 1m 51s ` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder
dajac commented on code in PR #13638: URL: https://github.com/apache/kafka/pull/13638#discussion_r1187624902 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -0,0 +1,445 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.BiFunction; + +/** + * The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the + * consumer group protocol. Given the current state of a member and a desired or target + * assignment state, the state machine takes the necessary steps to converge them. + * + * The member state has the following properties: + * - Current Epoch: + * The current epoch of the member. + * + * - Next Epoch: + * The desired epoch of the member. It corresponds to the epoch of the target/desired assignment. + * The member transitions to this epoch when it has revoked the partitions that it does not own + * or if it does not have to revoke any. + * + * - Previous Epoch: + * The epoch of the member when the state was last updated. + * + * - Assigned Partitions: + * The set of partitions currently assigned to the member. This represents what the member should have. + * + * - Partitions Pending Revocation: + * The set of partitions that the member should revoke before it can transition to the next state. + * + * - Partitions Pending Assignment: + * The set of partitions that the member will eventually receive. The partitions in this set are + * still owned by other members in the group. + * + * The state machine has four states: + * - NEW_TARGET_ASSIGNMENT: Review Comment: Updated the comment. Let me know what you think. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] yashmayya commented on pull request #13688: KAFKA-14974: Restore backward compatibility in KafkaBasedLog
yashmayya commented on PR #13688: URL: https://github.com/apache/kafka/pull/13688#issuecomment-1538658594 Thanks for taking a look @rhauch and your understanding here is correct. We should try to backport this to `3.3`, `3.4` and `3.5` as well (before the `3.5.0` release ideally, if possible). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mehbey commented on a diff in pull request #13681: KAFKA-14133: Migrate ActiveTaskCreator mock in TaskManagerTest to Mockito
mehbey commented on code in PR #13681: URL: https://github.com/apache/kafka/pull/13681#discussion_r1187611572 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -2579,7 +2563,9 @@ public void shouldUpdateInputPartitionsAfterRebalance() { assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); assertThat(task00.state(), is(Task.State.RUNNING)); assertEquals(newPartitionsSet, task00.inputPartitions()); -verify(activeTaskCreator, consumer, changeLogReader); +verify(consumer, changeLogReader); +Mockito.verify(activeTaskCreator).createTasks(any(), Mockito.eq(taskId00Assignment)); Review Comment: yeah good recommendation, I will push a new version with the update -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-14974) Restore backward compatibility in KafkaBasedLog
[ https://issues.apache.org/jira/browse/KAFKA-14974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17720566#comment-17720566 ] Yash Mayya commented on KAFKA-14974: Thanks [~rhauch], your understanding here is correct. We should backport [this fix|https://github.com/apache/kafka/pull/13688] to {{{}3.3{}}}, {{3.4}} and {{3.5}} as well (before the {{3.5.0}} release ideally, if possible). > Restore backward compatibility in KafkaBasedLog > --- > > Key: KAFKA-14974 > URL: https://issues.apache.org/jira/browse/KAFKA-14974 > Project: Kafka > Issue Type: Task > Components: KafkaConnect >Affects Versions: 3.5.0, 3.4.1, 3.3.3 >Reporter: Yash Mayya >Assignee: Yash Mayya >Priority: Major > > {{KafkaBasedLog}} is a widely used utility class that provides a generic > implementation of a shared, compacted log of records in a Kafka topic. It > isn't in Connect's public API, but has been used outside of Connect and we > try to preserve backward compatibility whenever possible. > https://issues.apache.org/jira/browse/KAFKA-14455 modified the two overloaded > void {{KafkaBasedLog::send}} methods to return a {{{}Future{}}}. While this > change is source compatible, it isn't binary compatible. We can restore > backward compatibility simply by re-instating the older send methods, and > renaming the new Future returning send methods. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] yashmayya commented on pull request #13688: KAFKA-14974: Restore backward compatibility in KafkaBasedLog
yashmayya commented on PR #13688: URL: https://github.com/apache/kafka/pull/13688#issuecomment-1538608185 Yep, that's right. We should backport this to `3.3`, `3.4` and `3.5` as well (before the `3.5.0` release ideally, if possible). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] reta commented on a diff in pull request #13689: KAFKA-14975: Wait for TopicBasedRemoteLogMetadataManager.ensureInitializedAndNotClosed to initialize rather than throwing
reta commented on code in PR #13689: URL: https://github.com/apache/kafka/pull/13689#discussion_r1187596223 ## storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java: ## @@ -76,6 +78,11 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana private final boolean startConsumerThread; private Thread initializationThread; + +private static final long SHUTDOWN_TIMEOUT_SECONDS = 60L; + +private CountDownLatch initializeLatch; Review Comment: Just a suggestion, but I think you could simplify tracking the initialization flow by folding `initializeLatch` & `initialized` into a `CompletableFuture` (which could also handle the case when close is called while the initialization is still in progress). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-14974) Restore backward compatibility in KafkaBasedLog
[ https://issues.apache.org/jira/browse/KAFKA-14974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17720553#comment-17720553 ] Randall Hauch commented on KAFKA-14974: --- [~yash.mayya], just to clarify: # #12984 changed the signatures of the then-`send(...)` methods by adding a return, which breaks backward compatibility for this utility class. # Those changes were made on `trunk` prior to the `3.5` branch ([it's in the `3.5` history](https://github.com/apache/kafka/commits/3.5?after=f9730c11b7b48a37f527a363e0c6dced53fdbc69+314&branch=3.5&qualified_name=refs%2Fheads%2F3.5)), and backported to the `3.4` and `3.3` branches # To restore backward compatibility, this PR renames those methods that return a `Future` as `sendWithReceipt(...)` and adds back the two `send(...)` methods that have the same signature as before Is this correct? > Restore backward compatibility in KafkaBasedLog > --- > > Key: KAFKA-14974 > URL: https://issues.apache.org/jira/browse/KAFKA-14974 > Project: Kafka > Issue Type: Task > Components: KafkaConnect >Affects Versions: 3.5.0, 3.4.1, 3.3.3 >Reporter: Yash Mayya >Assignee: Yash Mayya >Priority: Major > > {{KafkaBasedLog}} is a widely used utility class that provides a generic > implementation of a shared, compacted log of records in a Kafka topic. It > isn't in Connect's public API, but has been used outside of Connect and we > try to preserve backward compatibility whenever possible. > https://issues.apache.org/jira/browse/KAFKA-14455 modified the two overloaded > void {{KafkaBasedLog::send}} methods to return a {{{}Future{}}}. While this > change is source compatible, it isn't binary compatible. We can restore > backward compatibility simply by re-instating the older send methods, and > renaming the new Future returning send methods. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] jolshan commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder
jolshan commented on code in PR #13638: URL: https://github.com/apache/kafka/pull/13638#discussion_r1187582387 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -0,0 +1,445 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.BiFunction; + +/** + * The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the + * consumer group protocol. Given the current state of a member and a desired or target + * assignment state, the state machine takes the necessary steps to converge them. + * + * The member state has the following properties: + * - Current Epoch: + * The current epoch of the member. + * + * - Next Epoch: + * The desired epoch of the member. It corresponds to the epoch of the target/desired assignment. + * The member transitions to this epoch when it has revoked the partitions that it does not own + * or if it does not have to revoke any. + * + * - Previous Epoch: + * The epoch of the member when the state was last updated. + * + * - Assigned Partitions: + * The set of partitions currently assigned to the member. This represents what the member should have. + * + * - Partitions Pending Revocation: + * The set of partitions that the member should revoke before it can transition to the next state. + * + * - Partitions Pending Assignment: + * The set of partitions that the member will eventually receive. The partitions in this set are + * still owned by other members in the group. + * + * The state machine has four states: + * - NEW_TARGET_ASSIGNMENT: Review Comment: I see. I guess it is a little confusing we have this as an all caps state, but don't list in in MemberState. Just wondering if it would be better to include a comment about the transient state when we transition or leave it out altogether -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-14455) Kafka Connect create and update REST APIs should surface failures while writing to the config topic
[ https://issues.apache.org/jira/browse/KAFKA-14455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yash Mayya updated KAFKA-14455: --- Fix Version/s: 3.5.0 3.4.1 3.3.3 > Kafka Connect create and update REST APIs should surface failures while > writing to the config topic > --- > > Key: KAFKA-14455 > URL: https://issues.apache.org/jira/browse/KAFKA-14455 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Yash Mayya >Assignee: Yash Mayya >Priority: Major > Fix For: 3.5.0, 3.4.1, 3.3.3 > > > Kafka Connect's `POST /connectors` and `PUT /connectors/\{connector}/config` > REST APIs internally simply write a message to the Connect cluster's internal > config topic (which is then processed asynchronously by the herder). However, > no callback is passed to the producer's send method and there is no error > handling in place for producer send failures (see > [here|https://github.com/apache/kafka/blob/c1a54671e8fc6c7daec5f5ec3d8c934be96b4989/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L716] > / > [here|https://github.com/apache/kafka/blob/c1a54671e8fc6c7daec5f5ec3d8c934be96b4989/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L726]). > Consider one such case where the Connect worker's principal doesn't have a > WRITE ACL on the cluster's config topic. Now suppose the user submits a > connector's configs via one of the above two APIs. The producer send > [here|https://github.com/apache/kafka/blob/c1a54671e8fc6c7daec5f5ec3d8c934be96b4989/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L716] > / > [here|https://github.com/apache/kafka/blob/c1a54671e8fc6c7daec5f5ec3d8c934be96b4989/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L726] > won't succeed (due to a TopicAuthorizationException) but the API responses > will be `201 Created` success responses anyway. This is a very poor UX > because the connector will actually never be created but the API response > indicated success. Furthermore, this failure would only be detectable if > TRACE logs are enabled (via [this > log)|https://github.com/apache/kafka/blob/df29b17fc40f7c15460988d58bc652c3d66b60f8/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java] > making it near impossible for users to debug. Producer callbacks should be > used to surface write failures back to the user via the API response. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14974) Restore backward compatibility in KafkaBasedLog
[ https://issues.apache.org/jira/browse/KAFKA-14974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yash Mayya updated KAFKA-14974: --- Affects Version/s: 3.5.0 3.4.1 3.3.3 > Restore backward compatibility in KafkaBasedLog > --- > > Key: KAFKA-14974 > URL: https://issues.apache.org/jira/browse/KAFKA-14974 > Project: Kafka > Issue Type: Task > Components: KafkaConnect >Affects Versions: 3.5.0, 3.4.1, 3.3.3 >Reporter: Yash Mayya >Assignee: Yash Mayya >Priority: Major > > {{KafkaBasedLog}} is a widely used utility class that provides a generic > implementation of a shared, compacted log of records in a Kafka topic. It > isn't in Connect's public API, but has been used outside of Connect and we > try to preserve backward compatibility whenever possible. > https://issues.apache.org/jira/browse/KAFKA-14455 modified the two overloaded > void {{KafkaBasedLog::send}} methods to return a {{{}Future{}}}. While this > change is source compatible, it isn't binary compatible. We can restore > backward compatibility simply by re-instating the older send methods, and > renaming the new Future returning send methods. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14974) Restore backward compatibility in KafkaBasedLog
[ https://issues.apache.org/jira/browse/KAFKA-14974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch updated KAFKA-14974: -- Component/s: KafkaConnect > Restore backward compatibility in KafkaBasedLog > --- > > Key: KAFKA-14974 > URL: https://issues.apache.org/jira/browse/KAFKA-14974 > Project: Kafka > Issue Type: Task > Components: KafkaConnect >Reporter: Yash Mayya >Assignee: Yash Mayya >Priority: Major > > {{KafkaBasedLog}} is a widely used utility class that provides a generic > implementation of a shared, compacted log of records in a Kafka topic. It > isn't in Connect's public API, but has been used outside of Connect and we > try to preserve backward compatibility whenever possible. > https://issues.apache.org/jira/browse/KAFKA-14455 modified the two overloaded > void {{KafkaBasedLog::send}} methods to return a {{{}Future{}}}. While this > change is source compatible, it isn't binary compatible. We can restore > backward compatibility simply by re-instating the older send methods, and > renaming the new Future returning send methods. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] mdedetrich commented on a diff in pull request #13689: KAFKA-14975: Wait for TopicBasedRemoteLogMetadataManager.ensureInitializedAndNotClosed to initialize rather than throwing
mdedetrich commented on code in PR #13689: URL: https://github.com/apache/kafka/pull/13689#discussion_r1187572164 ## storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerHarness.java: ## @@ -104,27 +103,10 @@ public void onPartitionLeadershipChanges(Set leaderPartitions, log.debug("TopicBasedRemoteLogMetadataManager configs after adding overridden properties: {}", configs); topicBasedRemoteLogMetadataManager.configure(configs); -try { -waitUntilInitialized(60_000); -} catch (TimeoutException e) { -throw new KafkaException(e); -} - topicBasedRemoteLogMetadataManager.onPartitionLeadershipChanges(topicIdPartitions, Collections.emptySet()); } // Visible for testing. -public void waitUntilInitialized(long waitTimeMs) throws TimeoutException { Review Comment: This was removed because the entire point of this PR is to make explicit waiting redundant. The fact that this was only possible in a test because of package private is the underlying reason behind this change, in other words when running `TopicBasedRemoteLogMetadataManager` normally within a broker it was too easy to call methods before the`TopicBasedRemoteLogMetadataManager` was finished initializing. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mdedetrich commented on pull request #13689: KAFKA-14975: Wait for TopicBasedRemoteLogMetadataManager.ensureInitializedAndNotClosed to initialize rather than throwing
mdedetrich commented on PR #13689: URL: https://github.com/apache/kafka/pull/13689#issuecomment-1538539482 @satishd @junrao Pinging you because you done previous PR's regarding Tiered Storage -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mdedetrich opened a new pull request, #13689: KAFKA-14975: Wait for TopicBasedRemoteLogMetadataManager.ensureInitializedAndNotClosed to initialize rather than throwing
mdedetrich opened a new pull request, #13689: URL: https://github.com/apache/kafka/pull/13689 Current implementation of TopicBasedRemoteLogMetadataManager.ensureInitializedAndNotClosed will immediately throw an exception if TopicBasedRemoteLogMetadataManager is not initialized. Rather we should instead try and wait for TopicBasedRemoteLogMetadataManager to initialize with a timeout as this is expected behaviour of users. The solution to this is to use a typical `CountDownLatch` as is done elsewhere within Kafka ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-14974) Restore backward compatibility in KafkaBasedLog
[ https://issues.apache.org/jira/browse/KAFKA-14974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17720546#comment-17720546 ] Randall Hauch commented on KAFKA-14974: --- Thanks for catching this and providing a fix. Indeed, we have tried to maintain backward compatibility for this class since it is super useful. > Restore backward compatibility in KafkaBasedLog > --- > > Key: KAFKA-14974 > URL: https://issues.apache.org/jira/browse/KAFKA-14974 > Project: Kafka > Issue Type: Task >Reporter: Yash Mayya >Assignee: Yash Mayya >Priority: Major > > {{KafkaBasedLog}} is a widely used utility class that provides a generic > implementation of a shared, compacted log of records in a Kafka topic. It > isn't in Connect's public API, but has been used outside of Connect and we > try to preserve backward compatibility whenever possible. > https://issues.apache.org/jira/browse/KAFKA-14455 modified the two overloaded > void {{KafkaBasedLog::send}} methods to return a {{{}Future{}}}. While this > change is source compatible, it isn't binary compatible. We can restore > backward compatibility simply by re-instating the older send methods, and > renaming the new Future returning send methods. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] yashmayya commented on pull request #13688: KAFKA-14974: Restore backward compatibility in KafkaBasedLog
yashmayya commented on PR #13688: URL: https://github.com/apache/kafka/pull/13688#issuecomment-1538509105 @rhauch could you please take a look whenever you get a chance? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] yashmayya opened a new pull request, #13688: KAFKA-14974: Restore backward compatibility in KafkaBasedLog
yashmayya opened a new pull request, #13688: URL: https://github.com/apache/kafka/pull/13688 From https://issues.apache.org/jira/browse/KAFKA-14974: > `KafkaBasedLog` is a widely used utility class that provides a generic implementation of a shared, compacted log of records in a Kafka topic. It isn't in Connect's public API, but has been used outside of Connect and we try to preserve backward compatibility whenever possible. https://issues.apache.org/jira/browse/KAFKA-14455 modified the two overloaded void `KafkaBasedLog::send` methods to return a `Future`. While this change is source compatible, it isn't binary compatible. We can restore backward compatibility simply by re-instating the older send methods, and renaming the new Future returning send methods. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-14975) Make TopicBasedRemoteLogMetadataManager methods wait for initialize to complete
[ https://issues.apache.org/jira/browse/KAFKA-14975?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthew de Detrich updated KAFKA-14975: --- Description: In the current implementation of TopicBasedRemoteLogMetadataManager various methods internally call the ensureInitializedAndNotClosed to ensure that the TopicBasedRemoteLogMetadataManager is initialized. If TopicBasedRemoteLogMetadataManager is not initialized then an exception will be thrown. This is not an ideal behaviour, rather than just throwing an exception we should instead try to wait until TopicBasedRemoteLogMetadataManager is initialised (with a timeout). This is what the expected behaviour from users should be and its also what other parts of Kafka that use plugin based systems (ergo kafka connect) do. was:In the current implementation of > Make TopicBasedRemoteLogMetadataManager methods wait for initialize to > complete > --- > > Key: KAFKA-14975 > URL: https://issues.apache.org/jira/browse/KAFKA-14975 > Project: Kafka > Issue Type: Task >Reporter: Matthew de Detrich >Assignee: Matthew de Detrich >Priority: Major > > In the current implementation of TopicBasedRemoteLogMetadataManager various > methods internally call the > ensureInitializedAndNotClosed to ensure that the > TopicBasedRemoteLogMetadataManager is initialized. If > TopicBasedRemoteLogMetadataManager is not initialized then an exception will > be thrown. > This is not an ideal behaviour, rather than just throwing an exception we > should instead try to wait until TopicBasedRemoteLogMetadataManager is > initialised (with a timeout). This is what the expected behaviour from users > should be and its also what other parts of Kafka that use plugin based > systems (ergo kafka connect) do. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-14975) Make TopicBasedRemoteLogMetadataManager methods wait for initialize to complete
[ https://issues.apache.org/jira/browse/KAFKA-14975?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthew de Detrich reassigned KAFKA-14975: -- Assignee: Matthew de Detrich > Make TopicBasedRemoteLogMetadataManager methods wait for initialize to > complete > --- > > Key: KAFKA-14975 > URL: https://issues.apache.org/jira/browse/KAFKA-14975 > Project: Kafka > Issue Type: Task >Reporter: Matthew de Detrich >Assignee: Matthew de Detrich >Priority: Major > > In the current implementation of -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14975) Make TopicBasedRemoteLogMetadataManager methods wait for initialize to complete
[ https://issues.apache.org/jira/browse/KAFKA-14975?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthew de Detrich updated KAFKA-14975: --- Description: In the current implementation of > Make TopicBasedRemoteLogMetadataManager methods wait for initialize to > complete > --- > > Key: KAFKA-14975 > URL: https://issues.apache.org/jira/browse/KAFKA-14975 > Project: Kafka > Issue Type: Task >Reporter: Matthew de Detrich >Priority: Major > > In the current implementation of -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14975) Make TopicBasedRemoteLogMetadataManager methods wait for initialize to complete
Matthew de Detrich created KAFKA-14975: -- Summary: Make TopicBasedRemoteLogMetadataManager methods wait for initialize to complete Key: KAFKA-14975 URL: https://issues.apache.org/jira/browse/KAFKA-14975 Project: Kafka Issue Type: Task Reporter: Matthew de Detrich -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] viktorsomogyi commented on pull request #13594: KAFKA-14913: Using ThreadUtils.shutdownExecutorServiceQuietly to close executors in Connect Runtime
viktorsomogyi commented on PR #13594: URL: https://github.com/apache/kafka/pull/13594#issuecomment-1538474921 Merged it, thank you @vamossagar12 for the contribution! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] viktorsomogyi merged pull request #13594: KAFKA-14913: Using ThreadUtils.shutdownExecutorServiceQuietly to close executors in Connect Runtime
viktorsomogyi merged PR #13594: URL: https://github.com/apache/kafka/pull/13594 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-14974) Restore backward compatibility in KafkaBasedLog
Yash Mayya created KAFKA-14974: -- Summary: Restore backward compatibility in KafkaBasedLog Key: KAFKA-14974 URL: https://issues.apache.org/jira/browse/KAFKA-14974 Project: Kafka Issue Type: Task Reporter: Yash Mayya Assignee: Yash Mayya {{KafkaBasedLog}} is a widely used utility class that provides a generic implementation of a shared, compacted log of records in a Kafka topic. It isn't in Connect's public API, but has been used outside of Connect and we try to preserve backward compatibility whenever possible. https://issues.apache.org/jira/browse/KAFKA-14455 modified the two overloaded void {{KafkaBasedLog::send}} methods to return a {{{}Future{}}}. While this change is source compatible, it isn't binary compatible. We can restore backward compatibility simply by re-instating the older send methods, and renaming the new Future returning send methods. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] cadonna merged pull request #13621: KAFKA-14133: Migrate ChangeLogReader mock in TaskManagerTest to Mockito
cadonna merged PR #13621: URL: https://github.com/apache/kafka/pull/13621 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vamossagar12 commented on pull request #13594: KAFKA-14913: Using ThreadUtils.shutdownExecutorServiceQuietly to close executors in Connect Runtime
vamossagar12 commented on PR #13594: URL: https://github.com/apache/kafka/pull/13594#issuecomment-1538450687 Thanks @viktorsomogyi . These are the ones we could find so far. We can create follow up tickets if needed for any other executors within connect which need to be closed this way and merge this. WDYT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #13644: KAFKA-14500; [1/N] Rewrite MemberMetadata in Java
dajac commented on code in PR #13644: URL: https://github.com/apache/kafka/pull/13644#discussion_r1187462145 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroupMember.java: ## @@ -0,0 +1,499 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.coordinator.group.generic; + +import org.apache.kafka.common.message.JoinGroupResponseData; +import org.apache.kafka.common.message.SyncGroupResponseData; + +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +/** + * This class encapsulates a generic group member's metadata. + * + * Member metadata contains the following: + * + * Heartbeat metadata: + * 1. negotiated heartbeat session timeout + * 2. timestamp of the latest heartbeat + * + * Protocol metadata: + * 1. the list of supported protocols (ordered by preference) + * 2. the metadata associated with each protocol + * + * In addition, it also contains the following state information: + * + * 1. Awaiting rebalance callback: when the group is in the prepare-rebalance state, + * its rebalance callback will be kept in the metadata if the + * member has sent the join group request + * 2. Awaiting sync callback: when the group is in the awaiting-sync state, its sync callback + *is kept in metadata until the leader provides the group assignment + *and the group transitions to stable + */ +public class GenericGroupMember { + +private static class MemberSummary { +private final String memberId; +private final Optional groupInstanceId; +private final String clientId; +private final String clientHost; +private final byte[] metadata; +private final byte[] assignment; + +public MemberSummary(String memberId, + Optional groupInstanceId, + String clientId, + String clientHost, + byte[] metadata, + byte[] assignment) { + +this.memberId = memberId; +this.groupInstanceId = groupInstanceId; +this.clientId = clientId; +this.clientHost = clientHost; +this.metadata = metadata; +this.assignment = assignment; +} + +public String memberId() { +return memberId; +} + +public Optional getGroupInstanceId() { +return groupInstanceId; +} + +public String clientId() { +return clientId; +} + +public String clientHost() { +return clientHost; +} + +public byte[] metadata() { +return metadata; +} + +public byte[] assignment() { +return assignment; +} + +} + +/** + * The member id. + */ +private final String memberId; + +/** + * The group instance id. + */ +private final Optional groupInstanceId; + +/** + * The client id. + */ +private final String clientId; Review Comment: It is better to refactor here. It is confusing otherwise. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] satishd commented on pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.
satishd commented on PR #13535: URL: https://github.com/apache/kafka/pull/13535#issuecomment-1538311434 Thanks @junrao for the updated review comments. Addressed the comments with inline replies or with the latest commits. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] satishd commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.
satishd commented on code in PR #13535: URL: https://github.com/apache/kafka/pull/13535#discussion_r1187412666 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -1083,48 +1095,100 @@ class ReplicaManager(val config: KafkaConfig, fetchPartitionStatus += (topicIdPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData)) }) } - val delayedFetch = new DelayedFetch( -params = params, -fetchPartitionStatus = fetchPartitionStatus, -replicaManager = this, -quota = quota, -responseCallback = responseCallback - ) - - // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation - val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => TopicPartitionOperationKey(tp) } - - // try to complete the request immediately, otherwise put it into the purgatory; - // this is because while the delayed fetch operation is being created, new requests - // may arrive and hence make this operation completable. - delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys) + + if (remoteFetchInfo.isPresent) { Review Comment: Do you mean to say that we should not return immediately if `remoteFetchInfo` exists because that should be served otherwise remote fetches may starve as long as there is enough data immediately available to be sent? So, the condition becomes ``` if (!remoteFetchInfo.isPresent && (params.maxWaitMs <= 0 || fetchInfos.isEmpty || bytesReadable >= params.minBytes || errorReadingData || hasDivergingEpoch || hasPreferredReadReplica)) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on pull request #13684: MINOR: fix compilation failure
showuon commented on PR #13684: URL: https://github.com/apache/kafka/pull/13684#issuecomment-1538178676 It's fine sine it's for 3.4 branch, not for trunk branch. Thanks @divijvaidya -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-14973) Inconsistent treatment of invalid config when creating or altering a topic
James Shaw created KAFKA-14973: -- Summary: Inconsistent treatment of invalid config when creating or altering a topic Key: KAFKA-14973 URL: https://issues.apache.org/jira/browse/KAFKA-14973 Project: Kafka Issue Type: Bug Components: admin Affects Versions: 3.4.0 Reporter: James Shaw {{createTopics}} throws {{InvalidConfigurationException}} on receiving an invalid config entry name or value. {{incrementalAlterConfigs}} throws {{InvalidConfigurationException}} on receiving an invalid config entry name, but throws {{InvalidRequestException}} on receiving an invalid entry value. The {{incrementalAlterConfigs}} javadoc mentions that {{InvalidRequestException}} is anticipated; the {{createTopics}} javadoc says nothing about exception types. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] showuon commented on pull request #13684: MINOR: fix compilation failure
showuon commented on PR #13684: URL: https://github.com/apache/kafka/pull/13684#issuecomment-1538085446 @mimaison , call for review to fix the build in v3.4 branch. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on pull request #13684: MINOR: fix compilation failure
showuon commented on PR #13684: URL: https://github.com/apache/kafka/pull/13684#issuecomment-1538084603 Failed tests are unrelated: ``` Build / JDK 8 and Scala 2.12 / org.apache.kafka.clients.consumer.internals.EagerConsumerCoordinatorTest.testPrepareJoinAndRejoinAfterFailedRebalance() Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.integration.BlockingConnectorTest.testBlockInSinkTaskStart Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testSeparateOffsetsTopic Build / JDK 8 and Scala 2.12 / kafka.security.authorizer.AuthorizerTest.testDeleteAclOnWildcardResource(String).quorum=kraft Build / JDK 11 and Scala 2.13 / kafka.security.authorizer.AuthorizerTest.testAllowAllAccess(String).quorum=kraft Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest.testReplication() Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest.testReplicationWithEmptyPartition() Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.integration.BlockingConnectorTest.testBlockInConnectorStop Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.integration.BlockingConnectorTest.testBlockInConnectorConfig Build / JDK 17 and Scala 2.13 / kafka.security.authorizer.AuthorizerTest.testTopicAcl(String).quorum=kraft Build / JDK 17 and Scala 2.13 / org.apache.kafka.tools.MetadataQuorumCommandTest.[5] Type=Raft-CoReside, Name=testDescribeQuorumReplicationSuccessful, MetadataVersion=3.4-IV0, Security=PLAINTEXT Build / JDK 17 and Scala 2.13 / org.apache.kafka.tools.MetadataQuorumCommandTest.[6] Type=Raft-Distributed, Name=testDescribeQuorumReplicationSuccessful, MetadataVersion=3.4-IV0, Security=PLAINTEXT Build / JDK 17 and Scala 2.13 / org.apache.kafka.tools.MetadataQuorumCommandTest.[1] Type=Raft-CoReside, Name=testDescribeQuorumStatusSuccessful, MetadataVersion=3.4-IV0, Security=PLAINTEXT Build / JDK 17 and Scala 2.13 / org.apache.kafka.tools.MetadataQuorumCommandTest.[2] Type=Raft-Distributed, Name=testDescribeQuorumStatusSuccessful, MetadataVersion=3.4-IV0, Security=PLAINTEXT Build / JDK 17 and Scala 2.13 / org.apache.kafka.tools.MetadataQuorumCommandTest.[2] Type=Raft-Distributed, Name=testDescribeQuorumReplicationSuccessful, MetadataVersion=3.4-IV0, Security=PLAINTEXT Build / JDK 17 and Scala 2.13 / org.apache.kafka.tools.MetadataQuorumCommandTest.[6] Type=Raft-Distributed, Name=testDescribeQuorumReplicationSuccessful, MetadataVersion=3.4-IV0, Security=PLAINTEXT Build / JDK 17 and Scala 2.13 / org.apache.kafka.tools.MetadataQuorumCommandTest.[6] Type=Raft-Distributed, Name=testDescribeQuorumStatusSuccessful, MetadataVersion=3.4-IV0, Security=PLAINTEXT ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] divijvaidya commented on a diff in pull request #13681: KAFKA-14133: Migrate ActiveTaskCreator mock in TaskManagerTest to Mockito
divijvaidya commented on code in PR #13681: URL: https://github.com/apache/kafka/pull/13681#discussion_r1187180990 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -291,16 +292,16 @@ public void shouldPrepareActiveTaskInStateUpdaterToBeRecycled() { final TasksRegistry tasks = Mockito.mock(TasksRegistry.class); final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); when(stateUpdater.getTasks()).thenReturn(mkSet(activeTaskToRecycle)); -expect(activeTaskCreator.createTasks(consumer, Collections.emptyMap())).andReturn(emptySet()); expect(standbyTaskCreator.createTasks(Collections.emptyMap())).andReturn(emptySet()); -replay(activeTaskCreator, standbyTaskCreator); +replay(standbyTaskCreator); Review Comment: Mockito doesn't require to call `replay` (unlike EasyMock) (same comment for rest of the places in this file where we are using replay) ## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -291,16 +292,16 @@ public void shouldPrepareActiveTaskInStateUpdaterToBeRecycled() { final TasksRegistry tasks = Mockito.mock(TasksRegistry.class); final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); when(stateUpdater.getTasks()).thenReturn(mkSet(activeTaskToRecycle)); -expect(activeTaskCreator.createTasks(consumer, Collections.emptyMap())).andReturn(emptySet()); expect(standbyTaskCreator.createTasks(Collections.emptyMap())).andReturn(emptySet()); -replay(activeTaskCreator, standbyTaskCreator); +replay(standbyTaskCreator); taskManager.handleAssignment( Collections.emptyMap(), mkMap(mkEntry(activeTaskToRecycle.id(), activeTaskToRecycle.inputPartitions())) ); -verify(activeTaskCreator, standbyTaskCreator); +verify(standbyTaskCreator); Review Comment: We probably want to verify a method invocation here. If you use Mockito's verify() here instead of EasyMock, you might see a compilation error. (same comment for rest of the usage of verify(mock) in this PR) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] divijvaidya commented on a diff in pull request #13681: KAFKA-14133: Migrate ActiveTaskCreator mock in TaskManagerTest to Mockito
divijvaidya commented on code in PR #13681: URL: https://github.com/apache/kafka/pull/13681#discussion_r1187177289 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## Review Comment: we still seem to have easymock imports. Can you please remove them and replace with Mockito as well? ``` import static org.easymock.EasyMock.anyObject; import static org.easymock.EasyMock.eq; import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.expectLastCall; import static org.easymock.EasyMock.replay; import static org.easymock.EasyMock.reset; import static org.easymock.EasyMock.resetToStrict; import static org.easymock.EasyMock.verify; -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] divijvaidya commented on a diff in pull request #13681: KAFKA-14133: Migrate ActiveTaskCreator mock in TaskManagerTest to Mockito
divijvaidya commented on code in PR #13681: URL: https://github.com/apache/kafka/pull/13681#discussion_r1187177289 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## Review Comment: we still seem to have easymock imports. Can you please remove them and replace with Mockito as well? ``` import static org.easymock.EasyMock.anyObject; import static org.easymock.EasyMock.eq; import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.expectLastCall; import static org.easymock.EasyMock.replay; import static org.easymock.EasyMock.reset; import static org.easymock.EasyMock.resetToStrict; import static org.easymock.EasyMock.verify; ## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -291,16 +292,16 @@ public void shouldPrepareActiveTaskInStateUpdaterToBeRecycled() { final TasksRegistry tasks = Mockito.mock(TasksRegistry.class); final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); when(stateUpdater.getTasks()).thenReturn(mkSet(activeTaskToRecycle)); -expect(activeTaskCreator.createTasks(consumer, Collections.emptyMap())).andReturn(emptySet()); expect(standbyTaskCreator.createTasks(Collections.emptyMap())).andReturn(emptySet()); -replay(activeTaskCreator, standbyTaskCreator); +replay(standbyTaskCreator); Review Comment: Mockito doesn't require to call `replay` (unlike EasyMock) (same comment for rest of the places in this file where we are using replay) ## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -291,16 +292,16 @@ public void shouldPrepareActiveTaskInStateUpdaterToBeRecycled() { final TasksRegistry tasks = Mockito.mock(TasksRegistry.class); final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); when(stateUpdater.getTasks()).thenReturn(mkSet(activeTaskToRecycle)); -expect(activeTaskCreator.createTasks(consumer, Collections.emptyMap())).andReturn(emptySet()); expect(standbyTaskCreator.createTasks(Collections.emptyMap())).andReturn(emptySet()); -replay(activeTaskCreator, standbyTaskCreator); +replay(standbyTaskCreator); taskManager.handleAssignment( Collections.emptyMap(), mkMap(mkEntry(activeTaskToRecycle.id(), activeTaskToRecycle.inputPartitions())) ); -verify(activeTaskCreator, standbyTaskCreator); +verify(standbyTaskCreator); Review Comment: We probably want to verify a method invocation here. If you use Mockito's verify() here instead of EasyMock, you might see a compilation error. (same comment for rest of the usage of verify(mock) in this PR) ## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -2579,7 +2563,9 @@ public void shouldUpdateInputPartitionsAfterRebalance() { assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); assertThat(task00.state(), is(Task.State.RUNNING)); assertEquals(newPartitionsSet, task00.inputPartitions()); -verify(activeTaskCreator, consumer, changeLogReader); +verify(consumer, changeLogReader); +Mockito.verify(activeTaskCreator).createTasks(any(), Mockito.eq(taskId00Assignment)); Review Comment: Mockito has a mode called STRICT_STUBS which fails the test if a defined stub is not invoked. We can greatly simplify code using that annotation since we don't have to do both `when()` and `verify()`. Using `when()` would suffice since the test will fail if the stub is not used (or using verify() would suffice for cases with void return). We use STRICT_STUBS in a bunch of places in Kafka code such as [this](https://github.com/apache/kafka/blob/347238948b86882a47faee4a2916d1b01333d95f/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java#L60). Please consider using it in this file as it will greatly remove boilerplate code verification. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna commented on a diff in pull request #13656: KAFKA-14911: Add system tests for rolling upgrade path of KIP-904
cadonna commented on code in PR #13656: URL: https://github.com/apache/kafka/pull/13656#discussion_r1187176080 ## tests/kafkatest/tests/streams/streams_upgrade_test.py: ## @@ -236,6 +237,96 @@ def test_rolling_upgrade_with_2_bounces(self, from_version, to_version): self.stop_and_await() +@cluster(num_nodes=6) +@matrix(from_version=table_agg_versions, to_version=[str(DEV_VERSION)]) +def test_rolling_upgrade_for_table_agg(self, from_version, to_version): +""" +This test verifies that the cluster successfully upgrades despite changes in the table +repartition topic format. + +Starts 3 KafkaStreams instances with version and upgrades one-by-one to +""" + +extra_properties = {'test.run_table_agg': 'true'} + +self.set_up_services() + +self.driver.start() + +# encoding different target values for different versions +# - old version: value=A +# - new version with `upgrade_from` flag set: value=B +# - new version w/o `upgrade_from` set set: value=C + +extra_properties = extra_properties.copy() +extra_properties['test.agg_produce_value'] = 'A' +extra_properties['test.expected_agg_values'] = 'A' +self.start_all_nodes_with(from_version, extra_properties) + +counter = 1 +random.seed() + +# rolling bounce +random.shuffle(self.processors) +p3 = self.processors[-1] +for p in self.processors: +p.CLEAN_NODE_ENABLED = False + +# bounce two instances to new version (verifies that new version can process records +# written by old version) Review Comment: I see the check, but it does not guarantee that the instance that was not rolled had enough time to write records in the old format to partitions that will be read by the rolled instances. Also it does not guarantee that the records that might have been written by the not rolled instance to partitions that will be read by the rolled instances have not been already consumed by the not rolled instance itself before the rolled instances start processing. Similar is true for the subsequent rolls. Does this make sense or do I miss something? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna commented on a diff in pull request #13656: KAFKA-14911: Add system tests for rolling upgrade path of KIP-904
cadonna commented on code in PR #13656: URL: https://github.com/apache/kafka/pull/13656#discussion_r1187176080 ## tests/kafkatest/tests/streams/streams_upgrade_test.py: ## @@ -236,6 +237,96 @@ def test_rolling_upgrade_with_2_bounces(self, from_version, to_version): self.stop_and_await() +@cluster(num_nodes=6) +@matrix(from_version=table_agg_versions, to_version=[str(DEV_VERSION)]) +def test_rolling_upgrade_for_table_agg(self, from_version, to_version): +""" +This test verifies that the cluster successfully upgrades despite changes in the table +repartition topic format. + +Starts 3 KafkaStreams instances with version and upgrades one-by-one to +""" + +extra_properties = {'test.run_table_agg': 'true'} + +self.set_up_services() + +self.driver.start() + +# encoding different target values for different versions +# - old version: value=A +# - new version with `upgrade_from` flag set: value=B +# - new version w/o `upgrade_from` set set: value=C + +extra_properties = extra_properties.copy() +extra_properties['test.agg_produce_value'] = 'A' +extra_properties['test.expected_agg_values'] = 'A' +self.start_all_nodes_with(from_version, extra_properties) + +counter = 1 +random.seed() + +# rolling bounce +random.shuffle(self.processors) +p3 = self.processors[-1] +for p in self.processors: +p.CLEAN_NODE_ENABLED = False + +# bounce two instances to new version (verifies that new version can process records +# written by old version) Review Comment: I see the check, but it does not guarantee that the instance that was not rolled had enough time to write records in the old format to partitions that will be read by the rolled instances. Also it does not guarantee that the records that might have been written by the not rolled instance to partitions that will be read by the rolled instances has not been already consumed by the not rolled instance itself before the rolled instances start processing. Similar is true for the subsequent rolls. Does this make sense or do I miss something? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna commented on a diff in pull request #13656: KAFKA-14911: Add system tests for rolling upgrade path of KIP-904
cadonna commented on code in PR #13656: URL: https://github.com/apache/kafka/pull/13656#discussion_r1187159604 ## tests/kafkatest/tests/streams/streams_upgrade_test.py: ## @@ -236,6 +237,96 @@ def test_rolling_upgrade_with_2_bounces(self, from_version, to_version): self.stop_and_await() +@cluster(num_nodes=6) +@matrix(from_version=table_agg_versions, to_version=[str(DEV_VERSION)]) +def test_rolling_upgrade_for_table_agg(self, from_version, to_version): +""" +This test verifies that the cluster successfully upgrades despite changes in the table +repartition topic format. + +Starts 3 KafkaStreams instances with version and upgrades one-by-one to +""" + +extra_properties = {'test.run_table_agg': 'true'} + +self.set_up_services() + +self.driver.start() + +# encoding different target values for different versions +# - old version: value=A +# - new version with `upgrade_from` flag set: value=B +# - new version w/o `upgrade_from` set set: value=C + +extra_properties = extra_properties.copy() +extra_properties['test.agg_produce_value'] = 'A' +extra_properties['test.expected_agg_values'] = 'A' +self.start_all_nodes_with(from_version, extra_properties) + +counter = 1 +random.seed() + +# rolling bounce +random.shuffle(self.processors) +p3 = self.processors[-1] +for p in self.processors: +p.CLEAN_NODE_ENABLED = False + +# bounce two instances to new version (verifies that new version can process records +# written by old version) +extra_properties = extra_properties.copy() +extra_properties['test.agg_produce_value'] = 'B' +extra_properties['test.expected_agg_values'] = 'A,B' +for p in self.processors[:-1]: +self.do_stop_start_bounce(p, from_version[:-2], to_version, counter, extra_properties) Review Comment: Ah, I misinterpreted the code. I thought, the whole list of from_versions is passed into the function. Now I see that it is just one version, obviously. My fault, sorry! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon opened a new pull request, #13687: MINOR: test
showuon opened a new pull request, #13687: URL: https://github.com/apache/kafka/pull/13687 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe opened a new pull request, #13686: MINOR: Create the MetadataNode classes to introspect MetadataImage
cmccabe opened a new pull request, #13686: URL: https://github.com/apache/kafka/pull/13686 Metadata image classes such as MetadataImage, ClusterImage, FeaturesImage, and so forth contain numerous sub-images. This PR adds a structured way of traversing those sub-images. This is useful for the metadata shell, and also for implementing toString functions. In both cases, the previous solution was suboptimal. The metadata shell was previously implemented in an ad-hoc way by mutating text-based tree nodes when records were replayed. This was difficult to keep in sync with changes to the record types (for example, we forgot to do this for SCRAM). It was also pretty low-level, being done at a level below that of the image classes. For toString, it was difficult to keep the implementations consistent previously, and also support both redacted and non-redacted output. The metadata shell directory was getting crowded since we never had submodules for it. This PR creates glob/, command/, node/, and state/ directories to keep things better organized. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon closed pull request #13685: test
showuon closed pull request #13685: test URL: https://github.com/apache/kafka/pull/13685 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon opened a new pull request, #13685: test
showuon opened a new pull request, #13685: URL: https://github.com/apache/kafka/pull/13685 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon opened a new pull request, #13684: MINOR: fix compilation failure
showuon opened a new pull request, #13684: URL: https://github.com/apache/kafka/pull/13684 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org