[GitHub] [kafka] hachikuji commented on a change in pull request #10952: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name
hachikuji commented on a change in pull request #10952: URL: https://github.com/apache/kafka/pull/10952#discussion_r670046323 ## File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java ## @@ -377,6 +397,12 @@ else if (metadata.error() == Errors.TOPIC_AUTHORIZATION_FAILED) log.debug("Updating last seen epoch for partition {} from {} to epoch {} from new metadata", tp, currentEpoch, newEpoch); lastSeenLeaderEpochs.put(tp, newEpoch); return Optional.of(partitionMetadata); +// If both topic IDs were valid and the topic ID changed, update the metadata Review comment: nit: move this comment into the `if` ## File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java ## @@ -377,6 +397,12 @@ else if (metadata.error() == Errors.TOPIC_AUTHORIZATION_FAILED) log.debug("Updating last seen epoch for partition {} from {} to epoch {} from new metadata", tp, currentEpoch, newEpoch); lastSeenLeaderEpochs.put(tp, newEpoch); return Optional.of(partitionMetadata); +// If both topic IDs were valid and the topic ID changed, update the metadata +} else if (!topicId.equals(Uuid.ZERO_UUID) && oldTopicId != null && !topicId.equals(oldTopicId)) { Review comment: Hmm, shouldn't this check come before the epoch check? Admittedly, it's unlikely that a recreated topic would have a higher epoch, but we may as well handle that case. By the way, it's a little inconsistent that this check uses both null and `Uuid.ZERO_UUID` to represent a missing value. Maybe we can use null consistently? ## File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java ## @@ -216,6 +217,14 @@ public synchronized boolean updateRequested() { } } +public synchronized Uuid topicId(String topicName) { Review comment: Can you document that this returns null if the topicId does not exist or is not known? Similarly for `topicName`. 
## File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java ## @@ -874,18 +921,69 @@ protected boolean retainTopic(String topic, boolean isInternal, long nowMs) { assertEquals(cluster.topics(), new HashSet<>(Arrays.asList("keepValidTopic", "newValidTopic"))); assertEquals(cluster.partitionsForTopic("keepValidTopic").size(), 2); assertEquals(cluster.partitionsForTopic("newValidTopic").size(), 4); +assertTrue(cluster.topicIds().containsAll(topicIds.values())); // Perform another metadata update, but this time all topic metadata should be cleared. retainTopics.set(Collections.emptySet()); -metadataResponse = RequestTestUtils.metadataUpdateWith(newClusterId, newNodes, newTopicErrors, newTopicPartitionCounts, _tp -> 300); +metadataResponse = RequestTestUtils.metadataUpdateWithIds(newClusterId, newNodes, newTopicErrors, newTopicPartitionCounts, _tp -> 300, topicIds); metadata.updateWithCurrentRequestVersion(metadataResponse, true, time.milliseconds()); +topicIds.forEach((topicName, topicId) -> assertEquals(metadata.topicId(topicName), null)); Review comment: nit: `assertNull` ## File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java ## @@ -874,18 +921,69 @@ protected boolean retainTopic(String topic, boolean isInternal, long nowMs) { assertEquals(cluster.topics(), new HashSet<>(Arrays.asList("keepValidTopic", "newValidTopic"))); assertEquals(cluster.partitionsForTopic("keepValidTopic").size(), 2); assertEquals(cluster.partitionsForTopic("newValidTopic").size(), 4); +assertTrue(cluster.topicIds().containsAll(topicIds.values())); // Perform another metadata update, but this time all topic metadata should be cleared. 
retainTopics.set(Collections.emptySet()); -metadataResponse = RequestTestUtils.metadataUpdateWith(newClusterId, newNodes, newTopicErrors, newTopicPartitionCounts, _tp -> 300); +metadataResponse = RequestTestUtils.metadataUpdateWithIds(newClusterId, newNodes, newTopicErrors, newTopicPartitionCounts, _tp -> 300, topicIds); metadata.updateWithCurrentRequestVersion(metadataResponse, true, time.milliseconds()); +topicIds.forEach((topicName, topicId) -> assertEquals(metadata.topicId(topicName), null)); cluster = metadata.fetch(); assertEquals(cluster.clusterResource().clusterId(), newClusterId); assertEquals(cluster.nodes().size(), newNodes); assertEquals(cluster.invalidTopics(), Collections.emptySet()); assertEquals(cluster.unauthorizedTopics(), Collections.emptySet()); assertEquals(cluster.topics(), Collections.emptySet()); +assertTrue(cluster.topicIds().isEmpty()); +} + +@Test +public void testMetadata
[jira] [Commented] (KAFKA-12896) Group rebalance loop caused by repeated group leader JoinGroups
[ https://issues.apache.org/jira/browse/KAFKA-12896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17381083#comment-17381083 ]

David Jacot commented on KAFKA-12896:
-------------------------------------

[~ableegoldman] With your two fixes, I think that we can resolve this one, can't we?

> Group rebalance loop caused by repeated group leader JoinGroups
> ---------------------------------------------------------------
>
>                 Key: KAFKA-12896
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12896
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 2.6.0
>            Reporter: Lucas Bradstreet
>            Assignee: David Jacot
>            Priority: Blocker
>             Fix For: 3.0.0
>
>
> We encountered a strange case of a rebalance loop with the "cooperative-sticky" assignor. The logs show the following for several hours:
>
> {{Apr 7, 2021 @ 03:58:36.040 [GroupCoordinator 7]: Stabilized group mygroup generation 19830137 (__consumer_offsets-7)}}
> {{Apr 7, 2021 @ 03:58:35.992 [GroupCoordinator 7]: Preparing to rebalance group mygroup in state PreparingRebalance with old generation 19830136 (__consumer_offsets-7) (reason: Updating metadata for member mygroup-1-7ad27e07-3784-4588-97e1-d796a74d4ecc during CompletingRebalance)}}
> {{Apr 7, 2021 @ 03:58:35.988 [GroupCoordinator 7]: Stabilized group mygroup generation 19830136 (__consumer_offsets-7)}}
> {{Apr 7, 2021 @ 03:58:35.972 [GroupCoordinator 7]: Preparing to rebalance group mygroup in state PreparingRebalance with old generation 19830135 (__consumer_offsets-7) (reason: Updating metadata for member mygroup during CompletingRebalance)}}
> {{Apr 7, 2021 @ 03:58:35.965 [GroupCoordinator 7]: Stabilized group mygroup generation 19830135 (__consumer_offsets-7)}}
> {{Apr 7, 2021 @ 03:58:35.953 [GroupCoordinator 7]: Preparing to rebalance group mygroup in state PreparingRebalance with old generation 19830134 (__consumer_offsets-7) (reason: Updating metadata for member mygroup-7ad27e07-3784-4588-97e1-d796a74d4ecc during CompletingRebalance)}}
> {{Apr 7, 2021 @ 03:58:35.941 [GroupCoordinator 7]: Stabilized group mygroup generation 19830134 (__consumer_offsets-7)}}
> {{Apr 7, 2021 @ 03:58:35.926 [GroupCoordinator 7]: Preparing to rebalance group mygroup in state PreparingRebalance with old generation 19830133 (__consumer_offsets-7) (reason: Updating metadata for member mygroup during CompletingRebalance)}}
>
> Every single time, it was the same member that triggered the JoinGroup and it was always the leader of the group.
> The leader has the privilege of being able to trigger a rebalance by sending `JoinGroup` even if its subscription metadata has not changed. But why would it do so?
> It is possible that this is due to the same issue or a similar bug to https://issues.apache.org/jira/browse/KAFKA-12890.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[GitHub] [kafka] satishd opened a new pull request #11058: KAFKA-12802 Added a file based cache for consumed remote log metadata for each partition to avoid consuming again incase of broker restarts.
satishd opened a new pull request #11058:
URL: https://github.com/apache/kafka/pull/11058

- Added snapshots for consumed remote log metadata for each partition to avoid consuming again in case of broker restarts. These snapshots are stored in the respective topic partition log directories.
- Added TopicBasedRemoteLogMetadataManagerRestartTest:
  * loads the earlier saved snapshots after restart
  * checks the entries are available
  * starts the consumer and adds more metadata entries
  * checks the newly added entries and loaded entries are available

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on pull request #11057: KAFKA-13008: Try to refresh end offset when partitionLag returns empty
guozhangwang commented on pull request #11057:
URL: https://github.com/apache/kafka/pull/11057#issuecomment-880417140

Also cc @cmccabe @hachikuji.
[GitHub] [kafka] guozhangwang commented on pull request #11057: KAFKA-13008: Try to refresh end offset when partitionLag returns empty
guozhangwang commented on pull request #11057:
URL: https://github.com/apache/kafka/pull/11057#issuecomment-880416855

@vvcephei @ableegoldman @showuon LMK what you think about this approach. I have not yet done the due diligence on test coverage; I will do that if people are +1 on this direction. If we feel this is a general issue for normal consumers as well, then this approach would be better than just fixing it at the Streams layer.
[GitHub] [kafka] guozhangwang opened a new pull request #11057: KAFKA-13008: Try to refresh end offset when partitionLag returns empty
guozhangwang opened a new pull request #11057:
URL: https://github.com/apache/kafka/pull/11057

This is an idea I had for attacking this at the consumer client level:

1. When a listOffset result is retrieved inside Fetcher, check whether the partitions are part of the consumer's subscriptions; if so, update the corresponding LSO or HW based on the isolation level.
2. When partitionLag cannot return a result because the log end offset (LSO/HW) is not known, send an async list offset request, which would be completed by other calls polling (the hb thread may complete it as well), in the hope that the next partitionLag call gets the result.

Then on the Streams side, the first partitionLag would still return empty, but soon enough a subsequent partitionLag should return data, and we would not wait for the fetch response to update the fetched state.
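The two steps in this description can be sketched roughly as follows. This is only an illustration under stated assumptions: `LagTracker`, `seek`, and the `listOffsets` function are hypothetical stand-ins, not the actual `Fetcher` or consumer API, and partitions are plain strings.

```java
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical stand-in for the consumer's per-partition state; not the Kafka Fetcher. */
final class LagTracker {
    private final Map<String, Long> positions = new ConcurrentHashMap<>();
    private final Map<String, Long> endOffsets = new ConcurrentHashMap<>();
    private final Set<String> pending = ConcurrentHashMap.newKeySet();
    // Assumed async lookup: returns a future that some other caller completes later.
    private final Function<String, CompletableFuture<Long>> listOffsets;

    LagTracker(Function<String, CompletableFuture<Long>> listOffsets) {
        this.listOffsets = listOffsets;
    }

    void seek(String partition, long position) {
        positions.put(partition, position);
    }

    /** Returns empty when the end offset is unknown, and kicks off one async refresh. */
    OptionalLong partitionLag(String partition) {
        Long position = positions.get(partition);
        if (position == null) {
            return OptionalLong.empty();
        }
        Long end = endOffsets.get(partition);
        if (end == null) {
            refreshEndOffset(partition);
            return OptionalLong.empty();
        }
        return OptionalLong.of(Math.max(0L, end - position));
    }

    private void refreshEndOffset(String partition) {
        // Only one outstanding request per partition.
        if (pending.add(partition)) {
            listOffsets.apply(partition).thenAccept(end -> {
                endOffsets.put(partition, end);
                pending.remove(partition);
            });
        }
    }
}
```

With this shape, the first `partitionLag` call returns empty and a later call returns the lag once the future has been completed, which matches the behavior described for the Streams side.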
[GitHub] [kafka] cmccabe commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode
cmccabe commented on a change in pull request #10753:
URL: https://github.com/apache/kafka/pull/10753#discussion_r670134315

## File path: metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentRevert.java ##
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
+import org.apache.kafka.metadata.PartitionRegistration;
+import org.apache.kafka.metadata.Replicas;
+
+import java.util.ArrayList;
+import java.util.Set;
+import java.util.List;
+import java.util.Objects;
+
+
+class PartitionReassignmentRevert {
+    private final List<Integer> replicas;
+    private final List<Integer> isr;
+    private final boolean unclean;
+
+    PartitionReassignmentRevert(PartitionRegistration registration) {
+        // Figure out the replica list and ISR that we will have after reverting the
+        // reassignment. In general, we want to take out any replica that the reassignment
+        // was adding, but keep the ones the reassignment was removing. (But see the
+        // special case below.)
+        Set<Integer> adding = Replicas.toSet(registration.addingReplicas);
+        this.replicas = new ArrayList<>(registration.replicas.length);
+        this.isr = new ArrayList<>(registration.isr.length);
+        for (int i = 0; i < registration.isr.length; i++) {
+            int replica = registration.isr[i];
+            if (!adding.contains(replica)) {
+                this.isr.add(replica);
+            }
+        }
+        for (int replica : registration.replicas) {
+            if (!adding.contains(replica)) {
+                this.replicas.add(replica);
+            }
+        }
+        if (isr.isEmpty()) {
+            // In the special case that all the replicas that are in the ISR are also
+            // contained in addingReplicas, we choose the first remaining replica and add
+            // it to the ISR. This is considered an unclean leader election. Therefore,
+            // calling code must check that unclean leader election is enabled before
+            // accepting the new ISR.
+            if (this.replicas.isEmpty()) {
+                // This should not be reachable, since it would require a partition
+                // starting with an empty replica set prior to the reassignment we are
+                // trying to revert.
+                throw new InvalidReplicaAssignmentException("Invalid replica " +
+                    "assignment: addingReplicas contains all replicas.");
+            }
+            isr.add(replicas.get(0));

Review comment: It seems simpler to do it here because then we can reject the unclean case before creating the `PartitionChangeBuilder`
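To make the revert rule in the quoted constructor easier to follow, here is a self-contained sketch that works on plain `int[]` inputs. `RevertSketch` is a hypothetical name, it throws `IllegalStateException` in place of Kafka's `InvalidReplicaAssignmentException`, and setting `unclean = true` in the fallback branch is an assumption, since the quoted diff is cut off just before that point.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical, simplified mirror of the revert logic; not the Kafka class itself. */
final class RevertSketch {
    final List<Integer> replicas = new ArrayList<>();
    final List<Integer> isr = new ArrayList<>();
    final boolean unclean;

    RevertSketch(int[] currentReplicas, int[] currentIsr, int[] addingReplicas) {
        Set<Integer> adding = new HashSet<>();
        for (int replica : addingReplicas) {
            adding.add(replica);
        }
        // Drop everything the reassignment was adding; keep what it was removing.
        for (int replica : currentIsr) {
            if (!adding.contains(replica)) {
                isr.add(replica);
            }
        }
        for (int replica : currentReplicas) {
            if (!adding.contains(replica)) {
                replicas.add(replica);
            }
        }
        if (isr.isEmpty()) {
            if (replicas.isEmpty()) {
                throw new IllegalStateException("addingReplicas contains all replicas");
            }
            // Fall back to the first remaining replica. Assumed to count as unclean,
            // so a caller would have to check that unclean election is allowed.
            isr.add(replicas.get(0));
            unclean = true;
        } else {
            unclean = false;
        }
    }
}
```

For example, reverting a reassignment that was adding replica 3 to replicas `[1, 2, 3]` with ISR `[3]` leaves replicas `[1, 2]`, an ISR of `[1]`, and the unclean flag set.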
[GitHub] [kafka] cmccabe commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode
cmccabe commented on a change in pull request #10753: URL: https://github.com/apache/kafka/pull/10753#discussion_r670132470 ## File path: metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java ## @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.metadata.PartitionChangeRecord; +import org.apache.kafka.metadata.PartitionRegistration; +import org.apache.kafka.metadata.Replicas; +import org.apache.kafka.server.common.ApiMessageAndVersion; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.function.Function; +import java.util.function.Supplier; + +import static org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_CHANGE_RECORD; +import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER; +import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE; + +/** + * PartitionChangeBuilder handles changing partition registrations. 
+ */ +public class PartitionChangeBuilder { +public static boolean changeRecordIsNoOp(PartitionChangeRecord record) { +if (record.isr() != null) return false; +if (record.leader() != NO_LEADER_CHANGE) return false; +if (record.replicas() != null) return false; +if (record.removingReplicas() != null) return false; +if (record.addingReplicas() != null) return false; +return true; +} + +private final PartitionRegistration partition; +private final Uuid topicId; +private final int partitionId; +private final Function isAcceptableLeader; +private final Supplier uncleanElectionOk; +private List targetIsr; +private List targetReplicas; +private List targetRemoving; +private List targetAdding; +private boolean alwaysElectPreferredIfPossible; + +public PartitionChangeBuilder(PartitionRegistration partition, + Uuid topicId, + int partitionId, + Function isAcceptableLeader, + Supplier uncleanElectionOk) { +this.partition = partition; +this.topicId = topicId; +this.partitionId = partitionId; +this.isAcceptableLeader = isAcceptableLeader; +this.uncleanElectionOk = uncleanElectionOk; +this.targetIsr = Replicas.toList(partition.isr); +this.targetReplicas = Replicas.toList(partition.replicas); +this.targetRemoving = Replicas.toList(partition.removingReplicas); +this.targetAdding = Replicas.toList(partition.addingReplicas); +this.alwaysElectPreferredIfPossible = false; +} + +public PartitionChangeBuilder setTargetIsr(List targetIsr) { +this.targetIsr = targetIsr; +return this; +} + +public PartitionChangeBuilder setTargetReplicas(List targetReplicas) { +this.targetReplicas = targetReplicas; +return this; +} + +public PartitionChangeBuilder setAlwaysElectPreferredIfPossible(boolean alwaysElectPreferredIfPossible) { +this.alwaysElectPreferredIfPossible = alwaysElectPreferredIfPossible; +return this; +} + +public PartitionChangeBuilder setTargetRemoving(List targetRemoving) { +this.targetRemoving = targetRemoving; +return this; +} + +public PartitionChangeBuilder setTargetAdding(List 
targetAdding) { +this.targetAdding = targetAdding; +return this; +} + +boolean shouldTryElection() { +// If the new isr doesn't have the current leader, we need to try to elect a new +// one. Note: this also handles the case where the current leader is NO_LEADER, +// since that value cannot appear in targetIsr. +if (!targetIsr.contains(partition.leader)) return true; + +// Check if we want to try to get away from a non-preferred leader. +if (alwaysElectPreferredIfPossible && !partition.hasPreferredLeader()) return true; + +return false; +} + +class BestLeader { +final int node; +fina
[GitHub] [kafka] guozhangwang commented on a change in pull request #10941: KAFKA-10847: Remove internal config for enabling the fix
guozhangwang commented on a change in pull request #10941:
URL: https://github.com/apache/kafka/pull/10941#discussion_r670119642

## File path: docs/streams/upgrade-guide.html ##
@@ -117,6 +117,21 @@ Streams API
 We removed the default implementation of RocksDBConfigSetter#close().
+
+We dropped the default 24 hours grace period for windowed operations such as Window or Session aggregates, or stream-stream joins.
+This period determines how long after a window ends any late arrived records will still be processed.
+Records coming in after the grace period has elapsed will be dropped from those windows.
+With a long grace period, though Kafka Streams can handle out-of-order data up to that amount of time, it will also incur a high and confusing latency for users,
+e.g. suppression operators with the default won't emit results for up to 24 hours, while in practice out-of-order data usually has a much smaller time-skewness.
+Instead of abstracting this config from users with a long default value, we introduced new constructs such as TimeWindows#ofSizeWithNoGrace to let callers always set it upon constructing the windows;

Review comment: `TimeDifference` is only in `JoinWindows`.

## File path: docs/streams/upgrade-guide.html ##
@@ -117,6 +117,21 @@ Streams API
 We removed the default implementation of RocksDBConfigSetter#close().
+
+We dropped the default 24 hours grace period for windowed operations such as Window or Session aggregates, or stream-stream joins.

Review comment: Since we are piggy-backing the fix on KIP-663 now, I want to incorporate the change along with this PR.

## File path: docs/streams/upgrade-guide.html ##
@@ -117,6 +117,21 @@ Streams API
 We removed the default implementation of RocksDBConfigSetter#close().
+
+We dropped the default 24 hours grace period for windowed operations such as Window or Session aggregates, or stream-stream joins.
+This period determines how long after a window ends any late arrived records will still be processed.
+Records coming in after the grace period has elapsed will be dropped from those windows.
+With a long grace period, though Kafka Streams can handle out-of-order data up to that amount of time, it will also incur a high and confusing latency for users,
+e.g. suppression operators with the default won't emit results for up to 24 hours, while in practice out-of-order data usually has a much smaller time-skewness.
+Instead of abstracting this config from users with a long default value, we introduced new constructs such as TimeWindows#ofSizeWithNoGrace to let callers always set it upon constructing the windows;
+the other setters such as TimeWindows#grace are deprecated and will be removed in the future.
+Also when the new construct API are used for left/outer stream-stream joins, Kafka Streams would fix emitting spurious join results which may have an impact on the throughput.

Review comment: Ack!
[GitHub] [kafka] guozhangwang commented on pull request #10953: MINOR: Default GRACE with Old API should set as 24H minus window-size / inactivity-gap
guozhangwang commented on pull request #10953:
URL: https://github.com/apache/kafka/pull/10953#issuecomment-880375143

@cadonna and everyone: I've addressed the comments and added `JoinWindows`.
[GitHub] [kafka] guozhangwang commented on a change in pull request #10953: MINOR: Default GRACE with Old API should set as 24H minus window-size / inactivity-gap
guozhangwang commented on a change in pull request #10953:
URL: https://github.com/apache/kafka/pull/10953#discussion_r670103326

## File path: streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java ##
@@ -146,7 +146,7 @@ public static TimeWindows of(final Duration size) throws IllegalArgumentExceptio
         final String msgPrefix = prepareMillisCheckFailMsgPrefix(size, "size");
         final long sizeMs = validateMillisecondDuration(size, msgPrefix);
 
-        return new TimeWindows(sizeMs, sizeMs, DEPRECATED_OLD_24_HR_GRACE_PERIOD);
+        return new TimeWindows(sizeMs, sizeMs, Math.max(DEPRECATED_OLD_24_HR_GRACE_PERIOD - sizeMs, 0));

Review comment: Updated on all corresponding classes' constructors.
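The arithmetic in the changed line can be checked in isolation. The sketch below assumes the deprecated constant equals 24 hours in milliseconds, as its name suggests; `LegacyGraceSketch` and `defaultGraceMs` are hypothetical names used only for illustration.

```java
import java.time.Duration;

/** Hypothetical helper reproducing the "24H minus window size" default from the diff. */
final class LegacyGraceSketch {
    // Assumption: mirrors DEPRECATED_OLD_24_HR_GRACE_PERIOD as 24 hours in milliseconds.
    static final long OLD_24_HR_GRACE_MS = Duration.ofHours(24).toMillis();

    /** Grace shrinks as the window grows, and never goes below zero. */
    static long defaultGraceMs(Duration windowSize) {
        return Math.max(OLD_24_HR_GRACE_MS - windowSize.toMillis(), 0);
    }

    public static void main(String[] args) {
        // A 1-hour window leaves 23 hours of grace.
        System.out.println(defaultGraceMs(Duration.ofHours(1)));
        // A window longer than 24 hours is clamped to zero rather than going negative.
        System.out.println(defaultGraceMs(Duration.ofDays(2)));
    }
}
```

Under that assumption, window size plus grace stays at 24 hours for windows shorter than a day, and the `Math.max(..., 0)` clamp covers larger windows.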
[GitHub] [kafka] showuon commented on a change in pull request #11019: KAFKA-13059: refactor DeleteConsumerGroupOffsetsHandler and tests
showuon commented on a change in pull request #11019: URL: https://github.com/apache/kafka/pull/11019#discussion_r670111886 ## File path: clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandlerTest.java ## @@ -67,48 +69,88 @@ public void testBuildRequest() { @Test public void testSuccessfulHandleResponse() { Map responseData = Collections.singletonMap(t0p0, Errors.NONE); -assertCompleted(handleWithError(Errors.NONE), responseData); +assertCompleted(handleWithGroupError(Errors.NONE), responseData); } @Test public void testUnmappedHandleResponse() { -assertUnmapped(handleWithError(Errors.NOT_COORDINATOR)); +assertUnmapped(handleWithGroupError(Errors.NOT_COORDINATOR)); +assertUnmapped(handleWithGroupError(Errors.COORDINATOR_NOT_AVAILABLE)); } @Test public void testRetriableHandleResponse() { -assertRetriable(handleWithError(Errors.COORDINATOR_LOAD_IN_PROGRESS)); -assertRetriable(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE)); + assertRetriable(handleWithGroupError(Errors.COORDINATOR_LOAD_IN_PROGRESS)); } @Test -public void testFailedHandleResponse() { -assertFailed(GroupAuthorizationException.class, handleWithError(Errors.GROUP_AUTHORIZATION_FAILED)); -assertFailed(GroupIdNotFoundException.class, handleWithError(Errors.GROUP_ID_NOT_FOUND)); -assertFailed(InvalidGroupIdException.class, handleWithError(Errors.INVALID_GROUP_ID)); +public void testFailedHandleResponseWithGroupError() { +assertGroupFailed(GroupAuthorizationException.class, handleWithGroupError(Errors.GROUP_AUTHORIZATION_FAILED)); +assertGroupFailed(GroupIdNotFoundException.class, handleWithGroupError(Errors.GROUP_ID_NOT_FOUND)); +assertGroupFailed(InvalidGroupIdException.class, handleWithGroupError(Errors.INVALID_GROUP_ID)); +assertGroupFailed(GroupNotEmptyException.class, handleWithGroupError(Errors.NON_EMPTY_GROUP)); } -private OffsetDeleteResponse buildResponse(Errors error) { +@Test +public void testFailedHandleResponseWithPartitionError() { 
+assertPartitionFailed(Collections.singletonMap(t0p0, Errors.GROUP_SUBSCRIBED_TO_TOPIC), +handleWithPartitionError(Errors.GROUP_SUBSCRIBED_TO_TOPIC)); +assertPartitionFailed(Collections.singletonMap(t0p0, Errors.TOPIC_AUTHORIZATION_FAILED), +handleWithPartitionError(Errors.TOPIC_AUTHORIZATION_FAILED)); +assertPartitionFailed(Collections.singletonMap(t0p0, Errors.UNKNOWN_TOPIC_OR_PARTITION), +handleWithPartitionError(Errors.UNKNOWN_TOPIC_OR_PARTITION)); +} + +private OffsetDeleteResponse buildGroupErrorResponse(Errors error) { +OffsetDeleteResponse response = new OffsetDeleteResponse( +new OffsetDeleteResponseData() +.setErrorCode(error.code())); +if (error == Errors.NONE) { +response.data() +.setThrottleTimeMs(0) +.setTopics(new OffsetDeleteResponseTopicCollection(singletonList( +new OffsetDeleteResponseTopic() +.setName(t0p0.topic()) +.setPartitions(new OffsetDeleteResponsePartitionCollection(singletonList( +new OffsetDeleteResponsePartition() +.setPartitionIndex(t0p0.partition()) +.setErrorCode(error.code()) +).iterator())) +).iterator())); +} +return response; +} + +private OffsetDeleteResponse buildPartitionErrorResponse(Errors error) { OffsetDeleteResponse response = new OffsetDeleteResponse( -new OffsetDeleteResponseData() -.setThrottleTimeMs(0) -.setTopics(new OffsetDeleteResponseTopicCollection(singletonList( -new OffsetDeleteResponseTopic() -.setName("t0") -.setPartitions(new OffsetDeleteResponsePartitionCollection(singletonList( -new OffsetDeleteResponsePartition() -.setPartitionIndex(0) -.setErrorCode(error.code()) - ).iterator())) - ).iterator(; +new OffsetDeleteResponseData() +.setThrottleTimeMs(0) +.setTopics(new OffsetDeleteResponseTopicCollection(singletonList( +new OffsetDeleteResponseTopic() +.setName(t0p0.topic()) +.setPartitions(new OffsetDeleteResponsePartitionCollection(singletonList( +new OffsetDeleteResponsePartition() +
[GitHub] [kafka] jolshan opened a new pull request #11056: KAFKA-13092: Perf regression in LISR requests
jolshan opened a new pull request #11056:
URL: https://github.com/apache/kafka/pull/11056

After noticing increased LISR times, we discovered a lot of time was spent synchronously flushing the partition metadata file. This PR changes the code so we asynchronously flush the files. We ensure files are flushed before appending, renaming or closing the log to ensure we have the partition metadata information on disk. Three new tests have been added to address these cases.

Benchmark by @lbradstreet is included to compare the times. I will update this description after I compare trunk to this branch.
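The pattern described here (record cheaply on the request path, flush later, but force the flush before operations that depend on it) might look like the sketch below. All names are hypothetical and an in-memory list stands in for the disk; this is not the code from the PR.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a partition metadata file; the list plays the role of the disk. */
final class DeferredMetadataFile {
    private String pendingTopicId;                        // recorded in memory, not yet flushed
    private final List<String> disk = new ArrayList<>();

    /** Cheap: remembers the value without doing any I/O on the request path. */
    synchronized void record(String topicId) {
        pendingTopicId = topicId;
    }

    /** Writes the pending value if there is one; safe to call repeatedly from any thread. */
    synchronized void maybeFlush() {
        if (pendingTopicId != null) {
            disk.add(pendingTopicId);
            pendingTopicId = null;
        }
    }

    synchronized List<String> flushedValues() {
        return new ArrayList<>(disk);
    }
}

/** Hypothetical log that forces the flush before the operations named in the description. */
final class LogSketch {
    final DeferredMetadataFile metadataFile = new DeferredMetadataFile();
    private final List<String> records = new ArrayList<>();

    void append(String record) {
        metadataFile.maybeFlush();   // metadata must be on disk before data is appended
        records.add(record);
    }

    void close() {
        metadataFile.maybeFlush();   // same guarantee before closing
    }
}
```

A background scheduler could also call `maybeFlush()` so the write usually happens off the request path; the calls in `append` and `close` only act as a safety net when the asynchronous flush has not run yet.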
[GitHub] [kafka] showuon commented on a change in pull request #11035: KAFKA-13072: refactor RemoveMembersFromConsumerGroupHandler and tests
showuon commented on a change in pull request #11035:
URL: https://github.com/apache/kafka/pull/11035#discussion_r670091468

## File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java ##
@@ -3680,6 +3685,72 @@ public void testRemoveMembersFromGroupRetriableErrors() throws Exception {
         }
     }
 
+    @Test
+    public void testRemoveMembersFromGroupRetriableErrorsInMemberResponse() throws Exception {
+        // Retriable errors should be retried
+        String groupId = "instance-1";
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            env.kafkaClient().prepareResponse(
+                prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
+
+            MemberResponse memberResponse = new MemberResponse()
+                .setGroupInstanceId(groupId)
+                .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code());

Review comment: Right! Removed. Thanks.
[GitHub] [kafka] yanspirit commented on pull request #11020: KAFKA-12937; mm2 can start from the ending of a topic
yanspirit commented on pull request #11020:
URL: https://github.com/apache/kafka/pull/11020#issuecomment-880345058

> Not sure why I was tagged - did you mean somebody else?

Hello, I tagged you because GitHub recommended you as a reviewer. The PR has no assignees, and I don't know how to trigger the review flow.
[GitHub] [kafka] showuon commented on a change in pull request #11035: KAFKA-13072: refactor RemoveMembersFromConsumerGroupHandler and tests
showuon commented on a change in pull request #11035: URL: https://github.com/apache/kafka/pull/11035#discussion_r670079198 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandler.java ## @@ -79,55 +90,82 @@ public String apiName() { Set groupIds, AbstractResponse abstractResponse ) { +validateKeys(groupIds); + final LeaveGroupResponse response = (LeaveGroupResponse) abstractResponse; -Map> completed = new HashMap<>(); -Map failed = new HashMap<>(); -List unmapped = new ArrayList<>(); +final Map> completed = new HashMap<>(); +final Map failed = new HashMap<>(); +final Set groupsToUnmap = new HashSet<>(); +final Set groupsToRetry = new HashSet<>(); -final Errors error = Errors.forCode(response.data().errorCode()); +final Errors error = response.topLevelError(); if (error != Errors.NONE) { -handleError(groupId, error, failed, unmapped); +handleGroupError(groupId, error, failed, groupsToUnmap, groupsToRetry); } else { final Map memberErrors = new HashMap<>(); for (MemberResponse memberResponse : response.memberResponses()) { +Errors memberError = Errors.forCode(memberResponse.errorCode()); +String memberId = memberResponse.memberId(); + memberErrors.put(new MemberIdentity() - .setMemberId(memberResponse.memberId()) + .setMemberId(memberId) .setGroupInstanceId(memberResponse.groupInstanceId()), - Errors.forCode(memberResponse.errorCode())); +memberError); } completed.put(groupId, memberErrors); } -return new ApiResult<>(completed, failed, unmapped); + +if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) { Review comment: Agree! Updated! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-9366) Upgrade log4j to log4j2
[ https://issues.apache.org/jira/browse/KAFKA-9366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17380951#comment-17380951 ] Dongjin Lee commented on KAFKA-9366: [~kkonstantine] Got it. > Upgrade log4j to log4j2 > --- > > Key: KAFKA-9366 > URL: https://issues.apache.org/jira/browse/KAFKA-9366 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.2.0, 2.1.1, 2.3.0, 2.4.0 >Reporter: leibo >Assignee: Dongjin Lee >Priority: Critical > Labels: needs-kip > Fix For: 3.1.0 > > > h2. CVE-2019-17571 Detail > Included in Log4j 1.2 is a SocketServer class that is vulnerable to > deserialization of untrusted data which can be exploited to remotely execute > arbitrary code when combined with a deserialization gadget when listening to > untrusted network traffic for log data. This affects Log4j versions up to 1.2 > up to 1.2.17. > > [https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2019-17571] > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] showuon commented on a change in pull request #11022: KAFKA-13063: refactor DescribeConsumerGroupsHandler and tests
showuon commented on a change in pull request #11022: URL: https://github.com/apache/kafka/pull/11022#discussion_r670075392 ## File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java ## @@ -2688,8 +2688,7 @@ public void testDescribeConsumerGroups() throws Exception { try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); -//Retriable FindCoordinatorResponse errors should be retried - env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode())); Review comment: Oh, you are right! reverted my change. Thank you! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on pull request #10877: KAFKA-12925: adding presfixScan operation for missed implementations
ableegoldman commented on pull request #10877: URL: https://github.com/apache/kafka/pull/10877#issuecomment-880336482 Merged to trunk and cherry-picked to 3.0 and 2.8.
[GitHub] [kafka] showuon commented on a change in pull request #11022: KAFKA-13063: refactor DescribeConsumerGroupsHandler and tests
showuon commented on a change in pull request #11022: URL: https://github.com/apache/kafka/pull/11022#discussion_r670073886 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java ## @@ -151,38 +154,45 @@ public String apiName() { completed.put(groupIdKey, consumerGroupDescription); } else { failed.put(groupIdKey, new IllegalArgumentException( -String.format("GroupId %s is not a consumer group (%s).", -groupIdKey.idValue, protocolType))); +String.format("GroupId %s is not a consumer group (%s).", +groupIdKey.idValue, protocolType))); } } -return new ApiResult<>(completed, failed, unmapped); + +return new ApiResult<>(completed, failed, new ArrayList<>(groupsToUnmap)); } private void handleError( CoordinatorKey groupId, Errors error, Map failed, -List unmapped +Set groupsToUnmap, +Set groupsToRetry ) { switch (error) { case GROUP_AUTHORIZATION_FAILED: -log.error("Received authorization failure for group {} in `DescribeGroups` response", groupId, -error.exception()); +log.debug("`DescribeGroups` request for group id {} failed due to error {}", groupId.idValue, error); failed.put(groupId, error.exception()); break; case COORDINATOR_LOAD_IN_PROGRESS: -case COORDINATOR_NOT_AVAILABLE: +// If the coordinator is in the middle of loading, then we just need to retry +log.debug("`DescribeGroups` request for group id {} failed because the coordinator " + +"is still in the process of loading state. Will retry", groupId.idValue); +groupsToRetry.add(groupId); break; +case COORDINATOR_NOT_AVAILABLE: case NOT_COORDINATOR: -log.debug("DescribeGroups request for group {} returned error {}. Will retry", -groupId, error); -unmapped.add(groupId); +// If the coordinator is unavailable or there was a coordinator change, then we unmap +// the key so that we retry the `FindCoordinator` request +log.debug("`DescribeGroups` request for group id {} returned error {}. 
" + +"Will attempt to find the coordinator again and retry", groupId.idValue, error); +groupsToUnmap.add(groupId); break; default: -log.error("Received unexpected error for group {} in `DescribeGroups` response", -groupId, error.exception()); -failed.put(groupId, error.exception( -"Received unexpected error for group " + groupId + " in `DescribeGroups` response")); +final String unexpectedErrorMsg = +String.format("`DescribeGroups` request for group id %s failed due to error %s", groupId.idValue, error); +log.error(unexpectedErrorMsg); +failed.put(groupId, error.exception(unexpectedErrorMsg)); Review comment: Agree. I just follow the previous behavior. Remove the error message. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on a change in pull request #11022: KAFKA-13063: refactor DescribeConsumerGroupsHandler and tests
showuon commented on a change in pull request #11022: URL: https://github.com/apache/kafka/pull/11022#discussion_r670073415 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java ## @@ -109,16 +109,19 @@ public String apiName() { Set groupIds, AbstractResponse abstractResponse ) { -DescribeGroupsResponse response = (DescribeGroupsResponse) abstractResponse; -Map completed = new HashMap<>(); -Map failed = new HashMap<>(); -List unmapped = new ArrayList<>(); +final DescribeGroupsResponse response = (DescribeGroupsResponse) abstractResponse; +final Map completed = new HashMap<>(); +final Map failed = new HashMap<>(); +final Set groupsToUnmap = new HashSet<>(); +final Set groupsToRetry = new HashSet<>(); -for (DescribedGroup describedGroup : response.data().groups()) { +List describedGroups = response.data().groups(); + +for (DescribedGroup describedGroup : describedGroups) { CoordinatorKey groupIdKey = CoordinatorKey.byGroupId(describedGroup.groupId()); Errors error = Errors.forCode(describedGroup.errorCode()); if (error != Errors.NONE) { -handleError(groupIdKey, error, failed, unmapped); +handleError(groupIdKey, error, failed, groupsToUnmap, groupsToRetry); Review comment: Good catch! Updated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
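The three-way split that `handleError` makes in this pull request (fail the group, retry against the same coordinator, or unmap the key and look the coordinator up again) can be sketched in isolation. This is a minimal sketch under stated assumptions, not the handler's code: `GroupError` and `ErrorRouter` are hypothetical stand-ins for Kafka's `Errors` enum and the handler class, and group ids are plain strings instead of `CoordinatorKey`.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the subset of error codes that appear in the diff.
enum GroupError {
    GROUP_AUTHORIZATION_FAILED,
    COORDINATOR_LOAD_IN_PROGRESS,
    COORDINATOR_NOT_AVAILABLE,
    NOT_COORDINATOR,
    UNKNOWN_SERVER_ERROR
}

// Hypothetical helper that mirrors the shape of handleError in the diff.
class ErrorRouter {
    final Map<String, GroupError> failed = new HashMap<>();
    final Set<String> groupsToUnmap = new HashSet<>();
    final Set<String> groupsToRetry = new HashSet<>();

    void route(String groupId, GroupError error) {
        switch (error) {
            case COORDINATOR_LOAD_IN_PROGRESS:
                // The coordinator is known but still loading: retry the same request.
                groupsToRetry.add(groupId);
                break;
            case COORDINATOR_NOT_AVAILABLE:
            case NOT_COORDINATOR:
                // The coordinator may have moved: drop the mapping and find it again.
                groupsToUnmap.add(groupId);
                break;
            default:
                // Authorization failures and unexpected errors are terminal.
                failed.put(groupId, error);
                break;
        }
    }
}
```

Keeping the retry and unmap sets separate lets the caller decide whether a `FindCoordinator` round trip is needed before the next attempt.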
[jira] [Commented] (KAFKA-13010) Flaky test org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectCommittedOffsetInformation()
[ https://issues.apache.org/jira/browse/KAFKA-13010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17380939#comment-17380939 ] A. Sophie Blee-Goldman commented on KAFKA-13010: Not the exact same test, but I did manage to reproduce this same "only one task" failure in TaskMetadataIntegrationTest.shouldReportCorrectEndOffsetInformation: {code:java} java.lang.AssertionError: only one task at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26) at org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.getTaskMetadata(TaskMetadataIntegrationTest.java:163) at org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectEndOffsetInformation(TaskMetadataIntegrationTest.java:144) {code} I saved the logs since they should actually be the full, un-truncated logs – hope this helps: [^TaskMetadataIntegrationTest#shouldReportCorrectEndOffsetInformation.rtf] > Flaky test > org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectCommittedOffsetInformation() > --- > > Key: KAFKA-13010 > URL: https://issues.apache.org/jira/browse/KAFKA-13010 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Bruno Cadonna >Priority: Major > Labels: flaky-test > Attachments: > TaskMetadataIntegrationTest#shouldReportCorrectEndOffsetInformation.rtf > > > Integration test {{test > org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectCommittedOffsetInformation()}} > sometimes fails with > {code:java} > java.lang.AssertionError: only one task > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26) > at > org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.getTaskMetadata(TaskMetadataIntegrationTest.java:162) > at > org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectCommittedOffsetInformation(TaskMetadataIntegrationTest.java:117) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-13092) Perf regression in LISR requests
[ https://issues.apache.org/jira/browse/KAFKA-13092?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Justine Olshan reassigned KAFKA-13092: -- Assignee: Justine Olshan > Perf regression in LISR requests > > > Key: KAFKA-13092 > URL: https://issues.apache.org/jira/browse/KAFKA-13092 > Project: Kafka > Issue Type: Bug >Reporter: Justine Olshan >Assignee: Justine Olshan >Priority: Critical > > With the addition of partition metadata files, we have an extra operation to > do when handling LISR requests. This really slows down the processing, so we > should flush asynchronously to fix this regression.
[jira] [Updated] (KAFKA-13010) Flaky test org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectCommittedOffsetInformation()
[ https://issues.apache.org/jira/browse/KAFKA-13010?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-13010: --- Attachment: TaskMetadataIntegrationTest#shouldReportCorrectEndOffsetInformation.rtf > Flaky test > org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectCommittedOffsetInformation() > --- > > Key: KAFKA-13010 > URL: https://issues.apache.org/jira/browse/KAFKA-13010 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Bruno Cadonna >Priority: Major > Labels: flaky-test > Attachments: > TaskMetadataIntegrationTest#shouldReportCorrectEndOffsetInformation.rtf > > > Integration test {{test > org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectCommittedOffsetInformation()}} > sometimes fails with > {code:java} > java.lang.AssertionError: only one task > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26) > at > org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.getTaskMetadata(TaskMetadataIntegrationTest.java:162) > at > org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectCommittedOffsetInformation(TaskMetadataIntegrationTest.java:117) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-13092) Perf regression in LISR requests
Justine Olshan created KAFKA-13092: -- Summary: Perf regression in LISR requests Key: KAFKA-13092 URL: https://issues.apache.org/jira/browse/KAFKA-13092 Project: Kafka Issue Type: Bug Reporter: Justine Olshan With the addition of partition metadata files, we have an extra operation to do when handling LISR (LeaderAndIsr) requests. This noticeably slows down request processing, so we should flush asynchronously to fix this regression.
[GitHub] [kafka] showuon closed pull request #10973: KAFKA-13033: COORDINATOR_NOT_AVAILABLE should be unmapped
showuon closed pull request #10973: URL: https://github.com/apache/kafka/pull/10973
[jira] [Resolved] (KAFKA-12970) Make tiered storage related schemas adopt flexible versions feature.
[ https://issues.apache.org/jira/browse/KAFKA-12970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana resolved KAFKA-12970. Resolution: Fixed This is already addressed as mentioned in the comment: https://issues.apache.org/jira/browse/KAFKA-12970?focusedCommentId=17365231&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17365231 > Make tiered storage related schemas adopt flexible versions feature. > - > > Key: KAFKA-12970 > URL: https://issues.apache.org/jira/browse/KAFKA-12970 > Project: Kafka > Issue Type: Sub-task >Reporter: Satish Duggana >Assignee: Satish Duggana >Priority: Major
[jira] [Commented] (KAFKA-13089) Revisit the usage of BufferSuppliers in Kraft
[ https://issues.apache.org/jira/browse/KAFKA-13089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17380914#comment-17380914 ] Jose Armando Garcia Sancio commented on KAFKA-13089: {quote}Can each listener hold a thread confined buffer supplier? {quote} Yeah. That is one possible solution to this problem. Have the {{RaftClient.Listener}} provide a {{BufferSupplier}} during registration. > Revisit the usage of BufferSuppliers in Kraft > - > > Key: KAFKA-13089 > URL: https://issues.apache.org/jira/browse/KAFKA-13089 > Project: Kafka > Issue Type: Sub-task > Components: kraft >Reporter: Jose Armando Garcia Sancio >Assignee: Jose Armando Garcia Sancio >Priority: Major > Labels: kip-500 > > The latest KafkaRaftClient creates a new BufferSupplier every time it is > needed. A buffer supplier is needed when reading from the log and when > reading from a snapshot. > It would be good to investigate if there is a performance and memory usage > advantage of sharing the buffer supplier between those use cases and every > time the log or snapshot are read. > If BufferSupplier is shared, it is very likely that the implementation will > have to be thread-safe because we need to support multiple Listeners and each > Listener would be using a different thread.
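The idea in this comment, where each listener hands over its own supplier at registration so that no supplier is shared across threads, could look roughly like the following. This is a minimal sketch under loud assumptions: `SimpleBufferPool`, `PooledListener` and `ListenerRegistry` are hypothetical names invented for illustration and are not the `BufferSupplier` or `RaftClient.Listener` APIs.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, deliberately non-thread-safe pool standing in for a buffer supplier.
class SimpleBufferPool {
    private final Deque<ByteBuffer> free = new ArrayDeque<>();
    int allocations = 0;

    ByteBuffer get(int capacity) {
        ByteBuffer cached = free.pollFirst();
        if (cached != null && cached.capacity() >= capacity) {
            cached.clear();
            return cached;
        }
        // No cached buffer, or the cached one was too small and is dropped.
        allocations++;
        return ByteBuffer.allocate(capacity);
    }

    void release(ByteBuffer buffer) {
        free.addFirst(buffer);
    }
}

// Hypothetical listener contract: the listener supplies the pool it owns.
interface PooledListener {
    SimpleBufferPool bufferPool();
}

// Hypothetical registry: reads on behalf of a listener only use that listener's pool.
class ListenerRegistry {
    private final Map<PooledListener, SimpleBufferPool> pools = new HashMap<>();

    void register(PooledListener listener) {
        pools.put(listener, listener.bufferPool());
    }

    ByteBuffer readFor(PooledListener listener, int size) {
        SimpleBufferPool pool = pools.get(listener);
        if (pool == null) {
            throw new IllegalArgumentException("Listener is not registered");
        }
        return pool.get(size);
    }
}
```

The sketch stays free of locks only as long as each pool is touched exclusively from its listener's thread; that confinement is the assumption that would need to hold in a real design.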
[jira] [Created] (KAFKA-13091) Increment HW after shrinking ISR through AlterIsr
Jason Gustafson created KAFKA-13091: --- Summary: Increment HW after shrinking ISR through AlterIsr Key: KAFKA-13091 URL: https://issues.apache.org/jira/browse/KAFKA-13091 Project: Kafka Issue Type: Bug Reporter: Jason Gustafson Assignee: David Arthur After we have shrunk the ISR, we have an opportunity to advance the high watermark. We do this currently in `maybeShrinkIsr` after the synchronous update through ZK. For the AlterIsr path, however, we cannot rely on this call since the request is sent asynchronously. Instead we should attempt to advance the high watermark in the callback when the AlterIsr response returns successfully.
[GitHub] [kafka] mjsax commented on pull request #11055: HOTFIX: Init stream-stream left/outer join emit interval correctly
mjsax commented on pull request #11055: URL: https://github.com/apache/kafka/pull/11055#issuecomment-880286834 @kkonstantine I think we should try to get this into the 3.0 release. Thoughts? Call for review @guozhangwang @spena
[GitHub] [kafka] mjsax commented on a change in pull request #11055: HOTFIX: Init stream-stream left/outer join emit interval correctly
mjsax commented on a change in pull request #11055: URL: https://github.com/apache/kafka/pull/11055#discussion_r670030416 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java ## @@ -103,7 +103,6 @@ public void init(final org.apache.kafka.streams.processor.ProcessorContext conte true )) { outerJoinWindowStore = outerJoinWindowName.map(context::getStateStore); -sharedTimeTracker.nextTimeToEmit = context.currentSystemTimeMs(); Review comment: In `init()` the cached system time is still zero, so we need to delay the initialization to a later point.
[GitHub] [kafka] mjsax opened a new pull request #11055: HOTFIX: Init stream-stream left/outer join emit interval correctly
mjsax opened a new pull request #11055: URL: https://github.com/apache/kafka/pull/11055 Follow up to KAFKA-10847 (https://github.com/apache/kafka/pull/10917). The above fix was intended to reduce the emit frequency to save the creation cost of RocksDB iterators. However, we incorrectly initialized the "timer" with timestamp zero, so the timer was always in the past. As a result, we tried to emit left/outer join results too often and got a throughput of only 500 records/sec. This PR fixes the initialization of the emit interval timer. After re-running the benchmark, we determined that a default emit interval of 1000ms provides the best performance of 12K records/sec, so this PR also changes the emit interval to 1000ms.
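Why a zero-initialized timer ends up "always in the past" can be shown with a small stand-alone sketch. Everything here is an assumption for illustration: `EmitTimer` is a hypothetical class, and advancing the deadline with `+=` is a guess at one way such a timer could behave, not the code in `KStreamKStreamJoin`.

```java
// Hypothetical throttle: emits at most once per interval, advancing a deadline.
class EmitTimer {
    private final long intervalMs;
    private long nextTimeToEmit;

    EmitTimer(long initialNextTimeToEmit, long intervalMs) {
        this.nextTimeToEmit = initialNextTimeToEmit;
        this.intervalMs = intervalMs;
    }

    boolean shouldEmit(long nowMs) {
        if (nowMs < nextTimeToEmit) {
            return false;
        }
        // Assumption: the deadline advances relative to its previous value.
        // If it started at zero, it stays far behind wall-clock time.
        nextTimeToEmit += intervalMs;
        return true;
    }
}
```

With an initial deadline of zero and a wall-clock time in the billions of milliseconds, every call returns `true`, so the throttle never takes effect. Seeding the deadline from the real clock on first use makes the interval hold.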
[jira] [Commented] (KAFKA-13089) Revisit the usage of BufferSuppliers in Kraft
[ https://issues.apache.org/jira/browse/KAFKA-13089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17380892#comment-17380892 ] Ismael Juma commented on KAFKA-13089: - The BufferSupplier implementations we have are not thread-safe and work best in a thread-confined way. Can each listener hold a thread confined buffer supplier? > Revisit the usage of BufferSuppliers in Kraft > - > > Key: KAFKA-13089 > URL: https://issues.apache.org/jira/browse/KAFKA-13089 > Project: Kafka > Issue Type: Sub-task > Components: kraft >Reporter: Jose Armando Garcia Sancio >Assignee: Jose Armando Garcia Sancio >Priority: Major > Labels: kip-500 > > The latest KafkaRaftClient creates a new BufferSupplier every time it is > needed. A buffer supplier is needed when reading from the log and when > reading from a snapshot. > It would be good to investigate if there is a performance and memory usage > advantage of sharing the buffer supplier between those use cases and every > time the log or snapshot are read. > If BufferSupplier is shared, it is very likely that the implementation will > have to be thread-safe because we need to support multiple Listeners and each > Listener would be using a different thread.
[GitHub] [kafka] junrao commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode
junrao commented on a change in pull request #10753: URL: https://github.com/apache/kafka/pull/10753#discussion_r669998610 ## File path: metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentRevert.java ## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.errors.InvalidReplicaAssignmentException; +import org.apache.kafka.metadata.PartitionRegistration; +import org.apache.kafka.metadata.Replicas; + +import java.util.ArrayList; +import java.util.Set; +import java.util.List; +import java.util.Objects; + + +class PartitionReassignmentRevert { +private final List replicas; +private final List isr; +private final boolean unclean; + +PartitionReassignmentRevert(PartitionRegistration registration) { +// Figure out the replica list and ISR that we will have after reverting the +// reassignment. In general, we want to take out any replica that the reassignment +// was adding, but keep the ones the reassignment was removing. (But see the +// special case below.) 
+Set adding = Replicas.toSet(registration.addingReplicas); +this.replicas = new ArrayList<>(registration.replicas.length); +this.isr = new ArrayList<>(registration.isr.length); +for (int i = 0; i < registration.isr.length; i++) { +int replica = registration.isr[i]; +if (!adding.contains(replica)) { +this.isr.add(replica); +} +} +for (int replica : registration.replicas) { +if (!adding.contains(replica)) { +this.replicas.add(replica); +} +} +if (isr.isEmpty()) { +// In the special case that all the replicas that are in the ISR are also +// contained in addingReplicas, we choose the first remaining replica and add +// it to the ISR. This is considered an unclean leader election. Therefore, +// calling code must check that unclean leader election is enabled before +// accepting the new ISR. +if (this.replicas.isEmpty()) { +// This should not be reachable, since it would require a partition +// starting with an empty replica set prior to the reassignment we are +// trying to revert. +throw new InvalidReplicaAssignmentException("Invalid replica " + +"assignment: addingReplicas contains all replicas."); +} +isr.add(replicas.get(0)); Review comment: Hmm, do we need to change isr here? It seems that BestLeader handles the unclean leader election with empty isr already. ## File path: metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java ## @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.metadata.PartitionChangeRecord; +import org.apache.kafka.metadata.PartitionRegistration; +import org.apache.kafka.metadata.Replicas; +import org.apache.kafka.server.common.ApiMessageAndVersion; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.function.Function; +import java.util.function.Supplier; + +import
[GitHub] [kafka] ableegoldman merged pull request #10877: KAFKA-12925: adding presfixScan operation for missed implementations
ableegoldman merged pull request #10877: URL: https://github.com/apache/kafka/pull/10877
[GitHub] [kafka] ableegoldman commented on a change in pull request #10877: KAFKA-12925: adding presfixScan operation for missed implementations
ableegoldman commented on a change in pull request #10877: URL: https://github.com/apache/kafka/pull/10877#discussion_r670001461 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java ## @@ -845,6 +845,64 @@ public void shouldBeAbleToQueryMapValuesState() throws Exception { for (final KeyValue batchEntry : batch1) { assertEquals(Long.valueOf(batchEntry.value), myMapStore.get(batchEntry.key)); } + +final KeyValueIterator range = myMapStore.range("hello", "kafka"); +while (range.hasNext()) { +System.out.println(range.next()); +} +} + +@Test +public void shouldBeAbleToQueryKeysWithGivenPrefix() throws Exception { +streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); +final StreamsBuilder builder = new StreamsBuilder(); +final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"}; +final Set> batch1 = new HashSet<>( +Arrays.asList( +new KeyValue<>(keys[0], "1"), +new KeyValue<>(keys[1], "1"), +new KeyValue<>(keys[2], "3"), +new KeyValue<>(keys[3], "5"), +new KeyValue<>(keys[4], "2")) +); + +final List> expectedPrefixScanResult = Arrays.asList( +new KeyValue<>(keys[3], 5L), +new KeyValue<>(keys[1], 1L) +); + +IntegrationTestUtils.produceKeyValuesSynchronously( +streamOne, +batch1, +TestUtils.producerConfig( +CLUSTER.bootstrapServers(), +StringSerializer.class, +StringSerializer.class, +new Properties()), +mockTime); + +final KTable t1 = builder.table(streamOne); +t1 +.mapValues( +(ValueMapper) Long::valueOf, +Materialized.>as("queryMapValues").withValueSerde(Serdes.Long())) +.toStream() +.to(outputTopic, Produced.with(Serdes.String(), Serdes.Long())); + +kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration); +startKafkaStreamsAndWaitForRunningState(kafkaStreams); + +waitUntilAtLeastNumRecordProcessed(outputTopic, 5); + +final 
ReadOnlyKeyValueStore myMapStore = +IntegrationTestUtils.getStore("queryMapValues", kafkaStreams, keyValueStore()); + +int index = 0; +final KeyValueIterator range = myMapStore.prefixScan("go", Serdes.String().serializer()); +while (range.hasNext()) { Review comment: I know this is just how the other tests are doing it, but it's not really an airtight way to validate the expected results...if nothing is returned then we never enter the `while` loop and the test passes, even if we did in fact expect there to be actual output. The important thing here was just to make sure it didn't throw an exception so it still does that, but it would be good to fix this up maybe in a followup PR ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java ## @@ -66,6 +68,20 @@ public MemoryNavigableLRUCache(final String name, final int maxCacheSize) { .subMap(from, true, to, true).descendingKeySet().iterator(), treeMap)); } +@Override +public , P> KeyValueIterator prefixScan(final P prefix, final PS prefixKeySerializer) { Review comment: 1) Could you just treat them as Bytes all the same, and just convert to/from an Integer before putting/getting them from the store? That way you're still just handling Bytes like you are in this test, it just goes through an extra layer of de/serialization. Should be able to more or less copy over the existing tests with just a bit of extra code. Can you try this, in a followup PR? 2) Yes, I was just suggesting to merge them as a possible way to make things easier and do less work, if it's going to be more then please do file a separate ticket for it. 
## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java ## @@ -383,6 +387,1002 @@ public void testDrivingConnectedStateStoreInDifferentProcessorsTopology() { assertNull(store.get("key4")); } +@Test +public void testPrefixScanInMemoryStoreNoCachingNoLogging() { +final String storeName = "prefixScanStore"; +final StoreBuilder> storeBuilder = + Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), Serdes.String(), Serdes.String()) +.withCachingDisabled() +.withLoggingDisabled(); +topology +.addSource("source1", STRING_DESERIALIZER,
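The first review comment in this message, that a `while (range.hasNext())` check passes vacuously when the iterator is empty, can be illustrated without any Kafka types. This is a minimal sketch; `IteratorChecks` and both method names are hypothetical, and plain `String` iterators stand in for `KeyValueIterator`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical helpers contrasting a vacuous check with a strict one.
class IteratorChecks {
    // Weak: only compares inside the loop, so an empty iterator always "matches".
    static boolean weakMatches(Iterator<String> actual, List<String> expected) {
        int index = 0;
        while (actual.hasNext()) {
            if (index >= expected.size() || !actual.next().equals(expected.get(index))) {
                return false;
            }
            index++;
        }
        return true;
    }

    // Strict: drain the iterator first, then compare the whole result.
    static boolean strictMatches(Iterator<String> actual, List<String> expected) {
        List<String> drained = new ArrayList<>();
        while (actual.hasNext()) {
            drained.add(actual.next());
        }
        return drained.equals(expected);
    }
}
```

In a test, the strict form corresponds to collecting the results into a list and asserting equality with the expected list, which also catches the case where fewer results than expected come back.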
[GitHub] [kafka] cmccabe commented on a change in pull request #11054: KAFKA-13090: Improve kraft snapshot integration test
cmccabe commented on a change in pull request #11054: URL: https://github.com/apache/kafka/pull/11054#discussion_r670005690 ## File path: raft/src/test/java/org/apache/kafka/raft/MockLog.java ## @@ -489,6 +489,11 @@ public void initializeLeaderEpoch(int epoch) { return Optional.ofNullable(snapshots.get(snapshotId)); } +@Override +public Optional latestSnapshot() { +return latestSnapshotId().flatMap(this::readSnapshot); Review comment: I didn't realize they implemented flatMap on Java's Optional, interesting. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
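For context on the remark above: `flatMap` has been available on `java.util.Optional` since Java 8. The following is only a minimal sketch of how the one-liner in the diff composes two lookups. `SnapshotStore`, the `long` ids and the `String` "readers" are hypothetical stand-ins so the example runs without Kafka on the classpath; they are not the actual `MockLog` types.

```java
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical stand-in for MockLog: snapshot ids are plain longs and a
// "reader" is just a String.
class SnapshotStore {
    private final TreeMap<Long, String> snapshots = new TreeMap<>();

    void add(long id, String contents) {
        snapshots.put(id, contents);
    }

    // Empty when no snapshot has been stored yet.
    Optional<Long> latestSnapshotId() {
        return snapshots.isEmpty() ? Optional.empty() : Optional.of(snapshots.lastKey());
    }

    // Empty when the id is unknown.
    Optional<String> readSnapshot(long id) {
        return Optional.ofNullable(snapshots.get(id));
    }

    // flatMap avoids the nested Optional<Optional<String>> that map() would produce.
    Optional<String> latestSnapshot() {
        return latestSnapshotId().flatMap(this::readSnapshot);
    }
}
```

With `map` instead of `flatMap`, the caller would have to unwrap two layers of `Optional`; `flatMap` collapses them into one.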
[GitHub] [kafka] cmccabe commented on a change in pull request #11054: KAFKA-13090: Improve kraft snapshot integration test
cmccabe commented on a change in pull request #11054: URL: https://github.com/apache/kafka/pull/11054#discussion_r670005192 ## File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotReader.java ## @@ -84,7 +84,7 @@ public int lastContainedLogEpoch() { */ public long lastContainedLogTimestamp() { if (!lastContainedLogTimestamp.isPresent()) { -// nextBatch is expected to be empty +// nextBatch is expected to be equal to Optional.empty() so just replace it Review comment: Can we check this and throw an exception if it's not true? I don't think this method is called often so performance should not be an issue.
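One way the check requested above could look, as a rough sketch only. The field names `lastContainedLogTimestamp` and `nextBatch` come from the diff; the class `TimestampReader`, the `String` batch type and the constructor are hypothetical placeholders, not `SnapshotReader`'s actual code.

```java
import java.util.Optional;
import java.util.OptionalLong;

// Hypothetical, simplified reader. The real SnapshotReader loads the timestamp
// from the first batch of the snapshot; here it is passed in directly.
class TimestampReader {
    private OptionalLong lastContainedLogTimestamp = OptionalLong.empty();
    private final Optional<String> nextBatch;
    private final long timestampOfFirstBatch;

    TimestampReader(Optional<String> nextBatch, long timestampOfFirstBatch) {
        this.nextBatch = nextBatch;
        this.timestampOfFirstBatch = timestampOfFirstBatch;
    }

    long lastContainedLogTimestamp() {
        if (!lastContainedLogTimestamp.isPresent()) {
            // Fail loudly instead of silently overwriting a pending batch.
            if (nextBatch.isPresent()) {
                throw new IllegalStateException(
                    "nextBatch was expected to be empty but was " + nextBatch.get());
            }
            lastContainedLogTimestamp = OptionalLong.of(timestampOfFirstBatch);
        }
        return lastContainedLogTimestamp.getAsLong();
    }
}
```

The check runs only on the first call, before the timestamp is cached, which matches the reviewer's point that its cost should not matter.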
[GitHub] [kafka] cmccabe commented on a change in pull request #11054: KAFKA-13090: Improve kraft snapshot integration test
cmccabe commented on a change in pull request #11054: URL: https://github.com/apache/kafka/pull/11054#discussion_r670004898 ## File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java ## @@ -280,11 +280,19 @@ default long truncateToEndOffset(OffsetAndEpoch endOffset) { Optional readSnapshot(OffsetAndEpoch snapshotId); /** - * Returns the latest snapshot id if one exists. + * Returns the latest readable snapshot if one exists. * - * @return an Optional snapshot id of the latest snashot if one exists, otherwise returns an - * empty Optional + * @return an Optional with the latest readable snapshot, if one exists, otherwise + * returns an empty Optional */ +Optional latestSnapshot(); Review comment: would "openLatestSnapshot" be a better name, given that the snapshot reader will need to be closed later?
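To illustrate why the name matters, here is a small sketch of the caller's side, assuming the method were renamed `openLatestSnapshot` as suggested. `TrackedReader`, `SnapshotLog` and `OpenLatestSnapshotExample` are hypothetical types invented for this example; the only point is that the caller owns the returned reader and is responsible for closing it.

```java
import java.util.Optional;

// Hypothetical reader that records whether it was closed.
class TrackedReader implements AutoCloseable {
    boolean closed = false;

    @Override
    public void close() {
        closed = true;
    }
}

// Hypothetical log exposing the suggested method name.
class SnapshotLog {
    final TrackedReader reader = new TrackedReader();

    // The "open" prefix signals that the caller must close the result.
    Optional<TrackedReader> openLatestSnapshot() {
        return Optional.of(reader);
    }
}

class OpenLatestSnapshotExample {
    // Returns true if a snapshot was found and read.
    static boolean readAndClose(SnapshotLog log) {
        Optional<TrackedReader> maybeReader = log.openLatestSnapshot();
        if (!maybeReader.isPresent()) {
            return false;
        }
        // try-with-resources closes the reader even if reading throws.
        try (TrackedReader r = maybeReader.get()) {
            return !r.closed;
        }
    }
}
```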
[jira] [Resolved] (KAFKA-13080) Fetch snapshot request are not directed to kraft in controller
[ https://issues.apache.org/jira/browse/KAFKA-13080?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jose Armando Garcia Sancio resolved KAFKA-13080. Resolution: Fixed > Fetch snapshot request are not directed to kraft in controller > -- > > Key: KAFKA-13080 > URL: https://issues.apache.org/jira/browse/KAFKA-13080 > Project: Kafka > Issue Type: Bug > Components: controller, kraft >Reporter: Jose Armando Garcia Sancio >Assignee: Jose Armando Garcia Sancio >Priority: Blocker > Labels: kip-500 > Fix For: 3.0.0 > > > Kraft followers and observers are seeing the following error > {code:java} > [2021-07-13 18:15:47,289] ERROR [RaftManager nodeId=2] Unexpected error > UNKNOWN_SERVER_ERROR in FETCH_SNAPSHOT response: > InboundResponse(correlationId=29862, > data=FetchSnapshotResponseData(throttleTimeMs=0, errorCode=-1, topics=[]), > sourceId=3001) (org.apache.kafka.raft.KafkaRaftClient) {code} > This is because ControllerApis is not directing FetchSnapshot requests to the > raft manager. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-13078) Closing FileRawSnapshotWriter too early
[ https://issues.apache.org/jira/browse/KAFKA-13078?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jose Armando Garcia Sancio resolved KAFKA-13078. Resolution: Fixed > Closing FileRawSnapshotWriter too early > --- > > Key: KAFKA-13078 > URL: https://issues.apache.org/jira/browse/KAFKA-13078 > Project: Kafka > Issue Type: Bug > Components: kraft >Affects Versions: 3.0.0 >Reporter: Jose Armando Garcia Sancio >Assignee: Jose Armando Garcia Sancio >Priority: Blocker > Labels: kip-500 > Fix For: 3.0.0 > > > We are getting the following error > {code:java} > [2021-07-13 17:23:42,174] ERROR [kafka-raft-io-thread]: Error due to > (kafka.raft.KafkaRaftManager$RaftIoThread) > java.io.UncheckedIOException: Error calculating snapshot size. temp path = > /mnt/kafka/kafka-metadata-logs/@metadata-0/0062-02-3249768281228588378.checkpoint.part, > snapshotId = OffsetAndEpoch(offset=62, epoch=2). > at > org.apache.kafka.snapshot.FileRawSnapshotWriter.sizeInBytes(FileRawSnapshotWriter.java:63) > at > org.apache.kafka.raft.KafkaRaftClient.maybeSendFetchOrFetchSnapshot(KafkaRaftClient.java:2044) > at > org.apache.kafka.raft.KafkaRaftClient.pollFollowerAsObserver(KafkaRaftClient.java:2032) > at > org.apache.kafka.raft.KafkaRaftClient.pollFollower(KafkaRaftClient.java:1995) > at > org.apache.kafka.raft.KafkaRaftClient.pollCurrentState(KafkaRaftClient.java:2104) > at org.apache.kafka.raft.KafkaRaftClient.poll(KafkaRaftClient.java:2217) > at kafka.raft.KafkaRaftManager$RaftIoThread.doWork(RaftManager.scala:52) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96) > Caused by: java.nio.channels.ClosedChannelException > at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:110) > at sun.nio.ch.FileChannelImpl.size(FileChannelImpl.java:300) > at > org.apache.kafka.snapshot.FileRawSnapshotWriter.sizeInBytes(FileRawSnapshotWriter.java:60) > ... 
7 more > {code} > This is because the {{FollowerState}} is closing the snapshot writer passed > through the argument instead of the one being replaced.
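The bug described above can be sketched as follows. This is an illustration under assumptions, not the actual patch: `FakeWriter` and `FollowerStateSketch` are hypothetical, and the method name `setFetchingSnapshot` is assumed for the example. The idea is that the setter must close the writer it is about to discard, never the one it was just handed.

```java
import java.util.Optional;

// Hypothetical writer that records whether it was closed.
class FakeWriter implements AutoCloseable {
    final String name;
    boolean closed = false;

    FakeWriter(String name) {
        this.name = name;
    }

    @Override
    public void close() {
        closed = true;
    }
}

// Hypothetical, stripped-down follower state holding the in-progress snapshot.
class FollowerStateSketch {
    private Optional<FakeWriter> fetchingSnapshot = Optional.empty();

    void setFetchingSnapshot(Optional<FakeWriter> newSnapshot) {
        // Close the writer being replaced. Closing newSnapshot here would be
        // the bug: later calls on it would fail because it is already closed.
        fetchingSnapshot.ifPresent(FakeWriter::close);
        fetchingSnapshot = newSnapshot;
    }

    Optional<FakeWriter> fetchingSnapshot() {
        return fetchingSnapshot;
    }
}
```

With the buggy variant, a later size query on the new writer would hit an already closed channel, which is consistent with the `ClosedChannelException` in the stack trace above.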
[GitHub] [kafka] ableegoldman commented on pull request #10877: KAFKA-12925: adding presfixScan operation for missed implementations
ableegoldman commented on pull request #10877: URL: https://github.com/apache/kafka/pull/10877#issuecomment-880255698 > I have added a test for IQ. Some reason, I wasn't able to run the test on my local as I was getting a build failure due to scala test classes. I would watch out for the status of the tests here. Apparently there was an actual issue with a scala class on trunk, if you pull/rebase you should be able to build locally again. Looks like the test you added did pass though, so nice 👍
[GitHub] [kafka] jsancio opened a new pull request #11054: KAFKA-13090: Improve kraft snapshot integration test
jsancio opened a new pull request #11054: URL: https://github.com/apache/kafka/pull/11054 Check and verify generated snapshots for the controllers and the brokers. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Created] (KAFKA-13090) Improve cluster snapshot integration test
Jose Armando Garcia Sancio created KAFKA-13090: -- Summary: Improve cluster snapshot integration test Key: KAFKA-13090 URL: https://issues.apache.org/jira/browse/KAFKA-13090 Project: Kafka Issue Type: Sub-task Reporter: Jose Armando Garcia Sancio Assignee: Jose Armando Garcia Sancio Fix For: 3.0.0 Extends the test in RaftClusterSnapshotTest to verify that both the controllers and brokers are generating snapshots.
[jira] [Resolved] (KAFKA-13073) Simulation test fails due to inconsistency in MockLog's implementation
[ https://issues.apache.org/jira/browse/KAFKA-13073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Konstantine Karantasis resolved KAFKA-13073. Resolution: Fixed > Simulation test fails due to inconsistency in MockLog's implementation > -- > > Key: KAFKA-13073 > URL: https://issues.apache.org/jira/browse/KAFKA-13073 > Project: Kafka > Issue Type: Bug > Components: controller, replication >Affects Versions: 3.0.0 >Reporter: Jose Armando Garcia Sancio >Assignee: Jose Armando Garcia Sancio >Priority: Major > Labels: kip-500 > Fix For: 3.0.0 > > > We are getting the following error on trunk > {code:java} > RaftEventSimulationTest > canRecoverAfterAllNodesKilled STANDARD_OUT > timestamp = 2021-07-12T16:26:55.663, > RaftEventSimulationTest:canRecoverAfterAllNodesKilled = > java.lang.RuntimeException: > Uncaught exception during poll of node 1 > |---jqwik--- > tries = 25| # of calls to property > checks = 25 | # of not rejected calls > generation = RANDOMIZED | parameters are randomly generated > after-failure = PREVIOUS_SEED | use the previous seed > when-fixed-seed = ALLOW | fixing the random seed is allowed > edge-cases#mode = MIXIN | edge cases are mixed in > edge-cases#total = 108| # of all combined edge cases > edge-cases#tried = 4 | # of edge cases tried in current run > seed = 8079861963960994566| random seed to reproduce generated values >Sample > -- > arg0: 4002 > arg1: 2 > arg2: 4{code} > I think there are a couple of issues here: > # The {{ListenerContext}} for {{KafkaRaftClient}} uses the value returned by > {{ReplicatedLog::startOffset()}} to determined the log start and when to load > a snapshot while the {{MockLog}} implementation uses {{logStartOffset}} which > could be a different value. > # {{MockLog}} doesn't implement {{ReplicatedLog::maybeClean}} so the log > start offset is always 0. 
> # The snapshot id validation for {{MockLog}} and {{KafkaMetadataLog}}'s > {{createNewSnapshot}} throws an exception when the snapshot id is less than > the log start offset. > Solutions: > To fix the error quoted above we only need to fix bullet point 3, but I think we > should fix all of the issues enumerated in this Jira. > For 1. we should change the {{MockLog}} implementation so that it uses > {{startOffset}} both externally and internally. > For 2. I will file another issue to track this implementation. > For 3. I think this validation is too strict. I think it is safe to simply > ignore any attempt by the state machine to create a snapshot with an id less > than the log start offset. We should return an {{Optional.empty()}} when the > snapshot id is less than the log start offset. This tells the user that it > doesn't need to generate a snapshot for that offset.
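A minimal sketch of the behaviour proposed for point 3, under stated assumptions: `SnapshotLogSketch` is a hypothetical class, the snapshot id is reduced to a single `long` end offset, and the returned `String` stands in for a snapshot writer. It is not the `MockLog` or `KafkaMetadataLog` implementation.

```java
import java.util.Optional;

// Hypothetical log that only tracks its start offset.
class SnapshotLogSketch {
    private final long startOffset;

    SnapshotLogSketch(long startOffset) {
        this.startOffset = startOffset;
    }

    // Returns a placeholder "writer" name, or empty when the requested
    // snapshot would end before the log start offset and is therefore not needed.
    Optional<String> createNewSnapshot(long snapshotEndOffset) {
        if (snapshotEndOffset < startOffset) {
            // Previously an exception; an empty result tells the caller to skip it.
            return Optional.empty();
        }
        return Optional.of("snapshot-" + snapshotEndOffset);
    }
}
```

A caller can then write `log.createNewSnapshot(offset).ifPresent(...)` and skip snapshot generation when nothing is returned.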
[jira] [Comment Edited] (KAFKA-7632) Support Compression Level
[ https://issues.apache.org/jira/browse/KAFKA-7632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17380866#comment-17380866 ] Konstantine Karantasis edited comment on KAFKA-7632 at 7/14/21, 9:50 PM: - This feature was not approved on time for 3.0. Pushing the target version to 3.1 was (Author: kkonstantine): This feature was not approved in time for 3.0. Pushing the target version to 3.1 > Support Compression Level > - > > Key: KAFKA-7632 > URL: https://issues.apache.org/jira/browse/KAFKA-7632 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 2.1.0 > Environment: all >Reporter: Dave Waters >Assignee: Dongjin Lee >Priority: Blocker > Labels: needs-kip > Fix For: 3.1.0 > > > The compression level for ZSTD is currently set to use the default level (3), > which is a conservative setting that in some use cases eliminates the value > that ZSTD provides with improved compression. Each use case will vary, so > exposing the level as a broker configuration setting will allow the user to > adjust the level. > Since it applies to the other compression codecs, we should add the same > functionalities to them.
[jira] [Commented] (KAFKA-9366) Upgrade log4j to log4j2
[ https://issues.apache.org/jira/browse/KAFKA-9366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17380874#comment-17380874 ] Konstantine Karantasis commented on KAFKA-9366: --- This feature was not approved on time for 3.0. Pushing the target version to 3.1 > Upgrade log4j to log4j2 > --- > > Key: KAFKA-9366 > URL: https://issues.apache.org/jira/browse/KAFKA-9366 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.2.0, 2.1.1, 2.3.0, 2.4.0 >Reporter: leibo >Assignee: Dongjin Lee >Priority: Critical > Labels: needs-kip > Fix For: 3.0.0 > > > h2. CVE-2019-17571 Detail > Included in Log4j 1.2 is a SocketServer class that is vulnerable to > deserialization of untrusted data which can be exploited to remotely execute > arbitrary code when combined with a deserialization gadget when listening to > untrusted network traffic for log data. This affects Log4j versions up to 1.2 > up to 1.2.17. > > [https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2019-17571]
[jira] [Updated] (KAFKA-9366) Upgrade log4j to log4j2
[ https://issues.apache.org/jira/browse/KAFKA-9366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Konstantine Karantasis updated KAFKA-9366: -- Fix Version/s: (was: 3.0.0) 3.1.0 > Upgrade log4j to log4j2 > --- > > Key: KAFKA-9366 > URL: https://issues.apache.org/jira/browse/KAFKA-9366 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.2.0, 2.1.1, 2.3.0, 2.4.0 >Reporter: leibo >Assignee: Dongjin Lee >Priority: Critical > Labels: needs-kip > Fix For: 3.1.0 > > > h2. CVE-2019-17571 Detail > Included in Log4j 1.2 is a SocketServer class that is vulnerable to > deserialization of untrusted data which can be exploited to remotely execute > arbitrary code when combined with a deserialization gadget when listening to > untrusted network traffic for log data. This affects Log4j versions up to 1.2 > up to 1.2.17. > > [https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2019-17571]
[GitHub] [kafka] niket-goel opened a new pull request #11053: [KAFKA-13015] Ducktape System Tests to for Metadata Snapshots
niket-goel opened a new pull request #11053: URL: https://github.com/apache/kafka/pull/11053 [WIP] This PR implements system tests in ducktape to test the ability of brokers and controllers to generate and consume snapshots and catch up with the metadata log. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Commented] (KAFKA-12495) Unbalanced connectors/tasks distribution will happen in Connect's incremental cooperative assignor
[ https://issues.apache.org/jira/browse/KAFKA-12495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17380872#comment-17380872 ] Konstantine Karantasis commented on KAFKA-12495: This issue corresponds to a corner case that does not seem to appear in practice often. The current suggestion to allow for consecutive revocations carries some risk. I have another fix in mind that I'd like to explore. In the meantime I'm punting this issue to the next release. > Unbalanced connectors/tasks distribution will happen in Connect's incremental > cooperative assignor > -- > > Key: KAFKA-12495 > URL: https://issues.apache.org/jira/browse/KAFKA-12495 > Project: Kafka > Issue Type: Bug >Reporter: Luke Chen >Assignee: Luke Chen >Priority: Major > Attachments: image-2021-03-18-15-04-57-854.png, > image-2021-03-18-15-05-52-557.png, image-2021-03-18-15-07-27-103.png > > > In Kafka Connect, we implement incremental cooperative rebalance algorithm > based on KIP-415 > ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect)|https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect]. > However, we have a bad assumption in the algorithm implementation, which is: > after revoking rebalance completed, the member(worker) count will be the same > as the previous round of reblance. > > Let's take a look at the example in the KIP-415: > !image-2021-03-18-15-07-27-103.png|width=441,height=556! > It works well for most cases. But what if W4 added after 1st rebalance > completed and before 2nd rebalance started? Let's see what will happened? 
> Let's see this example: (we'll use 10 tasks here): > > {code:java} > Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, > BT2, BT4, BT4, BT5]) > Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, BT4, > BT5 > W1 is current leader > W2 joins with assignment: [] > Rebalance is triggered > W3 joins while rebalance is still active with assignment: [] > W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, > BT4, BT5] > W1 becomes leader > W1 computes and sends assignments: > W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, > BT2, BT4, BT4, BT5]) > W2(delay: 0, assigned: [], revoked: []) > W3(delay: 0, assigned: [], revoked: []) > W1 stops revoked resources > W1 rejoins with assignment: [AC0, AT1, AT2, AT3] > Rebalance is triggered > W2 joins with assignment: [] > W3 joins with assignment: [] > // one more member joined > W4 joins with assignment: [] > W1 becomes leader > W1 computes and sends assignments: > // We assigned all the previous revoked Connectors/Tasks to the new member, > but we didn't revoke any more C/T in this round, which cause unbalanced > distribution > W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: []) > W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: []) > W2(delay: 0, assigned: [BT1, BT2, BT4], revoked: []) > W2(delay: 0, assigned: [BT4, BT5], revoked: []) > {code} > Because we didn't allow to do consecutive revoke in two consecutive > rebalances (under the same leader), we will have this uneven distribution > under this situation. We should allow consecutive rebalance to have another > round of revocation to revoke the C/T to the other members in this case. 
> expected: > {code:java} > Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, > BT2, BT4, BT4, BT5]) > Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, BT4, > BT5 > W1 is current leader > W2 joins with assignment: [] > Rebalance is triggered > W3 joins while rebalance is still active with assignment: [] > W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, > BT4, BT5] > W1 becomes leader > W1 computes and sends assignments: > W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, > BT2, BT4, BT4, BT5]) > W2(delay: 0, assigned: [], revoked: []) > W3(delay: 0, assigned: [], revoked: []) > W1 stops revoked resources > W1 rejoins with assignment: [AC0, AT1, AT2, AT3] > Rebalance is triggered > W2 joins with assignment: [] > W3 joins with assignment: [] > // one more member joined > W4 joins with assignment: [] > W1 becomes leader > W1 computes and sends assignments: > // We assigned all the previous revoked Connectors/Tasks to the new member, > **and also revoke some C/T** > W1(delay: 0, assigned: [AC0, AT1, AT2], revoked: [AT3]) > W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: []) > W3(delay: 0, assigned: [BT1, BT2, BT4], revoked: []) > W4(delay: 0, assigned: [BT4, BT5], revoked: []) > // another round of r
[jira] [Updated] (KAFKA-12495) Unbalanced connectors/tasks distribution will happen in Connect's incremental cooperative assignor
[ https://issues.apache.org/jira/browse/KAFKA-12495?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Konstantine Karantasis updated KAFKA-12495: --- Fix Version/s: 3.1.0 > Unbalanced connectors/tasks distribution will happen in Connect's incremental > cooperative assignor > -- > > Key: KAFKA-12495 > URL: https://issues.apache.org/jira/browse/KAFKA-12495 > Project: Kafka > Issue Type: Bug >Reporter: Luke Chen >Assignee: Luke Chen >Priority: Major > Fix For: 3.1.0 > > Attachments: image-2021-03-18-15-04-57-854.png, > image-2021-03-18-15-05-52-557.png, image-2021-03-18-15-07-27-103.png > > > In Kafka Connect, we implement incremental cooperative rebalance algorithm > based on KIP-415 > ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect)|https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect]. > However, we have a bad assumption in the algorithm implementation, which is: > after revoking rebalance completed, the member(worker) count will be the same > as the previous round of reblance. > > Let's take a look at the example in the KIP-415: > !image-2021-03-18-15-07-27-103.png|width=441,height=556! > It works well for most cases. But what if W4 added after 1st rebalance > completed and before 2nd rebalance started? Let's see what will happened? 
> Let's see this example: (we'll use 10 tasks here): > > {code:java} > Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, > BT2, BT4, BT4, BT5]) > Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, BT4, > BT5 > W1 is current leader > W2 joins with assignment: [] > Rebalance is triggered > W3 joins while rebalance is still active with assignment: [] > W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, > BT4, BT5] > W1 becomes leader > W1 computes and sends assignments: > W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, > BT2, BT4, BT4, BT5]) > W2(delay: 0, assigned: [], revoked: []) > W3(delay: 0, assigned: [], revoked: []) > W1 stops revoked resources > W1 rejoins with assignment: [AC0, AT1, AT2, AT3] > Rebalance is triggered > W2 joins with assignment: [] > W3 joins with assignment: [] > // one more member joined > W4 joins with assignment: [] > W1 becomes leader > W1 computes and sends assignments: > // We assigned all the previous revoked Connectors/Tasks to the new member, > but we didn't revoke any more C/T in this round, which cause unbalanced > distribution > W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: []) > W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: []) > W2(delay: 0, assigned: [BT1, BT2, BT4], revoked: []) > W2(delay: 0, assigned: [BT4, BT5], revoked: []) > {code} > Because we didn't allow to do consecutive revoke in two consecutive > rebalances (under the same leader), we will have this uneven distribution > under this situation. We should allow consecutive rebalance to have another > round of revocation to revoke the C/T to the other members in this case. 
> expected: > {code:java} > Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, > BT2, BT4, BT4, BT5]) > Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, BT4, > BT5 > W1 is current leader > W2 joins with assignment: [] > Rebalance is triggered > W3 joins while rebalance is still active with assignment: [] > W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, > BT4, BT5] > W1 becomes leader > W1 computes and sends assignments: > W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, > BT2, BT4, BT4, BT5]) > W2(delay: 0, assigned: [], revoked: []) > W3(delay: 0, assigned: [], revoked: []) > W1 stops revoked resources > W1 rejoins with assignment: [AC0, AT1, AT2, AT3] > Rebalance is triggered > W2 joins with assignment: [] > W3 joins with assignment: [] > // one more member joined > W4 joins with assignment: [] > W1 becomes leader > W1 computes and sends assignments: > // We assigned all the previous revoked Connectors/Tasks to the new member, > **and also revoke some C/T** > W1(delay: 0, assigned: [AC0, AT1, AT2], revoked: [AT3]) > W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: []) > W3(delay: 0, assigned: [BT1, BT2, BT4], revoked: []) > W4(delay: 0, assigned: [BT4, BT5], revoked: []) > // another round of rebalance to assign the new revoked C/T to the other > members > W1 rejoins with assignment: [AC0, AT1, AT2] > Rebalance is triggered > W2 joins with assignment: [AT4, AT5, BC0] > W3 joins with assignment: [BT1, BT2, BT4] > W4 joins with assignment: [BT4, BT5] > W1 becomes leader >
[jira] [Commented] (KAFKA-12283) Flaky Test RebalanceSourceConnectorsIntegrationTest#testMultipleWorkersRejoining
[ https://issues.apache.org/jira/browse/KAFKA-12283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17380871#comment-17380871 ] Konstantine Karantasis commented on KAFKA-12283: As mentioned above, this failure corresponds to a corner case that does not seem to appear in practice often. The current suggestion to allow for consecutive revocations carries some risk. I have another fix in mind that I'd like to explore. In the meantime I'm punting this issue to the next release. > Flaky Test > RebalanceSourceConnectorsIntegrationTest#testMultipleWorkersRejoining > > > Key: KAFKA-12283 > URL: https://issues.apache.org/jira/browse/KAFKA-12283 > Project: Kafka > Issue Type: Test > Components: KafkaConnect, unit tests >Reporter: Matthias J. Sax >Assignee: Luke Chen >Priority: Blocker > Labels: flaky-test > Fix For: 3.0.0 > > > https://github.com/apache/kafka/pull/1/checks?check_run_id=1820092809 > {quote} {{java.lang.AssertionError: Tasks are imbalanced: > localhost:36037=[seq-source13-0, seq-source13-1, seq-source13-2, > seq-source13-3, seq-source12-0, seq-source12-1, seq-source12-2, > seq-source12-3] > localhost:43563=[seq-source11-0, seq-source11-2, seq-source10-0, > seq-source10-2] > localhost:46539=[seq-source11-1, seq-source11-3, seq-source10-1, > seq-source10-3] > at org.junit.Assert.fail(Assert.java:89) > at org.junit.Assert.assertTrue(Assert.java:42) > at > org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.assertConnectorAndTasksAreUniqueAndBalanced(RebalanceSourceConnectorsIntegrationTest.java:362) > at > org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:303) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:351) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:319) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:300) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:290) 
> at > org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testMultipleWorkersRejoining(RebalanceSourceConnectorsIntegrationTest.java:313)}} > {quote} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-12283) Flaky Test RebalanceSourceConnectorsIntegrationTest#testMultipleWorkersRejoining
[ https://issues.apache.org/jira/browse/KAFKA-12283?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Konstantine Karantasis updated KAFKA-12283: --- Fix Version/s: (was: 3.0.0) 3.1.0 > Flaky Test > RebalanceSourceConnectorsIntegrationTest#testMultipleWorkersRejoining > > > Key: KAFKA-12283 > URL: https://issues.apache.org/jira/browse/KAFKA-12283 > Project: Kafka > Issue Type: Test > Components: KafkaConnect, unit tests >Reporter: Matthias J. Sax >Assignee: Luke Chen >Priority: Blocker > Labels: flaky-test > Fix For: 3.1.0 > > > https://github.com/apache/kafka/pull/1/checks?check_run_id=1820092809 > {quote} {{java.lang.AssertionError: Tasks are imbalanced: > localhost:36037=[seq-source13-0, seq-source13-1, seq-source13-2, > seq-source13-3, seq-source12-0, seq-source12-1, seq-source12-2, > seq-source12-3] > localhost:43563=[seq-source11-0, seq-source11-2, seq-source10-0, > seq-source10-2] > localhost:46539=[seq-source11-1, seq-source11-3, seq-source10-1, > seq-source10-3] > at org.junit.Assert.fail(Assert.java:89) > at org.junit.Assert.assertTrue(Assert.java:42) > at > org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.assertConnectorAndTasksAreUniqueAndBalanced(RebalanceSourceConnectorsIntegrationTest.java:362) > at > org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:303) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:351) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:319) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:300) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:290) > at > org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testMultipleWorkersRejoining(RebalanceSourceConnectorsIntegrationTest.java:313)}} > {quote} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-7632) Support Compression Level
[ https://issues.apache.org/jira/browse/KAFKA-7632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17380866#comment-17380866 ] Konstantine Karantasis commented on KAFKA-7632: --- This feature was not approved in time for 3.0. Pushing the target version to 3.1 > Support Compression Level > - > > Key: KAFKA-7632 > URL: https://issues.apache.org/jira/browse/KAFKA-7632 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 2.1.0 > Environment: all >Reporter: Dave Waters >Assignee: Dongjin Lee >Priority: Blocker > Labels: needs-kip > Fix For: 3.0.0 > > > The compression level for ZSTD is currently set to use the default level (3), > which is a conservative setting that in some use cases eliminates the value > that ZSTD provides with improved compression. Each use case will vary, so > exposing the level as a broker configuration setting will allow the user to > adjust the level. > Since it applies to the other compression codecs, we should add the same > functionalities to them.
[jira] [Updated] (KAFKA-7632) Support Compression Level
[ https://issues.apache.org/jira/browse/KAFKA-7632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Konstantine Karantasis updated KAFKA-7632: -- Fix Version/s: (was: 3.0.0) 3.1.0 > Support Compression Level > - > > Key: KAFKA-7632 > URL: https://issues.apache.org/jira/browse/KAFKA-7632 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 2.1.0 > Environment: all >Reporter: Dave Waters >Assignee: Dongjin Lee >Priority: Blocker > Labels: needs-kip > Fix For: 3.1.0 > > > The compression level for ZSTD is currently set to use the default level (3), > which is a conservative setting that in some use cases eliminates the value > that ZSTD provides with improved compression. Each use case will vary, so > exposing the level as a broker configuration setting will allow the user to > adjust the level. > Since it applies to the other compression codecs, we should add the same > functionalities to them.
[jira] [Created] (KAFKA-13089) Revisit the usage of BufferSuppliers in Kraft
Jose Armando Garcia Sancio created KAFKA-13089: -- Summary: Revisit the usage of BufferSuppliers in Kraft Key: KAFKA-13089 URL: https://issues.apache.org/jira/browse/KAFKA-13089 Project: Kafka Issue Type: Sub-task Components: kraft Reporter: Jose Armando Garcia Sancio Assignee: Jose Armando Garcia Sancio The latest KafkaRaftClient creates a new BufferSupplier every time it is needed. A buffer supplier is needed when reading from the log and when reading from a snapshot. It would be good to investigate whether there is a performance and memory usage advantage to sharing the buffer supplier between those use cases and across every read of the log or snapshot. If the BufferSupplier is shared, it is very likely that the implementation will have to be thread-safe, because we need to support multiple Listeners and each Listener would be using a different thread. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] vvcephei commented on a change in pull request #10994: KAFKA-8410: Update the docs to reference the new PAPI
vvcephei commented on a change in pull request #10994: URL: https://github.com/apache/kafka/pull/10994#discussion_r669947367 ## File path: docs/streams/developer-guide/processor-api.html ## @@ -86,12 +86,48 @@ Overviewclose() method. Note that Kafka Streams may re-use a single Processor object by calling init() on it again after close(). -When records are forwarded via downstream processors they also get a timestamp assigned. There are two different default behaviors: - (1) If #forward() is called within #process() the output record inherits the input record timestamp. - (2) If #forward() is called within punctuate() the output record inherits the current punctuation timestamp (either current 'stream time' or system wall-clock time). - Note, that #forward() also allows to change the default behavior by passing a custom timestamp for the output record. -Specifically, ProcessorContext#schedule() accepts a user Punctuator callback interface, which triggers its punctuate() -API method periodically based on the PunctuationType. The PunctuationType determines what notion of time is used + +The Processor interface takes two sets of generic parameters: +KIn, VIn, KOut, VOut. These define the input and output types +that the processor implementation can handle. KIn and +VIn define the key and value types that will be passed +to process(). +Likewise, KOut and VOut +define the forwarded key and value types that ProcessorContext#forward() +will accept. If your processor does not forward any records at all (or if it only forwards +null keys or values), +a best practice is to set the output generic type argument to +Void. +If it needs to forward multiple types that don't share a common superclass, you will +have to set the output generic type argument to Object. + + +Both the Processor#process() +and the ProcessorContext#forward() +methods handle precords in the form of the Record+data class. 
This class gives you access to the key components of a Kafka record: +the key, value, timestamp and headers. When forwarding records, you can use the +constructor to create a new Record +from scratch, or you can use the convenience builder methods to replace one of the +Record's properties +and copy over the rest. For example, +inputRecord.withValue(newValue) +would copy the key, timestamp, and headers from +inputRecord while +setting the output record's value to newValue. +Note that this does not mutate inputRecord, +but instead creates a shallow copy. Beware that this is only a shallow copy, so if you +plan to mutate the key, value, or headers elsewhere in the program, you will want to +create a deep copy of those fields yourself. + + + In addition to handling incoming records via + Processor#process(), + you have the option to schedule periodic invocation (called "punctuation") + in your processor's init() Review comment: Hmm, that actually does sound really useful. I never thought of it before. I'll file a ticket to document this use case. Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode
cmccabe commented on a change in pull request #10753: URL: https://github.com/apache/kafka/pull/10753#discussion_r669944074 ## File path: core/src/test/scala/integration/kafka/server/RaftClusterTest.scala ## @@ -422,4 +351,75 @@ class RaftClusterTest { listenerName = listenerName ) + @Test + def testCreateClusterAndPerformReassignment(): Unit = { +val cluster = new KafkaClusterTestKit.Builder( + new TestKitNodes.Builder(). +setNumBrokerNodes(4). +setNumControllerNodes(3).build()).build() +try { + cluster.format() + cluster.startup() + cluster.waitForReadyBrokers() + val admin = Admin.create(cluster.clientProperties()) + try { +// Create the topic. +val assignments = new util.HashMap[Integer, util.List[Integer]] +assignments.put(0, Arrays.asList(0, 1, 2)) +assignments.put(1, Arrays.asList(1, 2, 3)) +assignments.put(2, Arrays.asList(2, 3, 0)) +val createTopicResult = admin.createTopics(Collections.singletonList( + new NewTopic("foo", assignments))) +createTopicResult.all().get() +waitForTopicListing(admin, Seq("foo"), Seq()) + +// Start some reassignments. +assertEquals(Collections.emptyMap(), admin.listPartitionReassignments().reassignments().get()) +val reassignments = new util.HashMap[TopicPartition, Optional[NewPartitionReassignment]] +reassignments.put(new TopicPartition("foo", 0), + Optional.of(new NewPartitionReassignment(Arrays.asList(2, 1, 0 +reassignments.put(new TopicPartition("foo", 1), + Optional.of(new NewPartitionReassignment(Arrays.asList(0, 1, 2 Review comment: Good idea -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] xvrl opened a new pull request #11052: Use ByteBuffers for LZ4 OutputStream
xvrl opened a new pull request #11052: URL: https://github.com/apache/kafka/pull/11052
Our current LZ4 OutputStream implementation allocates compression buffers internally and relies on intermediate byte arrays for input and output buffers. With this change we now use ByteBuffers internally, and as a result:
* we write directly to the target ByteBuffer, avoiding an additional copy
* we no longer allocate an output compression buffer, reducing allocations by half
* we pave the way to make compression buffers reusable, similar to what we do for decompression
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
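The first point in the PR description, writing compressed output directly into a target ByteBuffer instead of going through an intermediate byte array, can be illustrated with a minimal sketch. This is not the PR's code: it swaps in the JDK's `java.util.zip.Deflater` (which has accepted ByteBuffers since Java 11) as a stand-in for LZ4 so that the example needs no external library, and the class and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

// Illustration only: Deflater stands in for LZ4; all names here are hypothetical.
final class DirectBufferCompressionSketch {

    // Compresses `input` straight into `target` and returns the number of bytes written.
    // No intermediate byte[] is allocated by this method.
    static int compressInto(ByteBuffer input, ByteBuffer target) {
        Deflater deflater = new Deflater();
        try {
            deflater.setInput(input);
            deflater.finish();
            int start = target.position();
            while (!deflater.finished()) {
                if (!target.hasRemaining()) {
                    throw new IllegalStateException("target buffer too small");
                }
                deflater.deflate(target); // advances target.position()
            }
            return target.position() - start;
        } finally {
            deflater.end();
        }
    }

    // Inflates `compressed` back into a byte[] of the known original length.
    static byte[] decompress(ByteBuffer compressed, int originalLength) {
        Inflater inflater = new Inflater();
        try {
            inflater.setInput(compressed);
            byte[] out = new byte[originalLength];
            int n = 0;
            while (n < out.length && !inflater.finished()) {
                n += inflater.inflate(out, n, out.length - n);
            }
            return out;
        } catch (DataFormatException e) {
            throw new IllegalStateException(e);
        } finally {
            inflater.end();
        }
    }

    public static void main(String[] args) {
        byte[] original = "kafka kafka kafka kafka kafka".getBytes(StandardCharsets.UTF_8);
        ByteBuffer target = ByteBuffer.allocate(128);
        int written = compressInto(ByteBuffer.wrap(original), target);
        target.flip();
        byte[] roundTrip = decompress(target, original.length);
        System.out.println("compressed bytes: " + written
            + ", round trip ok: " + Arrays.equals(original, roundTrip));
    }
}
```

The caller owns the target buffer, which is what would make it reusable across calls, in the spirit of the third point above.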
[GitHub] [kafka] cmccabe commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode
cmccabe commented on a change in pull request #10753: URL: https://github.com/apache/kafka/pull/10753#discussion_r669926069 ## File path: metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java ## @@ -105,4 +105,42 @@ public void testToLeaderAndIsrPartitionState() { setIsNew(false).toString(), b.toLeaderAndIsrPartitionState(new TopicPartition("bar", 0), false).toString()); } + +@Test +public void testMergePartitionChangeRecordWithReassignmentData() { +PartitionRegistration partition0 = new PartitionRegistration(new int[] {1, 2, 3}, +new int[] {1, 2, 3}, Replicas.NONE, Replicas.NONE, 1, 100, 200); +PartitionRegistration partition1 = partition0.merge(new PartitionChangeRecord(). +setRemovingReplicas(Collections.singletonList(3)). +setAddingReplicas(Collections.singletonList(4)). +setReplicas(Arrays.asList(1, 2, 3, 4))); +assertEquals(new PartitionRegistration(new int[] {1, 2, 3, 4}, +new int[] {1, 2, 3}, new int[] {3}, new int[] {4}, 1, 100, 201), partition1); +PartitionRegistration partition2 = partition1.merge(new PartitionChangeRecord(). +setIsr(Arrays.asList(1, 2, 4)). +setRemovingReplicas(Collections.emptyList()). +setAddingReplicas(Collections.emptyList()). +setReplicas(Arrays.asList(1, 2, 4))); +assertEquals(new PartitionRegistration(new int[] {1, 2, 4}, +new int[] {1, 2, 4}, Replicas.NONE, Replicas.NONE, 1, 100, 202), partition2); +assertFalse(partition2.isReassigning()); +} + +@Test +public void testPartitionControlInfoIsrChangeCompletesReassignment() { +PartitionRegistration partition0 = new PartitionRegistration( +new int[]{1, 2, 3, 4}, new int[]{3}, new int[]{3}, new int[] {}, 1, 0, 0); Review comment: Good idea. But looking again, we can remove this test because the function has been shifted out of PartitionRegistration -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[GitHub] [kafka] cmccabe commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode
cmccabe commented on a change in pull request #10753: URL: https://github.com/apache/kafka/pull/10753#discussion_r669925103 ## File path: metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java ## @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.metadata.PartitionChangeRecord; +import org.apache.kafka.controller.PartitionChangeBuilder.BestLeader; +import org.apache.kafka.metadata.PartitionRegistration; +import org.apache.kafka.metadata.Replicas; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Optional; + +import static org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_CHANGE_RECORD; +import static org.apache.kafka.controller.PartitionChangeBuilder.changeRecordIsNoOp; +import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER; +import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + + +@Timeout(value = 40) +public class PartitionChangeBuilderTest { +@Test +public void testChangeRecordIsNoOp() { +assertTrue(changeRecordIsNoOp(new PartitionChangeRecord())); +assertFalse(changeRecordIsNoOp(new PartitionChangeRecord().setLeader(1))); +assertFalse(changeRecordIsNoOp(new PartitionChangeRecord(). +setIsr(Arrays.asList(1, 2, 3; +assertFalse(changeRecordIsNoOp(new PartitionChangeRecord(). +setRemovingReplicas(Arrays.asList(1; +assertFalse(changeRecordIsNoOp(new PartitionChangeRecord(). 
+setAddingReplicas(Arrays.asList(4; +} + +private final static PartitionRegistration FOO = new PartitionRegistration( +new int[] {2, 1, 3}, new int[] {2, 1, 3}, Replicas.NONE, Replicas.NONE, +1, 100, 200); + +private final static Uuid FOO_ID = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg"); + +private static PartitionChangeBuilder createFooBuilder(boolean allowUnclean) { +return new PartitionChangeBuilder(FOO, FOO_ID, 0, r -> r != 3, () -> allowUnclean); +} + +private final static PartitionRegistration BAR = new PartitionRegistration( +new int[] {1, 2, 3, 4}, new int[] {1, 2, 3}, new int[] {1}, new int[] {4}, +1, 100, 200); + +private final static Uuid BAR_ID = Uuid.fromString("LKfUsCBnQKekvL9O5dY9nw"); + +private static PartitionChangeBuilder createBarBuilder(boolean allowUnclean) { +return new PartitionChangeBuilder(BAR, BAR_ID, 0, r -> r != 3, () -> allowUnclean); +} + +private static void assertBestLeaderEquals(PartitionChangeBuilder builder, + int expectedNode, + boolean expectedUnclean) { +BestLeader bestLeader = builder.new BestLeader(); +assertEquals(expectedNode, bestLeader.node); +assertEquals(expectedUnclean, bestLeader.unclean); +} + +@Test +public void testBestLeader() { +assertBestLeaderEquals(createFooBuilder(false), 2, false); +assertBestLeaderEquals(createFooBuilder(true), 2, false); +assertBestLeaderEquals(createFooBuilder(false). +setTargetIsr(Arrays.asList(1, 3)), 1, false); +assertBestLeaderEquals(createFooBuilder(true). +setTargetIsr(Arrays.asList(1, 3)), 1, false); +assertBestLeaderEquals(createFooBuilder(false). +setTargetIsr(Arrays.asList(3)), NO_LEADER, false); +assertBestLeaderEquals(createFooBuilder(true). +setTargetIsr(Arrays.asList(3)), 2, true); +assertBestLeaderEquals(createFooBuilder(true). + setTargetIsr(Arrays.asList(4)).setTargetReplicas(Arrays.asList(2, 1, 3, 4)), +4, false); +} + +@Test +public void testShouldTryElection() { +
[jira] [Commented] (KAFKA-13010) Flaky test org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectCommittedOffsetInformation()
[ https://issues.apache.org/jira/browse/KAFKA-13010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17380838#comment-17380838 ] A. Sophie Blee-Goldman commented on KAFKA-13010: [~wcarl...@confluent.io] I'm guessing you wrote this test so you have the most context, can you reproduce this locally and take a minute or two to look through the logs and see if anything jumps out at you? > Flaky test > org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectCommittedOffsetInformation() > --- > > Key: KAFKA-13010 > URL: https://issues.apache.org/jira/browse/KAFKA-13010 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Bruno Cadonna >Priority: Major > Labels: flaky-test > > Integration test {{test > org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectCommittedOffsetInformation()}} > sometimes fails with > {code:java} > java.lang.AssertionError: only one task > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26) > at > org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.getTaskMetadata(TaskMetadataIntegrationTest.java:162) > at > org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectCommittedOffsetInformation(TaskMetadataIntegrationTest.java:117) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] cmccabe commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode
cmccabe commented on a change in pull request #10753: URL: https://github.com/apache/kafka/pull/10753#discussion_r669911056 ## File path: metadata/src/main/java/org/apache/kafka/image/TopicDelta.java ## @@ -103,7 +103,8 @@ public TopicImage apply() { for (Entry entry : partitionChanges.entrySet()) { if (entry.getValue().leader == brokerId) { PartitionRegistration prevPartition = image.partitions().get(entry.getKey()); -if (prevPartition == null || prevPartition.leader != brokerId) { +if (prevPartition == null || +prevPartition.leaderEpoch != entry.getValue().leaderEpoch) { Review comment: Good point. Thinking about it more, we can just check for partition epoch here, since LE can't change without PE changing. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
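The point in the review comment above can be sketched as follows: if every leader epoch change also bumps the partition epoch, comparing partition epochs is enough to detect partitions whose local leadership is new or changed. The `Registration` class is a hypothetical, trimmed-down stand-in for `PartitionRegistration`, and the method name is an assumption; this is not the code in `TopicDelta`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch only: Registration is a hypothetical stand-in for PartitionRegistration.
final class LeaderChangeSketch {

    static final class Registration {
        final int leader;
        final int leaderEpoch;
        final int partitionEpoch;

        Registration(int leader, int leaderEpoch, int partitionEpoch) {
            this.leader = leader;
            this.leaderEpoch = leaderEpoch;
            this.partitionEpoch = partitionEpoch;
        }
    }

    // Returns the sorted ids of changed partitions that are led by brokerId and are
    // either new or have a different partition epoch than before.
    static List<Integer> newOrChangedLocalLeaders(Map<Integer, Registration> previous,
                                                  Map<Integer, Registration> changes,
                                                  int brokerId) {
        List<Integer> result = new ArrayList<>();
        for (Map.Entry<Integer, Registration> entry : changes.entrySet()) {
            Registration next = entry.getValue();
            if (next.leader != brokerId) {
                continue; // not led by this broker
            }
            Registration prev = previous.get(entry.getKey());
            // The partition epoch check also covers any leader epoch change,
            // under the assumption stated in the review comment.
            if (prev == null || prev.partitionEpoch != next.partitionEpoch) {
                result.add(entry.getKey());
            }
        }
        Collections.sort(result);
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, Registration> previous = new HashMap<>();
        previous.put(0, new Registration(1, 5, 10));
        Map<Integer, Registration> changes = new HashMap<>();
        changes.put(0, new Registration(1, 5, 11)); // e.g. an ISR change: same leader epoch
        changes.put(1, new Registration(1, 0, 0));  // newly created partition
        System.out.println(newOrChangedLocalLeaders(previous, changes, 1));
    }
}
```

Note that this check is broader than a leader epoch comparison: a change that only bumps the partition epoch is also reported.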
[GitHub] [kafka] dajac commented on a change in pull request #11035: KAFKA-13072: refactor RemoveMembersFromConsumerGroupHandler and tests
dajac commented on a change in pull request #11035: URL: https://github.com/apache/kafka/pull/11035#discussion_r669905082 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandler.java ## @@ -79,55 +90,82 @@ public String apiName() { Set groupIds, AbstractResponse abstractResponse ) { +validateKeys(groupIds); + final LeaveGroupResponse response = (LeaveGroupResponse) abstractResponse; -Map> completed = new HashMap<>(); -Map failed = new HashMap<>(); -List unmapped = new ArrayList<>(); +final Map> completed = new HashMap<>(); +final Map failed = new HashMap<>(); +final Set groupsToUnmap = new HashSet<>(); +final Set groupsToRetry = new HashSet<>(); -final Errors error = Errors.forCode(response.data().errorCode()); +final Errors error = response.topLevelError(); if (error != Errors.NONE) { -handleError(groupId, error, failed, unmapped); +handleGroupError(groupId, error, failed, groupsToUnmap, groupsToRetry); } else { final Map memberErrors = new HashMap<>(); for (MemberResponse memberResponse : response.memberResponses()) { +Errors memberError = Errors.forCode(memberResponse.errorCode()); +String memberId = memberResponse.memberId(); + memberErrors.put(new MemberIdentity() - .setMemberId(memberResponse.memberId()) + .setMemberId(memberId) .setGroupInstanceId(memberResponse.groupInstanceId()), - Errors.forCode(memberResponse.errorCode())); +memberError); Review comment: nit: We could revert this change as it does not bring much and re-align like it was before. 
## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandler.java ## @@ -79,55 +90,82 @@ public String apiName() { Set groupIds, AbstractResponse abstractResponse ) { +validateKeys(groupIds); + final LeaveGroupResponse response = (LeaveGroupResponse) abstractResponse; -Map> completed = new HashMap<>(); -Map failed = new HashMap<>(); -List unmapped = new ArrayList<>(); +final Map> completed = new HashMap<>(); +final Map failed = new HashMap<>(); +final Set groupsToUnmap = new HashSet<>(); +final Set groupsToRetry = new HashSet<>(); -final Errors error = Errors.forCode(response.data().errorCode()); +final Errors error = response.topLevelError(); if (error != Errors.NONE) { -handleError(groupId, error, failed, unmapped); +handleGroupError(groupId, error, failed, groupsToUnmap, groupsToRetry); } else { final Map memberErrors = new HashMap<>(); for (MemberResponse memberResponse : response.memberResponses()) { +Errors memberError = Errors.forCode(memberResponse.errorCode()); +String memberId = memberResponse.memberId(); + memberErrors.put(new MemberIdentity() - .setMemberId(memberResponse.memberId()) + .setMemberId(memberId) .setGroupInstanceId(memberResponse.groupInstanceId()), - Errors.forCode(memberResponse.errorCode())); +memberError); } completed.put(groupId, memberErrors); } -return new ApiResult<>(completed, failed, unmapped); + +if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) { +return new ApiResult<>( +completed, +failed, +Collections.emptyList() +); +} else { +// retry the request, so don't send completed/failed results back +return new ApiResult<>( +Collections.emptyMap(), +Collections.emptyMap(), +new ArrayList<>(groupsToUnmap) +); +} } -private void handleError( +private void handleGroupError( CoordinatorKey groupId, -Errors error, Map failed, -List unmapped +Errors error, +Map failed, +Set groupsToUnmap, +Set groupsToRetry ) { switch (error) { case GROUP_AUTHORIZATION_FAILED: 
-log.error("Received authorization failure for group {} in `LeaveGroup` response", groupId, -error.exception()); +log.debug("`LeaveGroup` request for group id {} failed due to error {}", groupId.idValue, error); failed.put(groupId, er
[GitHub] [kafka] jsancio closed pull request #6777: MINOR: Run the Java and Scala documentation in Jenkins
jsancio closed pull request #6777: URL: https://github.com/apache/kafka/pull/6777 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on pull request #10953: MINOR: Default GRACE with Old API should set as 24H minus window-size / inactivity-gap
mjsax commented on pull request #10953: URL: https://github.com/apache/kafka/pull/10953#issuecomment-880159699
> but when users upgrade the Streams apps to 3.0 we guarantee retention time T+X
I don't agree with this statement. It's a bug in the implementation (which this PR fixes) that we keep data for T+X, but the contract, which guarantees that data is preserved (only) up to T, does not change IMHO.
Overall, the PR LGTM. If I read the code of this PR and the previous PRs correctly, it should put us back to the 2.8 behavior. We need to include the fix for JoinWindows though.
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
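The arithmetic behind the PR title ("24H minus window-size / inactivity-gap") can be sketched as below. This is a worked illustration, not the Streams implementation: the class and method names are hypothetical, and clamping the grace period at zero for windows longer than 24 hours is an assumption. It also assumes that retention is the window size plus the grace period.

```java
import java.time.Duration;

// Worked arithmetic only; names and the clamp at zero are assumptions.
final class DefaultGraceSketch {

    static final Duration LEGACY_RETENTION = Duration.ofHours(24);

    // Default grace for the old API: 24h minus the window size (or inactivity gap).
    static Duration defaultGrace(Duration windowSizeOrGap) {
        Duration grace = LEGACY_RETENTION.minus(windowSizeOrGap);
        return grace.isNegative() ? Duration.ZERO : grace;
    }

    // Assumed retention: window size plus grace.
    static Duration retention(Duration windowSize) {
        return windowSize.plus(defaultGrace(windowSize));
    }

    public static void main(String[] args) {
        Duration oneHour = Duration.ofHours(1);
        System.out.println("grace=" + defaultGrace(oneHour) + " retention=" + retention(oneHour));
    }
}
```

With a fixed 24-hour grace period, a 1-hour window would be retained for 25 hours; subtracting the window size brings the total back to 24 hours, which appears to be the T versus T+X distinction discussed in the comment.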
[GitHub] [kafka] jolshan commented on a change in pull request #10952: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name
jolshan commented on a change in pull request #10952: URL: https://github.com/apache/kafka/pull/10952#discussion_r669899334 ## File path: clients/src/main/java/org/apache/kafka/clients/MetadataCache.java ## @@ -130,13 +150,30 @@ MetadataCache mergeWith(String newClusterId, Set addInvalidTopics, Set addInternalTopics, Node newController, +Map topicIds, BiPredicate retainTopic) { Predicate shouldRetainTopic = topic -> retainTopic.test(topic, internalTopics.contains(topic)); Map newMetadataByPartition = new HashMap<>(addPartitions.size()); +Map newTopicIds = new HashMap<>(topicIds.size()); + +// We want the most recent topic ID. We add the old one here for retained topics and then update with newest information in the MetadataResponse +// we add if a new topic ID is added or remove if the request did not support topic IDs for this topic. +for (Map.Entry entry : this.topicIds.entrySet()) { +if (shouldRetainTopic.test(entry.getKey())) { +newTopicIds.put(entry.getKey(), entry.getValue()); +} +} + for (PartitionMetadata partition : addPartitions) { newMetadataByPartition.put(partition.topicPartition, partition); +Uuid id = topicIds.get(partition.topic()); +if (id != null) +newTopicIds.put(partition.topic(), id); +else +// Remove if the latest metadata does not have a topic ID Review comment: Yeah. That was my reasoning. I thought the upgrade/downgrade case would be rare and the guarantees harder to reason about there. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
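The merge rule described in the diff comment above (carry over old topic IDs for retained topics, overwrite with the newest ID from the response, and drop the ID when the latest metadata has none) can be sketched in isolation. This is a simplified illustration, not `MetadataCache.mergeWith`: it uses `java.util.UUID` in place of Kafka's `Uuid`, iterates over topic names rather than partition metadata, and the method and parameter names are hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Predicate;

// Sketch only: java.util.UUID stands in for Kafka's Uuid; names are hypothetical.
final class TopicIdMergeSketch {

    static Map<String, UUID> mergeTopicIds(Map<String, UUID> oldIds,
                                           Map<String, UUID> latestIds,
                                           List<String> topicsInResponse,
                                           Predicate<String> shouldRetainTopic) {
        Map<String, UUID> merged = new HashMap<>();
        // Step 1: keep the previously known IDs of retained topics.
        for (Map.Entry<String, UUID> entry : oldIds.entrySet()) {
            if (shouldRetainTopic.test(entry.getKey())) {
                merged.put(entry.getKey(), entry.getValue());
            }
        }
        // Step 2: the latest response wins; a missing ID removes the old one.
        for (String topic : topicsInResponse) {
            UUID id = latestIds.get(topic);
            if (id != null) {
                merged.put(topic, id);
            } else {
                merged.remove(topic);
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, UUID> oldIds = new HashMap<>();
        oldIds.put("recreated", UUID.randomUUID());
        Map<String, UUID> latestIds = new HashMap<>();
        latestIds.put("recreated", UUID.randomUUID());
        Map<String, UUID> merged =
            mergeTopicIds(oldIds, latestIds, Arrays.asList("recreated"), topic -> true);
        System.out.println("ID replaced: " + !merged.get("recreated").equals(oldIds.get("recreated")));
    }
}
```

Removing the ID when the response lacks one corresponds to the upgrade/downgrade case mentioned in the review comment.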
[GitHub] [kafka] cmccabe commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode
cmccabe commented on a change in pull request #10753: URL: https://github.com/apache/kafka/pull/10753#discussion_r669896829 ## File path: metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java ## @@ -180,6 +187,44 @@ public LeaderAndIsrPartitionState toLeaderAndIsrPartitionState(TopicPartition tp setIsNew(isNew); } +/** + * Returns true if this partition is reassigning. + */ +public boolean isReassigning() { +return removingReplicas.length > 0 | addingReplicas.length > 0; +} + +/** + * Check if an ISR change completes this partition's reassignment. + * + * @param newIsrThe new ISR. + * @return True if the reassignment is complete. + */ +public boolean isrChangeCompletesReassignment(int[] newIsr) { Review comment: Yes, this is unused now. I'll remove it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a change in pull request #11022: KAFKA-13063: refactor DescribeConsumerGroupsHandler and tests
dajac commented on a change in pull request #11022: URL: https://github.com/apache/kafka/pull/11022#discussion_r669797588 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java ## @@ -109,16 +109,19 @@ public String apiName() { Set groupIds, AbstractResponse abstractResponse ) { -DescribeGroupsResponse response = (DescribeGroupsResponse) abstractResponse; -Map completed = new HashMap<>(); -Map failed = new HashMap<>(); -List unmapped = new ArrayList<>(); +final DescribeGroupsResponse response = (DescribeGroupsResponse) abstractResponse; +final Map completed = new HashMap<>(); +final Map failed = new HashMap<>(); +final Set groupsToUnmap = new HashSet<>(); +final Set groupsToRetry = new HashSet<>(); -for (DescribedGroup describedGroup : response.data().groups()) { +List describedGroups = response.data().groups(); + +for (DescribedGroup describedGroup : describedGroups) { CoordinatorKey groupIdKey = CoordinatorKey.byGroupId(describedGroup.groupId()); Errors error = Errors.forCode(describedGroup.errorCode()); if (error != Errors.NONE) { -handleError(groupIdKey, error, failed, unmapped); +handleError(groupIdKey, error, failed, groupsToUnmap, groupsToRetry); Review comment: `groupsToRetry` to retry is not really necessary in this case. We don't even use it later. Could we remove it? ## File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java ## @@ -2688,8 +2688,7 @@ public void testDescribeConsumerGroups() throws Exception { try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); -//Retriable FindCoordinatorResponse errors should be retried - env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode())); Review comment: Do we really need to remove this one? 
It seems to me that the changes in the PR do not change how the find coordinator response is handled, no? ## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java ## @@ -151,38 +154,45 @@ public String apiName() { completed.put(groupIdKey, consumerGroupDescription); } else { failed.put(groupIdKey, new IllegalArgumentException( -String.format("GroupId %s is not a consumer group (%s).", -groupIdKey.idValue, protocolType))); +String.format("GroupId %s is not a consumer group (%s).", +groupIdKey.idValue, protocolType))); } } -return new ApiResult<>(completed, failed, unmapped); + +return new ApiResult<>(completed, failed, new ArrayList<>(groupsToUnmap)); Review comment: nit: There is an extra space before `new`. ## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java ## @@ -151,38 +154,45 @@ public String apiName() { completed.put(groupIdKey, consumerGroupDescription); } else { failed.put(groupIdKey, new IllegalArgumentException( -String.format("GroupId %s is not a consumer group (%s).", -groupIdKey.idValue, protocolType))); +String.format("GroupId %s is not a consumer group (%s).", +groupIdKey.idValue, protocolType))); } } -return new ApiResult<>(completed, failed, unmapped); + +return new ApiResult<>(completed, failed, new ArrayList<>(groupsToUnmap)); } private void handleError( CoordinatorKey groupId, Errors error, Map failed, -List unmapped +Set groupsToUnmap, +Set groupsToRetry ) { switch (error) { case GROUP_AUTHORIZATION_FAILED: -log.error("Received authorization failure for group {} in `DescribeGroups` response", groupId, -error.exception()); +log.debug("`DescribeGroups` request for group id {} failed due to error {}", groupId.idValue, error); failed.put(groupId, error.exception()); break; case COORDINATOR_LOAD_IN_PROGRESS: -case COORDINATOR_NOT_AVAILABLE: +// If the coordinator is in the middle of loading, then we just need to retry 
+log.debug("`DescribeGroups` request for group id {} failed because the coordinator " + +"is still in the proces
[GitHub] [kafka] cmccabe commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode
cmccabe commented on a change in pull request #10753: URL: https://github.com/apache/kafka/pull/10753#discussion_r669891898 ## File path: metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentRevert.java ## @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.metadata.PartitionRegistration; +import org.apache.kafka.metadata.Replicas; + +import java.util.ArrayList; +import java.util.Set; +import java.util.List; +import java.util.Objects; + + +class PartitionReassignmentRevert { +private final List replicas; +private final List isr; + +PartitionReassignmentRevert(PartitionRegistration registration) { +// Figure out the replica list and ISR that we will have after reverting the +// reassignment. In general, we want to take out any replica that the reassignment +// was adding, but keep the ones the reassignment was removing. (But see the +// special case below.) 
+Set adding = Replicas.toSet(registration.addingReplicas); +this.replicas = new ArrayList<>(registration.replicas.length); +this.isr = new ArrayList<>(registration.isr.length); +for (int i = 0; i < registration.isr.length; i++) { +int replica = registration.isr[i]; +if (!adding.contains(replica)) { +this.isr.add(replica); +} else if (i == registration.isr.length - 1 && isr.isEmpty()) { +// This is a special case where taking out all the "adding" replicas is +// not possible. The reason it is not possible is that doing so would +// create an empty ISR, which is not allowed. +// +// In this case, we leave in one of the adding replicas permanently. Review comment: I guess we can just not allow the reassignment to be cancelled in this situation. It should still be possible to undo the reassignment by creating a new reassignment, so it doesn't totally trap the user, I suppose... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
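The revert logic quoted in the diff above can be sketched as a self-contained class. The ISR loop follows the quoted code, including the special case that keeps one "adding" replica when removing all of them would leave the ISR empty. How the replica list is rebuilt is not visible in the quoted excerpt, so the second loop is an assumption, as are the class name and the plain `int[]` parameters. The review comment suggests a different approach (refusing to cancel in that situation), which this sketch does not implement.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Sketch only: hypothetical stand-in for PartitionReassignmentRevert.
final class ReassignmentRevertSketch {
    final List<Integer> replicas = new ArrayList<>();
    final List<Integer> isr = new ArrayList<>();

    ReassignmentRevertSketch(int[] currentReplicas, int[] currentIsr, int[] addingReplicas) {
        Set<Integer> adding = new HashSet<>();
        for (int replica : addingReplicas) {
            adding.add(replica);
        }
        for (int i = 0; i < currentIsr.length; i++) {
            int replica = currentIsr[i];
            if (!adding.contains(replica)) {
                isr.add(replica);
            } else if (i == currentIsr.length - 1 && isr.isEmpty()) {
                // Special case from the quoted diff: an empty ISR is not allowed,
                // so the last "adding" replica stays in.
                isr.add(replica);
            }
        }
        for (int replica : currentReplicas) {
            // Assumption: keep replicas that were not being added, plus any
            // "adding" replica that had to remain in the ISR.
            if (!adding.contains(replica) || isr.contains(replica)) {
                replicas.add(replica);
            }
        }
    }

    public static void main(String[] args) {
        ReassignmentRevertSketch reverted = new ReassignmentRevertSketch(
            new int[] {1, 2, 3, 4}, new int[] {1, 2, 3}, new int[] {4});
        System.out.println("replicas=" + reverted.replicas + " isr=" + reverted.isr);
    }
}
```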
[GitHub] [kafka] cmccabe commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode
cmccabe commented on a change in pull request #10753: URL: https://github.com/apache/kafka/pull/10753#discussion_r669215608 ## File path: metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java ## @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.metadata.PartitionChangeRecord; +import org.apache.kafka.metadata.PartitionRegistration; +import org.apache.kafka.metadata.Replicas; +import org.apache.kafka.server.common.ApiMessageAndVersion; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.function.Function; +import java.util.function.Supplier; + +import static org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_CHANGE_RECORD; +import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER; +import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE; + +/** + * PartitionMutator handles changing partition registrations. 
+ */ +public class PartitionChangeBuilder { +public static boolean changeRecordIsNoOp(PartitionChangeRecord record) { +if (record.isr() != null) return false; +if (record.leader() != NO_LEADER_CHANGE) return false; +if (record.replicas() != null) return false; +if (record.removingReplicas() != null) return false; +if (record.addingReplicas() != null) return false; +return true; +} + +private final PartitionRegistration partition; +private final Uuid topicId; +private final int partitionId; +private final Function isAcceptableLeader; +private final Supplier uncleanElectionOk; +private List targetIsr; +private List targetReplicas; +private List targetRemoving; +private List targetAdding; +private boolean alwaysElectPreferredIfPossible; + +public PartitionChangeBuilder(PartitionRegistration partition, + Uuid topicId, + int partitionId, + Function isAcceptableLeader, + Supplier uncleanElectionOk) { +this.partition = partition; +this.topicId = topicId; +this.partitionId = partitionId; +this.isAcceptableLeader = isAcceptableLeader; +this.uncleanElectionOk = uncleanElectionOk; +this.targetIsr = Replicas.toList(partition.isr); +this.targetReplicas = Replicas.toList(partition.replicas); +this.targetRemoving = Replicas.toList(partition.removingReplicas); +this.targetAdding = Replicas.toList(partition.addingReplicas); +this.alwaysElectPreferredIfPossible = false; +} + +public PartitionChangeBuilder setTargetIsr(List targetIsr) { +this.targetIsr = targetIsr; +return this; +} + +public PartitionChangeBuilder setTargetReplicas(List targetReplicas) { +this.targetReplicas = targetReplicas; +return this; +} + +public PartitionChangeBuilder setAlwaysElectPreferredIfPossible(boolean alwaysElectPreferredIfPossible) { Review comment: I also added a unit test for `electLeaders` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[GitHub] [kafka] hachikuji commented on a change in pull request #10952: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name
hachikuji commented on a change in pull request #10952: URL: https://github.com/apache/kafka/pull/10952#discussion_r669881047 ## File path: clients/src/main/java/org/apache/kafka/clients/MetadataCache.java ## @@ -130,13 +150,30 @@ MetadataCache mergeWith(String newClusterId, Set addInvalidTopics, Set addInternalTopics, Node newController, +Map topicIds, BiPredicate retainTopic) { Predicate shouldRetainTopic = topic -> retainTopic.test(topic, internalTopics.contains(topic)); Map newMetadataByPartition = new HashMap<>(addPartitions.size()); +Map newTopicIds = new HashMap<>(topicIds.size()); + +// We want the most recent topic ID. We add the old one here for retained topics and then update with newest information in the MetadataResponse +// we add if a new topic ID is added or remove if the request did not support topic IDs for this topic. +for (Map.Entry entry : this.topicIds.entrySet()) { +if (shouldRetainTopic.test(entry.getKey())) { +newTopicIds.put(entry.getKey(), entry.getValue()); +} +} + for (PartitionMetadata partition : addPartitions) { newMetadataByPartition.put(partition.topicPartition, partition); +Uuid id = topicIds.get(partition.topic()); +if (id != null) +newTopicIds.put(partition.topic(), id); +else +// Remove if the latest metadata does not have a topic ID Review comment: We can leave it as is I guess since I can't think of a strong case to remove it. It is a rare situation that we would hit this case and the consequence of losing the topic ID is probably not too bad. Worst case, we might miss a recreation which occurred while the cluster was rolling to upgrade or downgrade. On the other hand, it could lead to other kinds of problems if we allow updates to the epoch information tied to a topic ID without being able to validate that the topic ID is correct, so maybe this logic is for the best. -- This is an automated message from the Apache Git Service. 
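The merge logic discussed in this comment can be sketched in isolation as follows. This is a simplified illustration, not the `MetadataCache` code: `TopicIdMergeSketch` is a hypothetical name, topic IDs are plain strings rather than `Uuid`, and the set of topics in the response stands in for iterating over `addPartitions`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical, simplified model of merging cached topic IDs with a new metadata response.
final class TopicIdMergeSketch {
    static Map<String, String> merge(Map<String, String> cachedIds,
                                     Map<String, String> latestIds,
                                     Set<String> topicsInResponse,
                                     Predicate<String> shouldRetainTopic) {
        Map<String, String> merged = new HashMap<>();
        // Start from the cached IDs of the topics we are retaining.
        for (Map.Entry<String, String> entry : cachedIds.entrySet()) {
            if (shouldRetainTopic.test(entry.getKey())) {
                merged.put(entry.getKey(), entry.getValue());
            }
        }
        // Then let the latest response win: overwrite when it carries an ID,
        // and drop the cached ID when it does not.
        for (String topic : topicsInResponse) {
            String id = latestIds.get(topic);
            if (id != null) {
                merged.put(topic, id);
            } else {
                merged.remove(topic);
            }
        }
        return merged;
    }
}
```

Dropping the cached ID when the response has none is the behavior the comment ends up accepting: the client may miss a recreation during a rolling upgrade or downgrade, but it never trusts an ID it can no longer validate.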
[GitHub] [kafka] lkokhreidze commented on pull request #10851: KAFKA-6718 / Rack aware standby task assignor
lkokhreidze commented on pull request #10851: URL: https://github.com/apache/kafka/pull/10851#issuecomment-880127165 Thanks @cadonna for the feedback. I've replied/addressed all of your comments. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lkokhreidze commented on a change in pull request #10851: KAFKA-6718 / Rack aware standby task assignor
lkokhreidze commented on a change in pull request #10851: URL: https://github.com/apache/kafka/pull/10851#discussion_r669865915 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java ## @@ -51,12 +57,12 @@ public boolean assign(final Map clients, final SortedSet statefulTasks = new TreeSet<>(statefulTaskIds); final TreeMap clientStates = new TreeMap<>(clients); -assignActiveStatefulTasks(clientStates, statefulTasks); +final Map statefulTasksClientMappings = assignActiveStatefulTasks(clientStates, statefulTasks); assignStandbyReplicaTasks( clientStates, -statefulTasks, -configs.numStandbyReplicas +statefulTasksClientMappings, +configs ); Review comment: Sure! done. Personal preference. Having all the strategies of standby task assignment implementations in a single class makes unit testing a bit easier. But I do agree that removing one extra class is indeed good idea. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-13033) coordinator not available error should cause add into unmap list to do a new lookup
[ https://issues.apache.org/jira/browse/KAFKA-13033?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot updated KAFKA-13033: Priority: Blocker (was: Major) > coordinator not available error should cause add into unmap list to do a new > lookup > --- > > Key: KAFKA-13033 > URL: https://issues.apache.org/jira/browse/KAFKA-13033 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.0.0 >Reporter: Luke Chen >Assignee: Luke Chen >Priority: Blocker > Fix For: 3.0.0 > > > In KIP-699, we added handlers for the different types of operations. In > `handleError`, we did not mark `COORDINATOR_NOT_AVAILABLE` as > unmapped, so it does not trigger a re-lookup. In > [DescribeTransactionsHandler|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeTransactionsHandler.java#L172-L186], > [~hachikuji] has already explained why `COORDINATOR_NOT_AVAILABLE` > and `NOT_COORDINATOR` should be added to unmapped, and > `COORDINATOR_LOAD_IN_PROGRESS` should not. > > {code:java} > case COORDINATOR_LOAD_IN_PROGRESS: > // If the coordinator is in the middle of loading, then we just need to > retry > log.debug("DescribeTransactions request for transactionalId `{}` failed > because the " + > "coordinator is still in the process of loading state. Will > retry", > transactionalIdKey.idValue); > break; > case NOT_COORDINATOR: > case COORDINATOR_NOT_AVAILABLE: > // If the coordinator is unavailable or there was a coordinator change, > then we unmap > // the key so that we retry the `FindCoordinator` request > unmapped.add(transactionalIdKey); > log.debug("DescribeTransactions request for transactionalId `{}` returned > error {}. Will attempt " + > "to find the coordinator again and retry", > transactionalIdKey.idValue, error); > break; > {code} > The other handlers should be consistent with this. Fix it, add logs and comments, and also > update the tests. > > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
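The distinction the issue asks every handler to follow can be reduced to the small sketch below. It is a stand-alone illustration only: `CoordinatorErrorSketch` and its nested `Error` enum are hypothetical and merely mirror three error names from the issue; they are not Kafka's `Errors` class or any real admin handler.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical handler that separates "retry as is" from "unmap and look up again".
final class CoordinatorErrorSketch {
    enum Error { COORDINATOR_LOAD_IN_PROGRESS, NOT_COORDINATOR, COORDINATOR_NOT_AVAILABLE }

    // Keys whose coordinator must be found again before the next attempt.
    final Set<String> unmapped = new HashSet<>();

    void handleError(String key, Error error) {
        switch (error) {
            case COORDINATOR_LOAD_IN_PROGRESS:
                // The coordinator is known and still loading: retry against the same node.
                break;
            case NOT_COORDINATOR:
            case COORDINATOR_NOT_AVAILABLE:
                // The coordinator moved or is unreachable: unmap the key so that
                // the coordinator lookup is repeated before the retry.
                unmapped.add(key);
                break;
        }
    }
}
```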
[GitHub] [kafka] lkokhreidze commented on a change in pull request #10851: KAFKA-6718 / Rack aware standby task assignor
lkokhreidze commented on a change in pull request #10851: URL: https://github.com/apache/kafka/pull/10851#discussion_r669853519 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StandbyTaskAssignor.java ## @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals.assignment; + +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs; + +import java.util.Map; +import java.util.TreeMap; +import java.util.UUID; + +abstract class StandbyTaskAssignor { +protected final AssignmentConfigs configs; + +StandbyTaskAssignor(final AssignmentConfigs configs) { +this.configs = configs; +} + +abstract void assignStandbyTasks(final Map statefulTasksWithClients, + final TreeMap clientStates); Review comment: I didn't give it much thought to be honest. `TreeMap` for the `clientStates` was already used in the `HighAvailabilityTaskAssignor` and went with the same signature here. I think it makes sense to change the contract to be a `SortedMap`. Will do that. -- This is an automated message from the Apache Git Service. 
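As a small illustration of why `SortedMap` is the looser contract here: a method that only relies on key ordering can accept any sorted implementation, not just `TreeMap`. The class and method names below are hypothetical, and `String`/`Integer` stand in for the real key and value types, which are not visible in the flattened diff.

```java
import java.util.SortedMap;

// Hypothetical helper that depends only on ordered keys, not on TreeMap itself.
final class SortedMapContractSketch {
    static String firstClient(SortedMap<String, Integer> clientStates) {
        return clientStates.isEmpty() ? null : clientStates.firstKey();
    }
}
```

Callers that already hold a `TreeMap` are unaffected by such a change, since `TreeMap` implements `SortedMap`.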
[GitHub] [kafka] lkokhreidze commented on a change in pull request #10851: KAFKA-6718 / Rack aware standby task assignor
lkokhreidze commented on a change in pull request #10851: URL: https://github.com/apache/kafka/pull/10851#discussion_r669851397 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java ## @@ -51,12 +57,12 @@ public boolean assign(final Map clients, final SortedSet statefulTasks = new TreeSet<>(statefulTaskIds); final TreeMap clientStates = new TreeMap<>(clients); -assignActiveStatefulTasks(clientStates, statefulTasks); +final Map statefulTasksClientMappings = assignActiveStatefulTasks(clientStates, statefulTasks); Review comment: I tried to avoid unnecessary iterations. With that we would have to do separate iteration in the `ClientTagAwareStandbyTaskAssignor`, which felt redundant, since `assignActiveStatefulTasks` can return necessary mapping since it has to iterate over client states either way. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lkokhreidze commented on a change in pull request #10851: KAFKA-6718 / Rack aware standby task assignor
lkokhreidze commented on a change in pull request #10851: URL: https://github.com/apache/kafka/pull/10851#discussion_r669849371 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignor.java ## @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals.assignment; + +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; +import java.util.Set; +import java.util.TreeMap; +import java.util.UUID; +import java.util.function.Function; + +import static java.util.stream.Collectors.toMap; + +/** + * Distributes standby tasks over different tag dimensions. + * Only tags specified via {@link AssignmentConfigs#rackAwareAssignmentTags} are taken into account. + * Standby task distribution is on a best-effort basis. 
For example, if there are not enough clients available + * on different tag dimensions compared to an active and corresponding standby task, + * in that case, the algorithm will fall back to distributing tasks on least-loaded clients. + */ +class ClientTagAwareStandbyTaskAssignor extends StandbyTaskAssignor { +private static final Logger log = LoggerFactory.getLogger(ClientTagAwareStandbyTaskAssignor.class); + +ClientTagAwareStandbyTaskAssignor(final AssignmentConfigs configs) { +super(configs); +} + +@Override +public void assignStandbyTasks(final Map statefulTasksWithClients, + final TreeMap clientStates) { +final int numStandbyReplicas = configs.numStandbyReplicas; +final Set rackAwareAssignmentTags = new HashSet<>(configs.rackAwareAssignmentTags); + +final StandbyTaskDistributor standbyTaskDistributor = new StandbyTaskDistributor( +numStandbyReplicas, +clientStates, +rackAwareAssignmentTags, +statefulTasksWithClients +); + + statefulTasksWithClients.forEach(standbyTaskDistributor::assignStandbyTasksForActiveTask); +} + +@Override +public boolean isValidTaskMovement(final TaskMovementAttempt taskMovementAttempt) { +final Map sourceClientTags = taskMovementAttempt.sourceClient().clientTags(); +final Map destinationClientTags = taskMovementAttempt.destinationClient().clientTags(); + +for (final Entry sourceClientTagEntry : sourceClientTags.entrySet()) { +if (!sourceClientTagEntry.getValue().equals(destinationClientTags.get(sourceClientTagEntry.getKey()))) { +return false; +} +} + +return true; +} + +private static final class StandbyTaskDistributor { Review comment: Thanks for the feedback Bruno. I reasoned that, since internal states like `clientsPerTagValue`, `standbyTaskClientsByTaskLoad`, etc., have to be allocated per invocation of `assignStandbyTasks` method, it felt easier and more readable to create one single internal object rather than invalidating local caches in `ClientTagAwareStandbyTaskAssignor`.
[jira] [Updated] (KAFKA-12308) ConfigDef.parseType deadlock
[ https://issues.apache.org/jira/browse/KAFKA-12308?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Konstantine Karantasis updated KAFKA-12308: --- Fix Version/s: 3.0.0 > ConfigDef.parseType deadlock > > > Key: KAFKA-12308 > URL: https://issues.apache.org/jira/browse/KAFKA-12308 > Project: Kafka > Issue Type: Bug > Components: config, KafkaConnect >Affects Versions: 2.5.0 > Environment: kafka 2.5.0 > centos7 > java version "1.8.0_231" >Reporter: cosmozhu >Priority: Major > Fix For: 3.0.0 > > Attachments: deadlock.log > > > Hi, > I found this problem when I restarted *ConnectDistributed*. > For the test, I restart ConnectDistributed on a single node without deleting the > connectors. > Sometimes the process stops while creating connectors. > I added some logging and found a deadlock in `ConfigDef.parseType`. My > connectors always have the same transforms. I guess something goes wrong when the connectors start up > (in startAndStopExecutor, which defaults to 8 threads) and load the same class > file. > I attached the jstack log file. > Thanks for any help. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-12308) ConfigDef.parseType deadlock
[ https://issues.apache.org/jira/browse/KAFKA-12308?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Konstantine Karantasis resolved KAFKA-12308. Resolution: Fixed > ConfigDef.parseType deadlock > > > Key: KAFKA-12308 > URL: https://issues.apache.org/jira/browse/KAFKA-12308 > Project: Kafka > Issue Type: Bug > Components: config, KafkaConnect >Affects Versions: 2.5.0 > Environment: kafka 2.5.0 > centos7 > java version "1.8.0_231" >Reporter: cosmozhu >Priority: Major > Attachments: deadlock.log
[jira] [Commented] (KAFKA-7421) Deadlock in Kafka Connect during class loading
[ https://issues.apache.org/jira/browse/KAFKA-7421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17380773#comment-17380773 ] Konstantine Karantasis commented on KAFKA-7421: --- The fix has now been merged. Let's keep track and report any new issues if they appear. Some context exists on https://issues.apache.org/jira/browse/KAFKA-12308 as well which reported a similar issue. > Deadlock in Kafka Connect during class loading > -- > > Key: KAFKA-7421 > URL: https://issues.apache.org/jira/browse/KAFKA-7421 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.0.0 >Reporter: Maciej Bryński >Assignee: Konstantine Karantasis >Priority: Major > Fix For: 3.0.0 > > > I'm getting this deadlock on half of Kafka Connect runs when having two > different types connectors (in this configuration it's debezium and hdfs). > Thread 1: > {code} > "pool-22-thread-2@4748" prio=5 tid=0x4d nid=NA waiting for monitor entry > java.lang.Thread.State: BLOCKED >waiting for pool-22-thread-1@4747 to release lock on <0x1423> (a > org.apache.kafka.connect.runtime.isolation.PluginClassLoader) > at > org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:91) > at > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.loadClass(DelegatingClassLoader.java:367) > at java.lang.ClassLoader.loadClass(ClassLoader.java:357) > at java.lang.Class.forName0(Class.java:-1) > at java.lang.Class.forName(Class.java:348) > at > org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715) > at > org.apache.kafka.connect.runtime.ConnectorConfig.enrich(ConnectorConfig.java:295) > at > org.apache.kafka.connect.runtime.ConnectorConfig.(ConnectorConfig.java:200) > at > org.apache.kafka.connect.runtime.ConnectorConfig.(ConnectorConfig.java:194) > at > org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:233) > at > 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:916) > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1300(DistributedHerder.java:111) > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:932) > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:928) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > {code} > Thread 2: > {code} > "pool-22-thread-1@4747" prio=5 tid=0x4c nid=NA waiting for monitor entry > java.lang.Thread.State: BLOCKED >blocks pool-22-thread-2@4748 >waiting for pool-22-thread-2@4748 to release lock on <0x1421> (a > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader) > at java.lang.ClassLoader.loadClass(ClassLoader.java:406) > at > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.loadClass(DelegatingClassLoader.java:358) > at java.lang.ClassLoader.loadClass(ClassLoader.java:411) > - locked <0x1424> (a java.lang.Object) > at > org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104) > - locked <0x1423> (a > org.apache.kafka.connect.runtime.isolation.PluginClassLoader) > at java.lang.ClassLoader.loadClass(ClassLoader.java:357) > at > io.debezium.transforms.ByLogicalTableRouter.(ByLogicalTableRouter.java:57) > at java.lang.Class.forName0(Class.java:-1) > at java.lang.Class.forName(Class.java:348) > at > org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715) > at > org.apache.kafka.connect.runtime.ConnectorConfig.enrich(ConnectorConfig.java:295) > at > org.apache.kafka.connect.runtime.ConnectorConfig.(ConnectorConfig.java:200) > at > 
org.apache.kafka.connect.runtime.ConnectorConfig.(ConnectorConfig.java:194) > at > org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:233) > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:916) > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1300(DistributedHerder.java:111) > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:932) > at > org.apache.kafka.connect.runtime.distribu
[jira] [Resolved] (KAFKA-7421) Deadlock in Kafka Connect during class loading
[ https://issues.apache.org/jira/browse/KAFKA-7421?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Konstantine Karantasis resolved KAFKA-7421. --- Resolution: Fixed > Deadlock in Kafka Connect during class loading > -- > > Key: KAFKA-7421 > URL: https://issues.apache.org/jira/browse/KAFKA-7421 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.0.0 >Reporter: Maciej Bryński >Assignee: Konstantine Karantasis >Priority: Major > Fix For: 3.0.0
[jira] [Updated] (KAFKA-7421) Deadlock in Kafka Connect during class loading
[ https://issues.apache.org/jira/browse/KAFKA-7421?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Konstantine Karantasis updated KAFKA-7421: -- Fix Version/s: 3.0.0 > Deadlock in Kafka Connect during class loading > -- > > Key: KAFKA-7421 > URL: https://issues.apache.org/jira/browse/KAFKA-7421 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.0.0 >Reporter: Maciej Bryński >Assignee: Konstantine Karantasis >Priority: Major > Fix For: 3.0.0
[jira] [Commented] (KAFKA-12308) ConfigDef.parseType deadlock
[ https://issues.apache.org/jira/browse/KAFKA-12308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17380772#comment-17380772 ] Konstantine Karantasis commented on KAFKA-12308: Adding the comment that I added in the PR here as well: The idea that the {{DelegatingClassLoader}} did not have to be parallel capable originated from the fact that it doesn't load classes directly. It delegates loading either to the appropriate PluginClassLoader directly via composition, or to the parent by calling {{super.loadClass}}. The latter is the key reason why we need to make the {{DelegatingClassLoader}} parallel capable as well, even though it doesn't load classes itself. Because inheritance is used (via a call to {{super.loadClass}}) and not composition (via a hypothetical call to {{parent.loadClass}}, which is not possible because {{parent}} is a private member of the base abstract class {{ClassLoader}}), when {{getClassLoadingLock}} is called in {{super.loadClass}} it finds that the derived class (here an instance of {{DelegatingClassLoader}}) is not parallel capable and therefore ends up not applying fine-grained locking during classloading, even though the parent classloader is the one that actually loads the classes. Based on the above, the {{DelegatingClassLoader}} needs to be parallel capable too in order for the parent loader to load classes in parallel. I've tested both classloader types being parallel capable in a variety of scenarios with multiple connectors, SMTs and converters, and a deadlock did not reproduce. Of course, reproducing the issue is difficult without the specifics of the jar layout to begin with. The possibility of a deadlock is still not zero, but it is also probably not exacerbated compared to the current code.
Only connector-type plugins depend on other plugins being loaded while they load their own classes, and there are no inter-connector dependencies (a connector requiring another connector's classes to be loaded while loading its own). With that in mind, a deadlock should be even less likely now. In the future we could consider introducing deadlock recovery methods to get out of this type of situation if necessary. > ConfigDef.parseType deadlock > > > Key: KAFKA-12308 > URL: https://issues.apache.org/jira/browse/KAFKA-12308 > Project: Kafka > Issue Type: Bug > Components: config, KafkaConnect > Affects Versions: 2.5.0 > Environment: kafka 2.5.0 > centos7 > java version "1.8.0_231" > Reporter: cosmozhu > Priority: Major > Attachments: deadlock.log > > > Hi, > I found this problem when I restarted *ConnectDistributed*. > I restarted ConnectDistributed on a single node for a test, without deleting the > connectors. > Sometimes the process stopped while creating connectors. > I added some logging and found a deadlock in `ConfigDef.parseType`. My > connectors always have the same transforms. I guess something goes wrong when connectors start up > (in startAndStopExecutor, which defaults to 8 threads) and load the same class > file. > I attached the jstack log file. > Thanks for any help. -- This message was sent by Atlassian Jira (v8.3.4#803005)
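The locking behaviour described in the comment above can be observed with plain JDK classes. The following is a minimal sketch, not Kafka's code: `PlainLoader` and `ParallelLoader` are hypothetical stand-ins for a delegating classloader without and with the `registerAsParallelCapable()` static block, and `lockFor` is a helper added here only to expose the protected `getClassLoadingLock` method.

```java
// Sketch only: PlainLoader and ParallelLoader are hypothetical stand-ins, not Kafka classes.
final class ParallelCapableSketch {

    /** A loader that never registers itself as parallel capable. */
    static class PlainLoader extends ClassLoader {
        PlainLoader(ClassLoader parent) { super(parent); }

        // Expose the protected lock lookup so it can be inspected.
        Object lockFor(String className) { return getClassLoadingLock(className); }
    }

    /** The same loader, registered as parallel capable in a static block. */
    static class ParallelLoader extends ClassLoader {
        static {
            ClassLoader.registerAsParallelCapable();
        }

        ParallelLoader(ClassLoader parent) { super(parent); }

        Object lockFor(String className) { return getClassLoadingLock(className); }
    }

    public static void main(String[] args) {
        ClassLoader parent = ParallelCapableSketch.class.getClassLoader();
        PlainLoader plain = new PlainLoader(parent);
        ParallelLoader parallel = new ParallelLoader(parent);

        // Not parallel capable: the class-loading lock is the loader instance itself.
        System.out.println(plain.lockFor("a.B") == plain);                      // true
        // Parallel capable: a dedicated lock object per class name.
        System.out.println(parallel.lockFor("a.B") == parallel);                // false
        System.out.println(parallel.lockFor("a.B") == parallel.lockFor("c.D")); // false
    }
}
```

Because `super.loadClass` synchronizes on whatever `getClassLoadingLock` returns, the unregistered loader serializes all loading on a single monitor, which is the coarse lock visible in the stack traces attached to these issues.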
[GitHub] [kafka] cmccabe merged pull request #11048: KAFKA-13083: In KRaft mode, fix issue with ISR in manual partition assignments
cmccabe merged pull request #11048: URL: https://github.com/apache/kafka/pull/11048 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-7421) Deadlock in Kafka Connect during class loading
[ https://issues.apache.org/jira/browse/KAFKA-7421?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Konstantine Karantasis updated KAFKA-7421: -- Summary: Deadlock in Kafka Connect during class loading (was: Deadlock in Kafka Connect) > Deadlock in Kafka Connect during class loading > -- > > Key: KAFKA-7421 > URL: https://issues.apache.org/jira/browse/KAFKA-7421 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.0.0 >Reporter: Maciej Bryński >Assignee: Konstantine Karantasis >Priority: Major > > I'm getting this deadlock on half of Kafka Connect runs when having two > different types connectors (in this configuration it's debezium and hdfs). > Thread 1: > {code} > "pool-22-thread-2@4748" prio=5 tid=0x4d nid=NA waiting for monitor entry > java.lang.Thread.State: BLOCKED >waiting for pool-22-thread-1@4747 to release lock on <0x1423> (a > org.apache.kafka.connect.runtime.isolation.PluginClassLoader) > at > org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:91) > at > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.loadClass(DelegatingClassLoader.java:367) > at java.lang.ClassLoader.loadClass(ClassLoader.java:357) > at java.lang.Class.forName0(Class.java:-1) > at java.lang.Class.forName(Class.java:348) > at > org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715) > at > org.apache.kafka.connect.runtime.ConnectorConfig.enrich(ConnectorConfig.java:295) > at > org.apache.kafka.connect.runtime.ConnectorConfig.(ConnectorConfig.java:200) > at > org.apache.kafka.connect.runtime.ConnectorConfig.(ConnectorConfig.java:194) > at > org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:233) > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:916) > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1300(DistributedHerder.java:111) > at > 
org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:932) > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:928) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > {code} > Thread 2: > {code} > "pool-22-thread-1@4747" prio=5 tid=0x4c nid=NA waiting for monitor entry > java.lang.Thread.State: BLOCKED >blocks pool-22-thread-2@4748 >waiting for pool-22-thread-2@4748 to release lock on <0x1421> (a > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader) > at java.lang.ClassLoader.loadClass(ClassLoader.java:406) > at > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.loadClass(DelegatingClassLoader.java:358) > at java.lang.ClassLoader.loadClass(ClassLoader.java:411) > - locked <0x1424> (a java.lang.Object) > at > org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104) > - locked <0x1423> (a > org.apache.kafka.connect.runtime.isolation.PluginClassLoader) > at java.lang.ClassLoader.loadClass(ClassLoader.java:357) > at > io.debezium.transforms.ByLogicalTableRouter.(ByLogicalTableRouter.java:57) > at java.lang.Class.forName0(Class.java:-1) > at java.lang.Class.forName(Class.java:348) > at > org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715) > at > org.apache.kafka.connect.runtime.ConnectorConfig.enrich(ConnectorConfig.java:295) > at > org.apache.kafka.connect.runtime.ConnectorConfig.(ConnectorConfig.java:200) > at > org.apache.kafka.connect.runtime.ConnectorConfig.(ConnectorConfig.java:194) > at > org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:233) > at > 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:916) > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1300(DistributedHerder.java:111) > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:932) > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:928) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(Threa
[GitHub] [kafka] kkonstantine merged pull request #8259: KAFKA-7421: Ensure Connect's PluginClassLoader is truly parallel capable and resolve deadlock occurrences
kkonstantine merged pull request #8259: URL: https://github.com/apache/kafka/pull/8259
[GitHub] [kafka] kkonstantine commented on pull request #8259: KAFKA-7421: Ensure Connect's PluginClassLoader is truly parallel capable and resolve deadlock occurrences
kkonstantine commented on pull request #8259: URL: https://github.com/apache/kafka/pull/8259#issuecomment-880082421 The test failures were not relevant. Merging to `trunk` and `3.0`
[GitHub] [kafka] jolshan commented on a change in pull request #10952: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name
jolshan commented on a change in pull request #10952: URL: https://github.com/apache/kafka/pull/10952#discussion_r669815073
## File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java ##
@@ -391,10 +393,15 @@ else if (metadata.error() == Errors.TOPIC_AUTHORIZATION_FAILED)
 int newEpoch = partitionMetadata.leaderEpoch.get();
 // If the received leader epoch is at least the same as the previous one, update the metadata
 Integer currentEpoch = lastSeenLeaderEpochs.get(tp);
-if (currentEpoch == null || newEpoch >= currentEpoch || changedTopicId) {
+if (currentEpoch == null || newEpoch >= currentEpoch) {
 log.debug("Updating last seen epoch for partition {} from {} to epoch {} from new metadata", tp, currentEpoch, newEpoch);
 lastSeenLeaderEpochs.put(tp, newEpoch);
 return Optional.of(partitionMetadata);
+} else if (changedTopicId) {
+log.debug("Topic ID changed, so this topic must have been recreated. " +
Review comment: Yeah, I was thinking that too. I just have to be careful when comparing to remember the zero uuid case.
[GitHub] [kafka] hachikuji commented on a change in pull request #10952: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name
hachikuji commented on a change in pull request #10952: URL: https://github.com/apache/kafka/pull/10952#discussion_r669812349
## File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java ##
@@ -377,6 +397,12 @@ else if (metadata.error() == Errors.TOPIC_AUTHORIZATION_FAILED)
 log.debug("Updating last seen epoch for partition {} from {} to epoch {} from new metadata", tp, currentEpoch, newEpoch);
 lastSeenLeaderEpochs.put(tp, newEpoch);
 return Optional.of(partitionMetadata);
+// If the topic ID changed, updated the metadata
+} else if (changedTopicId) {
+log.debug("Topic ID changed, so this topic must have been recreated. " +
+"Removing last seen epoch {} for the old partition {} and adding epoch {} from new metadata", currentEpoch, tp, newEpoch);
+lastSeenLeaderEpochs.put(tp, newEpoch);
+return Optional.of(partitionMetadata);
Review comment: Yes, I was just pointing out that there is still a gap.
[GitHub] [kafka] hachikuji commented on a change in pull request #10952: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name
hachikuji commented on a change in pull request #10952: URL: https://github.com/apache/kafka/pull/10952#discussion_r669810537
## File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java ##
@@ -391,10 +393,15 @@ else if (metadata.error() == Errors.TOPIC_AUTHORIZATION_FAILED)
 int newEpoch = partitionMetadata.leaderEpoch.get();
 // If the received leader epoch is at least the same as the previous one, update the metadata
 Integer currentEpoch = lastSeenLeaderEpochs.get(tp);
-if (currentEpoch == null || newEpoch >= currentEpoch || changedTopicId) {
+if (currentEpoch == null || newEpoch >= currentEpoch) {
 log.debug("Updating last seen epoch for partition {} from {} to epoch {} from new metadata", tp, currentEpoch, newEpoch);
 lastSeenLeaderEpochs.put(tp, newEpoch);
 return Optional.of(partitionMetadata);
+} else if (changedTopicId) {
+log.debug("Topic ID changed, so this topic must have been recreated. " +
Review comment: If you pass the new one, then you can probably get rid of `changedTopicId`
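The review comments on `Metadata.java` above suggest deriving the "topic ID changed" condition from the old and new IDs instead of carrying a `changedTopicId` flag, while remembering the zero-UUID case. A minimal sketch of that comparison follows. It is not the code from the pull request: `java.util.UUID` stands in for Kafka's `Uuid`, the names `topicIdChanged` and `shouldUpdate` are hypothetical, and treating both `null` and the all-zero UUID as "unknown" is an assumption made here for illustration.

```java
import java.util.UUID;

// Sketch only: java.util.UUID stands in for Kafka's Uuid; all names are hypothetical.
final class TopicIdCheckSketch {

    // Assumption: an all-zero UUID means "no topic ID known", like a null reference.
    static final UUID ZERO_UUID = new UUID(0L, 0L);

    static boolean isKnown(UUID id) {
        return id != null && !ZERO_UUID.equals(id);
    }

    // The topic counts as recreated only if both IDs are known and they differ.
    static boolean topicIdChanged(UUID oldId, UUID newId) {
        return isKnown(oldId) && isKnown(newId) && !newId.equals(oldId);
    }

    // Mirrors the branch order of the quoted diff: epoch check first, then the topic ID check.
    static boolean shouldUpdate(UUID oldId, UUID newId, Integer currentEpoch, int newEpoch) {
        if (currentEpoch == null || newEpoch >= currentEpoch) {
            return true;
        }
        return topicIdChanged(oldId, newId);
    }

    public static void main(String[] args) {
        UUID a = UUID.randomUUID();
        UUID b = UUID.randomUUID();
        System.out.println(shouldUpdate(a, a, 10, 5));         // stale epoch, same topic
        System.out.println(shouldUpdate(a, b, 10, 5));         // stale epoch, recreated topic
        System.out.println(shouldUpdate(a, ZERO_UUID, 10, 5)); // stale epoch, new ID unknown
    }
}
```

With the comparison folded into one helper, the caller only needs to pass the new topic ID, which is the simplification the last review comment points at.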
[GitHub] [kafka] kkonstantine commented on a change in pull request #8259: KAFKA-7421: Ensure Connect's PluginClassLoader is truly parallel capable and resolve deadlock occurrences
kkonstantine commented on a change in pull request #8259: URL: https://github.com/apache/kafka/pull/8259#discussion_r669806313
## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java ##
@@ -82,11 +93,19 @@
 Arrays.stream(SERVICE_LOADER_PLUGINS).map(serviceLoaderPlugin -> MANIFEST_PREFIX + serviceLoaderPlugin.getName())
 .collect(Collectors.toSet());
+// Although this classloader does not load classes directly but rather delegates loading to a
+// PluginClassLoader or its parent through its base class, because of the use of inheritance in
+// in the latter case, this classloader needs to also be declared as parallel capable to use
+// fine-grain locking when loading classes.
+static {
Review comment: Keeping the static block here, because it's a block and that's what we have in `PluginClassLoader`. Our style is not too strict with respect to this ordering. The class overrides the `loadClass` method, but it delegates the loading to different classloaders, and the locking is embedded in those classes.
[GitHub] [kafka] cmccabe commented on pull request #11048: KAFKA-13083: In KRaft mode, fix issue with ISR in manual partition assignments
cmccabe commented on pull request #11048: URL: https://github.com/apache/kafka/pull/11048#issuecomment-880053828 I updated this and fixed a case in createPartitions where the same issue existed (added a test for that as well)
[GitHub] [kafka] cmccabe commented on a change in pull request #11048: KAFKA-13083: In KRaft mode, fix issue with ISR in manual partition assignments
cmccabe commented on a change in pull request #11048: URL: https://github.com/apache/kafka/pull/11048#discussion_r669790479
## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ##
@@ -377,8 +377,20 @@ private ApiError createTopic(CreatableTopic topic,
 validateManualPartitionAssignment(assignment.brokerIds(), replicationFactor);
 replicationFactor = OptionalInt.of(assignment.brokerIds().size());
 int[] replicas = Replicas.toArray(assignment.brokerIds());
+List isr = new ArrayList<>();
Review comment: Yeah, we can use a stream.
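The quoted diff builds an `isr` list for a manual partition assignment, and the reviewer suggests a stream. A related comment on the same pull request notes that the automatic path needs no such check because the replica placer only selects unfenced replicas, which suggests (this is an inference, not something the quoted diff shows) that the manual path keeps only unfenced brokers in the initial ISR. A minimal sketch under that assumption, with the hypothetical names `initialIsr` and `isUnfenced`:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Sketch only: initialIsr and isUnfenced are hypothetical names, not the controller's API.
final class ManualAssignmentIsrSketch {

    // Keep the manually assigned replicas that pass the "unfenced" check, preserving order.
    static List<Integer> initialIsr(List<Integer> replicas, Predicate<Integer> isUnfenced) {
        return replicas.stream()
                .filter(isUnfenced)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Assumption for the example: brokers 1 and 3 are unfenced, broker 2 is fenced.
        Set<Integer> unfenced = new HashSet<>(Arrays.asList(1, 3));
        System.out.println(initialIsr(Arrays.asList(1, 2, 3), unfenced::contains)); // [1, 3]
    }
}
```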
[GitHub] [kafka] kowshik commented on pull request #10280: KAFKA-12554: Refactor Log layer
kowshik commented on pull request #10280: URL: https://github.com/apache/kafka/pull/10280#issuecomment-880044495 @ijuma Discussed with @satishd. We are not planning to include this in 3.0.
[jira] [Updated] (KAFKA-13088) Replace EasyMock with Mockito for ForwardingDisabledProcessorContextTest
[ https://issues.apache.org/jira/browse/KAFKA-13088?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruno Cadonna updated KAFKA-13088: -- Component/s: streams > Replace EasyMock with Mockito for ForwardingDisabledProcessorContextTest > > > Key: KAFKA-13088 > URL: https://issues.apache.org/jira/browse/KAFKA-13088 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Chun-Hao Tang >Assignee: Chun-Hao Tang >Priority: Major >
[jira] [Commented] (KAFKA-3539) KafkaProducer.send() may block even though it returns the Future
[ https://issues.apache.org/jira/browse/KAFKA-3539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17380730#comment-17380730 ] Ewen Cheslack-Postava commented on KAFKA-3539: -- [~moses.nakamura] I spent a bunch of time on the clients in the past, but I've barely been involved for the past few years. So not really even sure of the current state of implementation and tests (e.g. I wasn't working on the clients when EoS was implemented). What tests fail due to minor changes? If they are unit tests, that should be unexpected unless you are changing public API, which would require a KIP anyway. You might also just be seeing flakiness in integration tests, which unfortunately is expected. Providing a list of the tests that break and whether it's compilation or runtime issues would probably help, but someone more active can probably provide better guidance. > KafkaProducer.send() may block even though it returns the Future > > > Key: KAFKA-3539 > URL: https://issues.apache.org/jira/browse/KAFKA-3539 > Project: Kafka > Issue Type: Bug > Components: producer >Reporter: Oleg Zhurakousky >Priority: Critical > Labels: needs-discussion, needs-kip > > You can get more details from the us...@kafka.apache.org by searching on the > thread with the subject "KafkaProducer block on send". > The bottom line is that method that returns Future must never block, since it > essentially violates the Future contract as it was specifically designed to > return immediately passing control back to the user to check for completion, > cancel etc.
[GitHub] [kafka] cmccabe commented on pull request #11048: KAFKA-13083: In KRaft mode, fix issue with ISR in manual partition assignments
cmccabe commented on pull request #11048: URL: https://github.com/apache/kafka/pull/11048#issuecomment-880035001 > I'm assuming we don't need a similar check for the automatic assignment path since the replica placer will only select unfenced replicas. Right.
[GitHub] [kafka] tang7526 opened a new pull request #11051: KAFKA-13088: Replace EasyMock with Mockito for ForwardingDisabledProcessorContextTest
tang7526 opened a new pull request #11051: URL: https://github.com/apache/kafka/pull/11051 https://issues.apache.org/jira/browse/KAFKA-13088 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Created] (KAFKA-13088) Replace EasyMock with Mockito for ForwardingDisabledProcessorContextTest
Chun-Hao Tang created KAFKA-13088: - Summary: Replace EasyMock with Mockito for ForwardingDisabledProcessorContextTest Key: KAFKA-13088 URL: https://issues.apache.org/jira/browse/KAFKA-13088 Project: Kafka Issue Type: Sub-task Reporter: Chun-Hao Tang Assignee: Chun-Hao Tang