[GitHub] [kafka] guozhangwang commented on pull request #11057: KAFKA-13008: Try to refresh end offset when partitionLag returns empty

2021-07-14 Thread GitBox


guozhangwang commented on pull request #11057:
URL: https://github.com/apache/kafka/pull/11057#issuecomment-880417140


   Also cc @cmccabe @hachikuji.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #11057: KAFKA-13008: Try to refresh end offset when partitionLag returns empty

2021-07-14 Thread GitBox


guozhangwang commented on pull request #11057:
URL: https://github.com/apache/kafka/pull/11057#issuecomment-880416855


   @vvcephei @ableegoldman @showuon LMK what you think about this approach. 
I have not done the due diligence on test coverage yet; I will do that if people 
are +1 on this direction. If we feel this is a general issue for normal 
consumers as well, then this approach would be better than just fixing it at 
the Streams layer.






[GitHub] [kafka] guozhangwang opened a new pull request #11057: KAFKA-13008: Try to refresh end offset when partitionLag returns empty

2021-07-14 Thread GitBox


guozhangwang opened a new pull request #11057:
URL: https://github.com/apache/kafka/pull/11057


   This is an idea I had for attacking this at the consumer client level:
   
   1. When a listOffset result is retrieved inside Fetcher, check whether the 
partitions are part of the consumer's subscriptions; if so, update the 
corresponding LSO or HW based on the isolation level.
   2. When partitionLag cannot return a result because the log end offset (LSO/HW) 
is not yet known, send an async list-offsets request, which would be completed by 
other polling calls (the heartbeat thread may complete it as well), in the hope 
that the next partitionLag call gets the result.
   
   Then on the Streams side, the first partitionLag call would still return empty, 
but soon enough subsequent calls should return data, and we would not have to 
wait for a fetch response to update the fetched state.
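   
   As a rough sketch of what this could look like (a standalone illustration with hypothetical names, not the actual Fetcher code):
   
{code:java}
import java.util.Map;
import java.util.OptionalLong;
import java.util.concurrent.ConcurrentHashMap;

// Hedged sketch of the proposed behavior, not the actual consumer internals.
// "ListOffsetsSender" is a hypothetical hook standing in for the async
// list-offsets request, which other polling calls (or the heartbeat thread)
// would complete, refreshing the cached end offset.
class LagTracker {
    interface ListOffsetsSender { void sendAsync(String topicPartition); }

    private final Map<String, Long> cachedEndOffset = new ConcurrentHashMap<>();
    private final Map<String, Long> position = new ConcurrentHashMap<>();
    private final ListOffsetsSender sender;

    LagTracker(ListOffsetsSender sender) { this.sender = sender; }

    // Called when a list-offsets response arrives (updates the LSO/HW cache).
    void onEndOffsetUpdate(String tp, long endOffset) {
        cachedEndOffset.put(tp, endOffset);
    }

    OptionalLong partitionLag(String tp) {
        Long end = cachedEndOffset.get(tp);
        if (end == null) {
            sender.sendAsync(tp);        // refresh asynchronously
            return OptionalLong.empty(); // a later call should find the value
        }
        return OptionalLong.of(end - position.getOrDefault(tp, 0L));
    }
}
{code}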
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[GitHub] [kafka] cmccabe commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode

2021-07-14 Thread GitBox


cmccabe commented on a change in pull request #10753:
URL: https://github.com/apache/kafka/pull/10753#discussion_r670134315



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentRevert.java
##
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
+import org.apache.kafka.metadata.PartitionRegistration;
+import org.apache.kafka.metadata.Replicas;
+
+import java.util.ArrayList;
+import java.util.Set;
+import java.util.List;
+import java.util.Objects;
+
+
+class PartitionReassignmentRevert {
+    private final List<Integer> replicas;
+    private final List<Integer> isr;
+    private final boolean unclean;
+
+    PartitionReassignmentRevert(PartitionRegistration registration) {
+        // Figure out the replica list and ISR that we will have after reverting the
+        // reassignment. In general, we want to take out any replica that the reassignment
+        // was adding, but keep the ones the reassignment was removing. (But see the
+        // special case below.)
+        Set<Integer> adding = Replicas.toSet(registration.addingReplicas);
+        this.replicas = new ArrayList<>(registration.replicas.length);
+        this.isr = new ArrayList<>(registration.isr.length);
+        for (int i = 0; i < registration.isr.length; i++) {
+            int replica = registration.isr[i];
+            if (!adding.contains(replica)) {
+                this.isr.add(replica);
+            }
+        }
+        for (int replica : registration.replicas) {
+            if (!adding.contains(replica)) {
+                this.replicas.add(replica);
+            }
+        }
+        if (isr.isEmpty()) {
+            // In the special case that all the replicas that are in the ISR are also
+            // contained in addingReplicas, we choose the first remaining replica and add
+            // it to the ISR. This is considered an unclean leader election. Therefore,
+            // calling code must check that unclean leader election is enabled before
+            // accepting the new ISR.
+            if (this.replicas.isEmpty()) {
+                // This should not be reachable, since it would require a partition
+                // starting with an empty replica set prior to the reassignment we are
+                // trying to revert.
+                throw new InvalidReplicaAssignmentException("Invalid replica " +
+                    "assignment: addingReplicas contains all replicas.");
+            }
+            isr.add(replicas.get(0));

Review comment:
   It seems simpler to do it here, because then we can reject the unclean 
case before creating the `PartitionChangeBuilder`.








[GitHub] [kafka] cmccabe commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode

2021-07-14 Thread GitBox


cmccabe commented on a change in pull request #10753:
URL: https://github.com/apache/kafka/pull/10753#discussion_r670132470



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
##
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
+import org.apache.kafka.metadata.PartitionRegistration;
+import org.apache.kafka.metadata.Replicas;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+import static org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_CHANGE_RECORD;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE;
+
+/**
+ * PartitionChangeBuilder handles changing partition registrations.
+ */
+public class PartitionChangeBuilder {
+    public static boolean changeRecordIsNoOp(PartitionChangeRecord record) {
+        if (record.isr() != null) return false;
+        if (record.leader() != NO_LEADER_CHANGE) return false;
+        if (record.replicas() != null) return false;
+        if (record.removingReplicas() != null) return false;
+        if (record.addingReplicas() != null) return false;
+        return true;
+    }
+
+    private final PartitionRegistration partition;
+    private final Uuid topicId;
+    private final int partitionId;
+    private final Function<Integer, Boolean> isAcceptableLeader;
+    private final Supplier<Boolean> uncleanElectionOk;
+    private List<Integer> targetIsr;
+    private List<Integer> targetReplicas;
+    private List<Integer> targetRemoving;
+    private List<Integer> targetAdding;
+    private boolean alwaysElectPreferredIfPossible;
+
+    public PartitionChangeBuilder(PartitionRegistration partition,
+                                  Uuid topicId,
+                                  int partitionId,
+                                  Function<Integer, Boolean> isAcceptableLeader,
+                                  Supplier<Boolean> uncleanElectionOk) {
+        this.partition = partition;
+        this.topicId = topicId;
+        this.partitionId = partitionId;
+        this.isAcceptableLeader = isAcceptableLeader;
+        this.uncleanElectionOk = uncleanElectionOk;
+        this.targetIsr = Replicas.toList(partition.isr);
+        this.targetReplicas = Replicas.toList(partition.replicas);
+        this.targetRemoving = Replicas.toList(partition.removingReplicas);
+        this.targetAdding = Replicas.toList(partition.addingReplicas);
+        this.alwaysElectPreferredIfPossible = false;
+    }
+
+    public PartitionChangeBuilder setTargetIsr(List<Integer> targetIsr) {
+        this.targetIsr = targetIsr;
+        return this;
+    }
+
+    public PartitionChangeBuilder setTargetReplicas(List<Integer> targetReplicas) {
+        this.targetReplicas = targetReplicas;
+        return this;
+    }
+
+    public PartitionChangeBuilder setAlwaysElectPreferredIfPossible(boolean alwaysElectPreferredIfPossible) {
+        this.alwaysElectPreferredIfPossible = alwaysElectPreferredIfPossible;
+        return this;
+    }
+
+    public PartitionChangeBuilder setTargetRemoving(List<Integer> targetRemoving) {
+        this.targetRemoving = targetRemoving;
+        return this;
+    }
+
+    public PartitionChangeBuilder setTargetAdding(List<Integer> targetAdding) {
+        this.targetAdding = targetAdding;
+        return this;
+    }
+
+    boolean shouldTryElection() {
+        // If the new isr doesn't have the current leader, we need to try to elect a new
+        // one. Note: this also handles the case where the current leader is NO_LEADER,
+        // since that value cannot appear in targetIsr.
+        if (!targetIsr.contains(partition.leader)) return true;
+
+        // Check if we want to try to get away from a non-preferred leader.
+        if (alwaysElectPreferredIfPossible && !partition.hasPreferredLeader()) return true;
+
+        return false;
+    }
+
+    class BestLeader {
+        final int node;
+

[GitHub] [kafka] guozhangwang commented on a change in pull request #10941: KAFKA-10847: Remove internal config for enabling the fix

2021-07-14 Thread GitBox


guozhangwang commented on a change in pull request #10941:
URL: https://github.com/apache/kafka/pull/10941#discussion_r670119642



##
File path: docs/streams/upgrade-guide.html
##
@@ -117,6 +117,21 @@ Streams API
 
 We removed the default implementation of 
RocksDBConfigSetter#close().
 
+
+We dropped the default 24-hour grace period for windowed operations such as Window or Session aggregates, or stream-stream joins.
+This period determines how long after a window ends late-arriving records will still be processed.
+Records coming in after the grace period has elapsed will be dropped from those windows.
+With a long grace period, although Kafka Streams can handle out-of-order data up to that amount of time, it also incurs a high and confusing latency for users;
+e.g. suppression operators with the default won't emit results for up to 24 hours, while in practice out-of-order data usually has a much smaller time skew.
+Instead of hiding this config from users behind a long default value, we introduced new constructs such as TimeWindows#ofSizeWithNoGrace to let callers always set it when constructing the windows;

Review comment:
   `TimeDifference` is only in `JoinWindows`.

##
File path: docs/streams/upgrade-guide.html
##
@@ -117,6 +117,21 @@ Streams API
 
 We removed the default implementation of 
RocksDBConfigSetter#close().
 
+
+We dropped the default 24-hour grace period for windowed operations such as Window or Session aggregates, or stream-stream joins.

Review comment:
   Since we are piggy-backing the fix on KIP-663 now, I want to incorporate 
the change along with this PR.

##
File path: docs/streams/upgrade-guide.html
##
@@ -117,6 +117,21 @@ Streams API
 
 We removed the default implementation of 
RocksDBConfigSetter#close().
 
+
+We dropped the default 24-hour grace period for windowed operations such as Window or Session aggregates, or stream-stream joins.
+This period determines how long after a window ends late-arriving records will still be processed.
+Records coming in after the grace period has elapsed will be dropped from those windows.
+With a long grace period, although Kafka Streams can handle out-of-order data up to that amount of time, it also incurs a high and confusing latency for users;
+e.g. suppression operators with the default won't emit results for up to 24 hours, while in practice out-of-order data usually has a much smaller time skew.
+Instead of hiding this config from users behind a long default value, we introduced new constructs such as TimeWindows#ofSizeWithNoGrace to let callers always set it when constructing the windows;
+the other setters such as TimeWindows#grace are deprecated and will be removed in the future.
+Also, when the new constructor APIs are used for left/outer stream-stream joins, Kafka Streams no longer emits spurious join results, which may have an impact on throughput.

Review comment:
   Ack!
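   
   To make the documented change concrete, a hedged example of the new constructors (API names per Kafka Streams 3.0; the durations are arbitrary):
   
{code:java}
import java.time.Duration;
import org.apache.kafka.streams.kstream.TimeWindows;

// Example only: grace must now be chosen explicitly when the window is built.
public class GraceExample {
    public static void main(String[] args) {
        // No grace: the window closes as soon as stream time passes its end.
        final TimeWindows noGrace = TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5));

        // Explicit one-minute grace for late-arriving records.
        final TimeWindows withGrace =
            TimeWindows.ofSizeAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(1));

        System.out.println(noGrace.gracePeriodMs());   // 0
        System.out.println(withGrace.gracePeriodMs()); // 60000
    }
}
{code}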








[GitHub] [kafka] guozhangwang commented on pull request #10953: MINOR: Default GRACE with Old API should set as 24H minus window-size / inactivity-gap

2021-07-14 Thread GitBox


guozhangwang commented on pull request #10953:
URL: https://github.com/apache/kafka/pull/10953#issuecomment-880375143


   @cadonna and everyone: I've addressed the comments and added `JoinWindows`.






[GitHub] [kafka] guozhangwang commented on a change in pull request #10953: MINOR: Default GRACE with Old API should set as 24H minus window-size / inactivity-gap

2021-07-14 Thread GitBox


guozhangwang commented on a change in pull request #10953:
URL: https://github.com/apache/kafka/pull/10953#discussion_r670103326



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
##
@@ -146,7 +146,7 @@ public static TimeWindows of(final Duration size) throws IllegalArgumentExceptio
     final String msgPrefix = prepareMillisCheckFailMsgPrefix(size, "size");
     final long sizeMs = validateMillisecondDuration(size, msgPrefix);
 
-    return new TimeWindows(sizeMs, sizeMs, DEPRECATED_OLD_24_HR_GRACE_PERIOD);
+    return new TimeWindows(sizeMs, sizeMs, Math.max(DEPRECATED_OLD_24_HR_GRACE_PERIOD - sizeMs, 0));

Review comment:
   Updated on all corresponding classes' constructors.
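   
   To illustrate the formula in the diff above with concrete numbers (a standalone sketch, not project code):
   
{code:java}
import java.time.Duration;

// The old API keeps the total "window close" horizon at 24h by subtracting
// the window size from the default grace, clamping at zero.
public class OldApiGraceExample {
    static final long DEPRECATED_OLD_24_HR_GRACE_PERIOD = Duration.ofHours(24).toMillis();

    static long defaultGraceMs(long sizeMs) {
        return Math.max(DEPRECATED_OLD_24_HR_GRACE_PERIOD - sizeMs, 0);
    }

    public static void main(String[] args) {
        System.out.println(defaultGraceMs(Duration.ofHours(1).toMillis()));  // 82800000 (23h)
        System.out.println(defaultGraceMs(Duration.ofHours(25).toMillis())); // 0
    }
}
{code}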








[GitHub] [kafka] showuon commented on a change in pull request #11019: KAFKA-13059: refactor DeleteConsumerGroupOffsetsHandler and tests

2021-07-14 Thread GitBox


showuon commented on a change in pull request #11019:
URL: https://github.com/apache/kafka/pull/11019#discussion_r670111886



##
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandlerTest.java
##
@@ -67,48 +69,88 @@ public void testBuildRequest() {
     @Test
     public void testSuccessfulHandleResponse() {
         Map<TopicPartition, Errors> responseData = Collections.singletonMap(t0p0, Errors.NONE);
-        assertCompleted(handleWithError(Errors.NONE), responseData);
+        assertCompleted(handleWithGroupError(Errors.NONE), responseData);
     }
 
     @Test
     public void testUnmappedHandleResponse() {
-        assertUnmapped(handleWithError(Errors.NOT_COORDINATOR));
+        assertUnmapped(handleWithGroupError(Errors.NOT_COORDINATOR));
+        assertUnmapped(handleWithGroupError(Errors.COORDINATOR_NOT_AVAILABLE));
     }
 
     @Test
     public void testRetriableHandleResponse() {
-        assertRetriable(handleWithError(Errors.COORDINATOR_LOAD_IN_PROGRESS));
-        assertRetriable(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE));
+        assertRetriable(handleWithGroupError(Errors.COORDINATOR_LOAD_IN_PROGRESS));
     }
 
     @Test
-    public void testFailedHandleResponse() {
-        assertFailed(GroupAuthorizationException.class, handleWithError(Errors.GROUP_AUTHORIZATION_FAILED));
-        assertFailed(GroupIdNotFoundException.class, handleWithError(Errors.GROUP_ID_NOT_FOUND));
-        assertFailed(InvalidGroupIdException.class, handleWithError(Errors.INVALID_GROUP_ID));
+    public void testFailedHandleResponseWithGroupError() {
+        assertGroupFailed(GroupAuthorizationException.class, handleWithGroupError(Errors.GROUP_AUTHORIZATION_FAILED));
+        assertGroupFailed(GroupIdNotFoundException.class, handleWithGroupError(Errors.GROUP_ID_NOT_FOUND));
+        assertGroupFailed(InvalidGroupIdException.class, handleWithGroupError(Errors.INVALID_GROUP_ID));
+        assertGroupFailed(GroupNotEmptyException.class, handleWithGroupError(Errors.NON_EMPTY_GROUP));
     }
 
-    private OffsetDeleteResponse buildResponse(Errors error) {
+    @Test
+    public void testFailedHandleResponseWithPartitionError() {
+        assertPartitionFailed(Collections.singletonMap(t0p0, Errors.GROUP_SUBSCRIBED_TO_TOPIC),
+            handleWithPartitionError(Errors.GROUP_SUBSCRIBED_TO_TOPIC));
+        assertPartitionFailed(Collections.singletonMap(t0p0, Errors.TOPIC_AUTHORIZATION_FAILED),
+            handleWithPartitionError(Errors.TOPIC_AUTHORIZATION_FAILED));
+        assertPartitionFailed(Collections.singletonMap(t0p0, Errors.UNKNOWN_TOPIC_OR_PARTITION),
+            handleWithPartitionError(Errors.UNKNOWN_TOPIC_OR_PARTITION));
+    }
+
+    private OffsetDeleteResponse buildGroupErrorResponse(Errors error) {
+        OffsetDeleteResponse response = new OffsetDeleteResponse(
+            new OffsetDeleteResponseData()
+                .setErrorCode(error.code()));
+        if (error == Errors.NONE) {
+            response.data()
+                .setThrottleTimeMs(0)
+                .setTopics(new OffsetDeleteResponseTopicCollection(singletonList(
+                    new OffsetDeleteResponseTopic()
+                        .setName(t0p0.topic())
+                        .setPartitions(new OffsetDeleteResponsePartitionCollection(singletonList(
+                            new OffsetDeleteResponsePartition()
+                                .setPartitionIndex(t0p0.partition())
+                                .setErrorCode(error.code())
+                        ).iterator()))
+                ).iterator()));
+        }
+        return response;
+    }
+
+    private OffsetDeleteResponse buildPartitionErrorResponse(Errors error) {
         OffsetDeleteResponse response = new OffsetDeleteResponse(
-            new OffsetDeleteResponseData()
-                .setThrottleTimeMs(0)
-                .setTopics(new OffsetDeleteResponseTopicCollection(singletonList(
-                    new OffsetDeleteResponseTopic()
-                        .setName("t0")
-                        .setPartitions(new OffsetDeleteResponsePartitionCollection(singletonList(
-                            new OffsetDeleteResponsePartition()
-                                .setPartitionIndex(0)
-                                .setErrorCode(error.code())
-                        ).iterator()))
-                ).iterator())));
+            new OffsetDeleteResponseData()
+                .setThrottleTimeMs(0)
+                .setTopics(new OffsetDeleteResponseTopicCollection(singletonList(
+                    new OffsetDeleteResponseTopic()
+                        .setName(t0p0.topic())
+                        .setPartitions(new OffsetDeleteResponsePartitionCollection(singletonList(
+                            new OffsetDeleteResponsePartition()
+

[GitHub] [kafka] jolshan opened a new pull request #11056: KAFKA-13092: Perf regression in LISR requests

2021-07-14 Thread GitBox


jolshan opened a new pull request #11056:
URL: https://github.com/apache/kafka/pull/11056


   After noticing increased LISR times, we discovered that a lot of time was spent 
synchronously flushing the partition metadata file. This PR changes the code so 
that we flush the files asynchronously.
   
   We still make sure files are flushed before appending to, renaming, or closing 
the log, so that the partition metadata information is on disk. Three new tests 
have been added to cover these cases.
   
   Benchmark by @lbradstreet is included to compare the times. I will update 
this description after I compare trunk to this branch.
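   
   As a rough illustration of the pattern described above (a hedged standalone sketch, not the actual Kafka log code):
   
{code:java}
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hedged sketch: move the fsync of a metadata file off the hot path, while
// still forcing the data to disk before the file is closed.
class AsyncFlusher implements AutoCloseable {
    private final ExecutorService flushPool = Executors.newSingleThreadExecutor();
    private final FileChannel channel;
    private volatile Future<?> pendingFlush;

    AsyncFlusher(Path file) throws IOException {
        this.channel = FileChannel.open(file, StandardOpenOption.WRITE, StandardOpenOption.CREATE);
    }

    // Called from the request-handling path: schedule instead of blocking.
    void flushAsync() {
        pendingFlush = flushPool.submit(() -> {
            try {
                channel.force(true);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        });
    }

    // Called before append/rename/close: make sure the data really hit disk.
    @Override
    public void close() throws Exception {
        Future<?> pending = pendingFlush;
        if (pending != null) pending.get(); // wait for any in-flight flush
        channel.force(true);
        channel.close();
        flushPool.shutdown();
    }
}
{code}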
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[GitHub] [kafka] showuon commented on a change in pull request #11035: KAFKA-13072: refactor RemoveMembersFromConsumerGroupHandler and tests

2021-07-14 Thread GitBox


showuon commented on a change in pull request #11035:
URL: https://github.com/apache/kafka/pull/11035#discussion_r670091468



##
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##
@@ -3680,6 +3685,72 @@ public void testRemoveMembersFromGroupRetriableErrors() throws Exception {
     }
 }
 
+@Test
+public void testRemoveMembersFromGroupRetriableErrorsInMemberResponse() throws Exception {
+    // Retriable errors should be retried
+    String groupId = "instance-1";
+
+    try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
+        env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+        env.kafkaClient().prepareResponse(
+            prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
+
+        MemberResponse memberResponse = new MemberResponse()
+            .setGroupInstanceId(groupId)
+            .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code());

Review comment:
   Right! Removed. Thanks.








[GitHub] [kafka] yanspirit commented on pull request #11020: KAFKA-12937; mm2 can start from the ending of a topic

2021-07-14 Thread GitBox


yanspirit commented on pull request #11020:
URL: https://github.com/apache/kafka/pull/11020#issuecomment-880345058


   > Not sure why I was tagged — did you mean somebody else?
   
   Hello, GitHub recommended you. There are no assignees, and I don't know how to 
trigger the review flow.
   
   






[GitHub] [kafka] showuon commented on a change in pull request #11035: KAFKA-13072: refactor RemoveMembersFromConsumerGroupHandler and tests

2021-07-14 Thread GitBox


showuon commented on a change in pull request #11035:
URL: https://github.com/apache/kafka/pull/11035#discussion_r670079198



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandler.java
##
@@ -79,55 +90,82 @@ public String apiName() {
         Set<CoordinatorKey> groupIds,
         AbstractResponse abstractResponse
     ) {
+        validateKeys(groupIds);
+
         final LeaveGroupResponse response = (LeaveGroupResponse) abstractResponse;
-        Map<CoordinatorKey, Map<MemberIdentity, Errors>> completed = new HashMap<>();
-        Map<CoordinatorKey, Throwable> failed = new HashMap<>();
-        List<CoordinatorKey> unmapped = new ArrayList<>();
+        final Map<CoordinatorKey, Map<MemberIdentity, Errors>> completed = new HashMap<>();
+        final Map<CoordinatorKey, Throwable> failed = new HashMap<>();
+        final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
+        final Set<CoordinatorKey> groupsToRetry = new HashSet<>();
 
-        final Errors error = Errors.forCode(response.data().errorCode());
+        final Errors error = response.topLevelError();
         if (error != Errors.NONE) {
-            handleError(groupId, error, failed, unmapped);
+            handleGroupError(groupId, error, failed, groupsToUnmap, groupsToRetry);
         } else {
             final Map<MemberIdentity, Errors> memberErrors = new HashMap<>();
             for (MemberResponse memberResponse : response.memberResponses()) {
+                Errors memberError = Errors.forCode(memberResponse.errorCode());
+                String memberId = memberResponse.memberId();
+
                 memberErrors.put(new MemberIdentity()
-                    .setMemberId(memberResponse.memberId())
+                    .setMemberId(memberId)
                     .setGroupInstanceId(memberResponse.groupInstanceId()),
-                    Errors.forCode(memberResponse.errorCode()));
+                    memberError);
 
             }
             completed.put(groupId, memberErrors);
         }
-        return new ApiResult<>(completed, failed, unmapped);
+
+        if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) {

Review comment:
   Agree! Updated!








[jira] [Commented] (KAFKA-9366) Upgrade log4j to log4j2

2021-07-14 Thread Dongjin Lee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17380951#comment-17380951
 ] 

Dongjin Lee commented on KAFKA-9366:


[~kkonstantine] Got it.

> Upgrade log4j to log4j2
> ---
>
> Key: KAFKA-9366
> URL: https://issues.apache.org/jira/browse/KAFKA-9366
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.2.0, 2.1.1, 2.3.0, 2.4.0
>Reporter: leibo
>Assignee: Dongjin Lee
>Priority: Critical
>  Labels: needs-kip
> Fix For: 3.1.0
>
>
> h2. CVE-2019-17571 Detail
> Included in Log4j 1.2 is a SocketServer class that is vulnerable to 
> deserialization of untrusted data which can be exploited to remotely execute 
> arbitrary code when combined with a deserialization gadget when listening to 
> untrusted network traffic for log data. This affects Log4j versions up to 1.2 
> up to 1.2.17.
>  
> [https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2019-17571]
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] showuon commented on a change in pull request #11022: KAFKA-13063: refactor DescribeConsumerGroupsHandler and tests

2021-07-14 Thread GitBox


showuon commented on a change in pull request #11022:
URL: https://github.com/apache/kafka/pull/11022#discussion_r670075392



##
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##
@@ -2688,8 +2688,7 @@ public void testDescribeConsumerGroups() throws Exception {
     try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
         env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
 
-        // Retriable FindCoordinatorResponse errors should be retried
-        env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode()));

Review comment:
   Oh, you are right! Reverted my change. Thank you!








[GitHub] [kafka] ableegoldman commented on pull request #10877: KAFKA-12925: adding presfixScan operation for missed implementations

2021-07-14 Thread GitBox


ableegoldman commented on pull request #10877:
URL: https://github.com/apache/kafka/pull/10877#issuecomment-880336482


   Merged to trunk and cherrypicked to 3.0 and 2.8






[GitHub] [kafka] showuon commented on a change in pull request #11022: KAFKA-13063: refactor DescribeConsumerGroupsHandler and tests

2021-07-14 Thread GitBox


showuon commented on a change in pull request #11022:
URL: https://github.com/apache/kafka/pull/11022#discussion_r670073886



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java
##
@@ -151,38 +154,45 @@ public String apiName() {
                 completed.put(groupIdKey, consumerGroupDescription);
             } else {
                 failed.put(groupIdKey, new IllegalArgumentException(
-                    String.format("GroupId %s is not a consumer group (%s).",
-                            groupIdKey.idValue, protocolType)));
+                    String.format("GroupId %s is not a consumer group (%s).",
+                        groupIdKey.idValue, protocolType)));
             }
         }
-        return new ApiResult<>(completed, failed, unmapped);
+
+        return new ApiResult<>(completed, failed, new ArrayList<>(groupsToUnmap));
     }
 
     private void handleError(
         CoordinatorKey groupId,
         Errors error,
         Map<CoordinatorKey, Throwable> failed,
-        List<CoordinatorKey> unmapped
+        Set<CoordinatorKey> groupsToUnmap,
+        Set<CoordinatorKey> groupsToRetry
     ) {
         switch (error) {
             case GROUP_AUTHORIZATION_FAILED:
-                log.error("Received authorization failure for group {} in `DescribeGroups` response", groupId,
-                    error.exception());
+                log.debug("`DescribeGroups` request for group id {} failed due to error {}", groupId.idValue, error);
                 failed.put(groupId, error.exception());
                 break;
             case COORDINATOR_LOAD_IN_PROGRESS:
-            case COORDINATOR_NOT_AVAILABLE:
+                // If the coordinator is in the middle of loading, then we just need to retry
+                log.debug("`DescribeGroups` request for group id {} failed because the coordinator " +
+                    "is still in the process of loading state. Will retry", groupId.idValue);
+                groupsToRetry.add(groupId);
                 break;
+            case COORDINATOR_NOT_AVAILABLE:
             case NOT_COORDINATOR:
-                log.debug("DescribeGroups request for group {} returned error {}. Will retry",
-                    groupId, error);
-                unmapped.add(groupId);
+                // If the coordinator is unavailable or there was a coordinator change, then we unmap
+                // the key so that we retry the `FindCoordinator` request
+                log.debug("`DescribeGroups` request for group id {} returned error {}. " +
+                    "Will attempt to find the coordinator again and retry", groupId.idValue, error);
+                groupsToUnmap.add(groupId);
                 break;
             default:
-                log.error("Received unexpected error for group {} in `DescribeGroups` response",
-                    groupId, error.exception());
-                failed.put(groupId, error.exception(
-                    "Received unexpected error for group " + groupId + " in `DescribeGroups` response"));
+                final String unexpectedErrorMsg =
+                    String.format("`DescribeGroups` request for group id %s failed due to error %s", groupId.idValue, error);
+                log.error(unexpectedErrorMsg);
+                failed.put(groupId, error.exception(unexpectedErrorMsg));

Review comment:
   Agree. I just followed the previous behavior. Removed the error message. 
Thanks.








[GitHub] [kafka] showuon commented on a change in pull request #11022: KAFKA-13063: refactor DescribeConsumerGroupsHandler and tests

2021-07-14 Thread GitBox


showuon commented on a change in pull request #11022:
URL: https://github.com/apache/kafka/pull/11022#discussion_r670073415



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java
##
@@ -109,16 +109,19 @@ public String apiName() {
         Set<CoordinatorKey> groupIds,
         AbstractResponse abstractResponse
     ) {
-        DescribeGroupsResponse response = (DescribeGroupsResponse) abstractResponse;
-        Map<CoordinatorKey, ConsumerGroupDescription> completed = new HashMap<>();
-        Map<CoordinatorKey, Throwable> failed = new HashMap<>();
-        List<CoordinatorKey> unmapped = new ArrayList<>();
+        final DescribeGroupsResponse response = (DescribeGroupsResponse) abstractResponse;
+        final Map<CoordinatorKey, ConsumerGroupDescription> completed = new HashMap<>();
+        final Map<CoordinatorKey, Throwable> failed = new HashMap<>();
+        final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
+        final Set<CoordinatorKey> groupsToRetry = new HashSet<>();
 
-        for (DescribedGroup describedGroup : response.data().groups()) {
+        List<DescribedGroup> describedGroups = response.data().groups();
+
+        for (DescribedGroup describedGroup : describedGroups) {
             CoordinatorKey groupIdKey = CoordinatorKey.byGroupId(describedGroup.groupId());
             Errors error = Errors.forCode(describedGroup.errorCode());
             if (error != Errors.NONE) {
-                handleError(groupIdKey, error, failed, unmapped);
+                handleError(groupIdKey, error, failed, groupsToUnmap, groupsToRetry);

Review comment:
   Good catch! Updated.








[jira] [Commented] (KAFKA-13010) Flaky test org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectCommittedOffsetInformation()

2021-07-14 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17380939#comment-17380939
 ] 

A. Sophie Blee-Goldman commented on KAFKA-13010:


Not the exact same test, but I did manage to reproduce this same "only one 
task" failure in 
TaskMetadataIntegrationTest.shouldReportCorrectEndOffsetInformation:
{code:java}
java.lang.AssertionError: only one task
at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
at 
org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.getTaskMetadata(TaskMetadataIntegrationTest.java:163)
at 
org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectEndOffsetInformation(TaskMetadataIntegrationTest.java:144)
{code}
I saved the logs since they should actually be the full, un-truncated logs – 
hope this helps: 
[^TaskMetadataIntegrationTest#shouldReportCorrectEndOffsetInformation.rtf]

> Flaky test 
> org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectCommittedOffsetInformation()
> ---
>
> Key: KAFKA-13010
> URL: https://issues.apache.org/jira/browse/KAFKA-13010
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Bruno Cadonna
>Priority: Major
>  Labels: flaky-test
> Attachments: 
> TaskMetadataIntegrationTest#shouldReportCorrectEndOffsetInformation.rtf
>
>
> Integration test {{test 
> org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectCommittedOffsetInformation()}}
>  sometimes fails with
> {code:java}
> java.lang.AssertionError: only one task
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
>   at 
> org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.getTaskMetadata(TaskMetadataIntegrationTest.java:162)
>   at 
> org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectCommittedOffsetInformation(TaskMetadataIntegrationTest.java:117)
> {code}





[jira] [Assigned] (KAFKA-13092) Perf regression in LISR requests

2021-07-14 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13092?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan reassigned KAFKA-13092:
--

Assignee: Justine Olshan

> Perf regression in LISR requests
> 
>
> Key: KAFKA-13092
> URL: https://issues.apache.org/jira/browse/KAFKA-13092
> Project: Kafka
>  Issue Type: Bug
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Critical
>
> With the addition of partition metadata files, we have an extra operation to 
> do when handling LISR requests. This really slows down the processing, so we 
> should flush asynchronously to fix this regression.





[jira] [Updated] (KAFKA-13010) Flaky test org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectCommittedOffsetInformation()

2021-07-14 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13010?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-13010:
---
Attachment: 
TaskMetadataIntegrationTest#shouldReportCorrectEndOffsetInformation.rtf

> Flaky test 
> org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectCommittedOffsetInformation()
> ---
>
> Key: KAFKA-13010
> URL: https://issues.apache.org/jira/browse/KAFKA-13010
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Bruno Cadonna
>Priority: Major
>  Labels: flaky-test
> Attachments: 
> TaskMetadataIntegrationTest#shouldReportCorrectEndOffsetInformation.rtf
>
>
> Integration test {{test 
> org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectCommittedOffsetInformation()}}
>  sometimes fails with
> {code:java}
> java.lang.AssertionError: only one task
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
>   at 
> org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.getTaskMetadata(TaskMetadataIntegrationTest.java:162)
>   at 
> org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectCommittedOffsetInformation(TaskMetadataIntegrationTest.java:117)
> {code}





[jira] [Created] (KAFKA-13092) Perf regression in LISR requests

2021-07-14 Thread Justine Olshan (Jira)
Justine Olshan created KAFKA-13092:
--

 Summary: Perf regression in LISR requests
 Key: KAFKA-13092
 URL: https://issues.apache.org/jira/browse/KAFKA-13092
 Project: Kafka
  Issue Type: Bug
Reporter: Justine Olshan


With the addition of partition metadata files, we have an extra operation to do 
when handling LISR requests. This really slows down the processing, so we 
should flush asynchronously to fix this regression.





[GitHub] [kafka] showuon closed pull request #10973: KAFKA-13033: COORDINATOR_NOT_AVAILABLE should be unmapped

2021-07-14 Thread GitBox


showuon closed pull request #10973:
URL: https://github.com/apache/kafka/pull/10973


   






[jira] [Resolved] (KAFKA-12970) Make tiered storage related schemas adopt flexible versions feature.

2021-07-14 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana resolved KAFKA-12970.

Resolution: Fixed

This is already addressed as mentioned in the comment: 
https://issues.apache.org/jira/browse/KAFKA-12970?focusedCommentId=17365231=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17365231

> Make tiered storage related schemas adopt flexible versions feature. 
> -
>
> Key: KAFKA-12970
> URL: https://issues.apache.org/jira/browse/KAFKA-12970
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Satish Duggana
>Assignee: Satish Duggana
>Priority: Major
>






[jira] [Commented] (KAFKA-13089) Revisit the usage of BufferSuppliers in Kraft

2021-07-14 Thread Jose Armando Garcia Sancio (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17380914#comment-17380914
 ] 

Jose Armando Garcia Sancio commented on KAFKA-13089:


{quote}Can each listener hold a thread confined buffer supplier?
{quote}
Yeah. That is one possible solution to this problem. Have the 
{{RaftClient.Listener}} provide a {{BufferSupplier}} during registration.

> Revisit the usage of BufferSuppliers in Kraft
> -
>
> Key: KAFKA-13089
> URL: https://issues.apache.org/jira/browse/KAFKA-13089
> Project: Kafka
>  Issue Type: Sub-task
>  Components: kraft
>Reporter: Jose Armando Garcia Sancio
>Assignee: Jose Armando Garcia Sancio
>Priority: Major
>  Labels: kip-500
>
> The latest KafkaRaftClient creates a new BufferSupplier every time it is 
> needed. A buffer supplier is needed when reading from the log and when 
> reading from a snapshot.
> It would be good to investigate if there is a performance and memory usage 
> advantage of sharing the buffer supplier between those use cases and every 
> time the log or snapshot are read.
> If BufferSupplier is shared, it is very likely that the implementation will 
> have to be thread-safe, because we need to support multiple Listeners and each 
> Listener would be using a different thread.
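
For illustration, a minimal sketch of the thread-confined option discussed above (hypothetical simplified interfaces, not the actual org.apache.kafka.common.utils.BufferSupplier or raft APIs):

{code:java}
import java.nio.ByteBuffer;

// Hedged sketch: each listener supplies its own (non-thread-safe) buffer
// supplier at registration, so the supplier stays confined to the single
// thread that drives that listener.
interface SimpleBufferSupplier {
    ByteBuffer get(int capacity);
    void release(ByteBuffer buffer);
}

class ThreadConfinedBufferSupplier implements SimpleBufferSupplier {
    private ByteBuffer cached; // no locking: only one thread ever touches it

    @Override
    public ByteBuffer get(int capacity) {
        if (cached != null && cached.capacity() >= capacity) {
            ByteBuffer result = cached;
            cached = null;
            result.clear();
            return result;
        }
        return ByteBuffer.allocate(capacity);
    }

    @Override
    public void release(ByteBuffer buffer) {
        cached = buffer;
    }
}

interface Listener<T> {
    void handleCommit(T reader);

    // Proposed addition: the raft client would call this once at registration
    // and use the returned supplier for all reads driven on this listener's thread.
    default SimpleBufferSupplier bufferSupplier() {
        return new ThreadConfinedBufferSupplier();
    }
}
{code}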





[jira] [Created] (KAFKA-13091) Increment HW after shrinking ISR through AlterIsr

2021-07-14 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-13091:
---

 Summary: Increment HW after shrinking ISR through AlterIsr
 Key: KAFKA-13091
 URL: https://issues.apache.org/jira/browse/KAFKA-13091
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson
Assignee: David Arthur


After we have shrunk the ISR, we have an opportunity to advance the high 
watermark. We do this currently in `maybeShrinkIsr` after the synchronous 
update through ZK. For the AlterIsr path, however, we cannot rely on this call 
since the request is sent asynchronously. Instead we should attempt to advance 
the high watermark in the callback when the AlterIsr response returns 
successfully.
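
A toy illustration of why the callback can advance the HW (hypothetical names; the real Partition logic tracks follower fetch offsets):

{code:java}
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

// Hedged sketch: the high watermark is (roughly) the minimum end offset over
// the ISR, so removing a lagging replica can allow the HW to move forward.
public class HighWatermarkExample {
    static long highWatermark(Set<Integer> isr, Map<Integer, Long> logEndOffsets) {
        return isr.stream().mapToLong(logEndOffsets::get).min().orElse(0L);
    }

    public static void main(String[] args) {
        Map<Integer, Long> leo = Map.of(1, 10L, 2, 10L, 3, 7L);
        System.out.println(highWatermark(Set.of(1, 2, 3), leo)); // 7

        // The AlterIsr response confirms replica 3 left the ISR; the callback
        // can now attempt to advance the HW to 10.
        Consumer<Set<Integer>> onAlterIsrSuccess =
            newIsr -> System.out.println(highWatermark(newIsr, leo));
        onAlterIsrSuccess.accept(Set.of(1, 2)); // 10
    }
}
{code}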





[GitHub] [kafka] mjsax commented on pull request #11055: HOTFIX: Init stream-stream left/outer join emit interval correctly

2021-07-14 Thread GitBox


mjsax commented on pull request #11055:
URL: https://github.com/apache/kafka/pull/11055#issuecomment-880286834


   @kkonstantine I think we should try to get this into the 3.0 release. Thoughts?
   
   Call for review @guozhangwang @spena 






[GitHub] [kafka] mjsax commented on a change in pull request #11055: HOTFIX: Init stream-stream left/outer join emit interval correctly

2021-07-14 Thread GitBox


mjsax commented on a change in pull request #11055:
URL: https://github.com/apache/kafka/pull/11055#discussion_r670030416



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##
@@ -103,7 +103,6 @@ public void init(final org.apache.kafka.streams.processor.ProcessorContext conte
             true
         )) {
             outerJoinWindowStore = outerJoinWindowName.map(context::getStateStore);
-            sharedTimeTracker.nextTimeToEmit = context.currentSystemTimeMs();

Review comment:
   In `init()` the cached system time is still zero, so we need to delay 
the initialization to a later point.
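   
   A minimal sketch of the delayed initialization (hypothetical names, not the actual KStreamKStreamJoin code):
   
{code:java}
import java.util.function.LongSupplier;

// Hedged sketch: initialize the emit timer lazily on the first record, when
// the cached system time is real, rather than in init() where it is still 0.
class EmitTimer {
    private final LongSupplier cachedSystemTimeMs; // e.g. context::currentSystemTimeMs
    private final long emitIntervalMs;
    private long nextTimeToEmit = 0L;              // deliberately NOT set in init()

    EmitTimer(LongSupplier cachedSystemTimeMs, long emitIntervalMs) {
        this.cachedSystemTimeMs = cachedSystemTimeMs;
        this.emitIntervalMs = emitIntervalMs;
    }

    // Returns true when it is time to emit, advancing the timer as a side effect.
    boolean shouldEmit() {
        long now = cachedSystemTimeMs.getAsLong();
        if (nextTimeToEmit == 0L) {
            nextTimeToEmit = now;                  // lazy init on first use
        }
        if (now >= nextTimeToEmit) {
            nextTimeToEmit = now + emitIntervalMs; // e.g. the 1000 ms default
            return true;
        }
        return false;
    }
}
{code}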








[GitHub] [kafka] mjsax opened a new pull request #11055: HOTFIX: Init stream-stream left/outer join emit interval correctly

2021-07-14 Thread GitBox


mjsax opened a new pull request #11055:
URL: https://github.com/apache/kafka/pull/11055


   Follow up to KAFKA-10847 (https://github.com/apache/kafka/pull/10917).
   
   The above fix intended to reduce the emit frequency to save the creation 
cost of RocksDB iterators. However, we incorrectly initialized the "timer" with 
timestamp zero; thus the timer was always in the past, we tried to emit 
left/outer join results too often, and throughput dropped to only 500 
records/sec.
   
   This PR fixes the initialization of the emit interval timer. After 
re-running the benchmark, we determined that a default emit interval of 1000 ms 
provides the best performance at 12K records/sec, so this PR also changes the 
emit frequency to 1000 ms.






[jira] [Commented] (KAFKA-13089) Revisit the usage of BufferSuppliers in Kraft

2021-07-14 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17380892#comment-17380892
 ] 

Ismael Juma commented on KAFKA-13089:
-

The BufferSupplier implementations we have are not thread-safe and work best in 
a thread-confined way. Can each listener hold a thread confined buffer supplier?

> Revisit the usage of BufferSuppliers in Kraft
> -
>
> Key: KAFKA-13089
> URL: https://issues.apache.org/jira/browse/KAFKA-13089
> Project: Kafka
>  Issue Type: Sub-task
>  Components: kraft
>Reporter: Jose Armando Garcia Sancio
>Assignee: Jose Armando Garcia Sancio
>Priority: Major
>  Labels: kip-500
>
> The latest KafkaRaftClient creates a new BufferSupplier every time it is 
> needed. A buffer supplier is needed when reading from the log and when 
> reading from a snapshot.
> It would be good to investigate if there is a performance and memory usage 
> advantage of sharing the buffer supplier between those use cases and every 
> time the log or snapshot are read.
> If BufferSupplier is shared, it is very likely that the implementation will 
> have to be thread-safe, because we need to support multiple Listeners and each 
> Listener would be using a different thread.





[GitHub] [kafka] junrao commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode

2021-07-14 Thread GitBox


junrao commented on a change in pull request #10753:
URL: https://github.com/apache/kafka/pull/10753#discussion_r669998610



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentRevert.java
##
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
+import org.apache.kafka.metadata.PartitionRegistration;
+import org.apache.kafka.metadata.Replicas;
+
+import java.util.ArrayList;
+import java.util.Set;
+import java.util.List;
+import java.util.Objects;
+
+
+class PartitionReassignmentRevert {
+    private final List<Integer> replicas;
+    private final List<Integer> isr;
+    private final boolean unclean;
+
+    PartitionReassignmentRevert(PartitionRegistration registration) {
+        // Figure out the replica list and ISR that we will have after reverting the
+        // reassignment. In general, we want to take out any replica that the reassignment
+        // was adding, but keep the ones the reassignment was removing. (But see the
+        // special case below.)
+        Set<Integer> adding = Replicas.toSet(registration.addingReplicas);
+        this.replicas = new ArrayList<>(registration.replicas.length);
+        this.isr = new ArrayList<>(registration.isr.length);
+        for (int i = 0; i < registration.isr.length; i++) {
+            int replica = registration.isr[i];
+            if (!adding.contains(replica)) {
+                this.isr.add(replica);
+            }
+        }
+        for (int replica : registration.replicas) {
+            if (!adding.contains(replica)) {
+                this.replicas.add(replica);
+            }
+        }
+        if (isr.isEmpty()) {
+            // In the special case that all the replicas that are in the ISR are also
+            // contained in addingReplicas, we choose the first remaining replica and add
+            // it to the ISR. This is considered an unclean leader election. Therefore,
+            // calling code must check that unclean leader election is enabled before
+            // accepting the new ISR.
+            if (this.replicas.isEmpty()) {
+                // This should not be reachable, since it would require a partition
+                // starting with an empty replica set prior to the reassignment we are
+                // trying to revert.
+                throw new InvalidReplicaAssignmentException("Invalid replica " +
+                    "assignment: addingReplicas contains all replicas.");
+            }
+            isr.add(replicas.get(0));

Review comment:
   Hmm, do we need to change isr here? It seems that BestLeader handles the 
unclean leader election with empty isr already.

##
File path: 
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
##
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
+import org.apache.kafka.metadata.PartitionRegistration;
+import org.apache.kafka.metadata.Replicas;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+import 

[GitHub] [kafka] ableegoldman merged pull request #10877: KAFKA-12925: adding presfixScan operation for missed implementations

2021-07-14 Thread GitBox


ableegoldman merged pull request #10877:
URL: https://github.com/apache/kafka/pull/10877


   






[GitHub] [kafka] ableegoldman commented on a change in pull request #10877: KAFKA-12925: adding presfixScan operation for missed implementations

2021-07-14 Thread GitBox


ableegoldman commented on a change in pull request #10877:
URL: https://github.com/apache/kafka/pull/10877#discussion_r670001461



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
##
@@ -845,6 +845,64 @@ public void shouldBeAbleToQueryMapValuesState() throws Exception {
     for (final KeyValue<String, String> batchEntry : batch1) {
         assertEquals(Long.valueOf(batchEntry.value), myMapStore.get(batchEntry.key));
     }
+
+    final KeyValueIterator<String, Long> range = myMapStore.range("hello", "kafka");
+    while (range.hasNext()) {
+        System.out.println(range.next());
+    }
+}
+
+@Test
+public void shouldBeAbleToQueryKeysWithGivenPrefix() throws Exception {
+    streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+    streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+    final StreamsBuilder builder = new StreamsBuilder();
+    final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"};
+    final Set<KeyValue<String, String>> batch1 = new HashSet<>(
+        Arrays.asList(
+            new KeyValue<>(keys[0], "1"),
+            new KeyValue<>(keys[1], "1"),
+            new KeyValue<>(keys[2], "3"),
+            new KeyValue<>(keys[3], "5"),
+            new KeyValue<>(keys[4], "2"))
+    );
+
+    final List<KeyValue<String, Long>> expectedPrefixScanResult = Arrays.asList(
+        new KeyValue<>(keys[3], 5L),
+        new KeyValue<>(keys[1], 1L)
+    );
+
+    IntegrationTestUtils.produceKeyValuesSynchronously(
+        streamOne,
+        batch1,
+        TestUtils.producerConfig(
+            CLUSTER.bootstrapServers(),
+            StringSerializer.class,
+            StringSerializer.class,
+            new Properties()),
+        mockTime);
+
+    final KTable<String, String> t1 = builder.table(streamOne);
+    t1
+        .mapValues(
+            (ValueMapper<String, Long>) Long::valueOf,
+            Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("queryMapValues").withValueSerde(Serdes.Long()))
+        .toStream()
+        .to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
+
+    kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
+    startKafkaStreamsAndWaitForRunningState(kafkaStreams);
+
+    waitUntilAtLeastNumRecordProcessed(outputTopic, 5);
+
+    final ReadOnlyKeyValueStore<String, Long> myMapStore =
+        IntegrationTestUtils.getStore("queryMapValues", kafkaStreams, keyValueStore());
+
+    int index = 0;
+    final KeyValueIterator<String, Long> range = myMapStore.prefixScan("go", Serdes.String().serializer());
+    while (range.hasNext()) {

Review comment:
   I know this is just how the other tests are doing it, but it's not 
really an airtight way to validate the expected results...if nothing is 
returned then we never enter the `while` loop and the test passes, even if we 
did in fact expect there to be actual output.
   
   The important thing here was just to make sure it didn't throw an exception 
so it still does that, but it would be good to fix this up maybe in a followup 
PR
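   
   A hedged sketch of such a tighter check, reusing the names from the quoted test above (myMapStore, expectedPrefixScanResult):
   
{code:java}
// Drain the iterator into a list and compare against the expected entries,
// so that an empty result fails the test instead of silently passing.
final List<KeyValue<String, Long>> actual = new ArrayList<>();
try (final KeyValueIterator<String, Long> range =
         myMapStore.prefixScan("go", Serdes.String().serializer())) {
    range.forEachRemaining(actual::add);
}
assertEquals(expectedPrefixScanResult, actual); // fails if nothing was returned
{code}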

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
##
@@ -66,6 +68,20 @@ public MemoryNavigableLRUCache(final String name, final int maxCacheSize) {
         .subMap(from, true, to, true).descendingKeySet().iterator(), treeMap));
 }
 
+@Override
+public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(final P prefix, final PS prefixKeySerializer) {

Review comment:
   1) Could you just treat them as Bytes all the same, and just convert 
to/from an Integer before putting/getting them from the store? That way you're 
still just handling Bytes like you are in this test, it just goes through an 
extra layer of de/serialization. Should be able to more or less copy over the 
existing tests with just a bit of extra code. Can you try this, in a followup 
PR?
   2) Yes, I was just suggesting to merge them as a possible way to make things 
easier and do less work; if it's going to be more work, then please do file a 
separate ticket for it.

##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
##
@@ -383,6 +387,1002 @@ public void 
testDrivingConnectedStateStoreInDifferentProcessorsTopology() {
 assertNull(store.get("key4"));
 }
 
+@Test
+public void testPrefixScanInMemoryStoreNoCachingNoLogging() {
+final String storeName = "prefixScanStore";
+final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
+Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), Serdes.String(), Serdes.String())
+.withCachingDisabled()
+.withLoggingDisabled();
+topology
+.addSource("source1", 

[GitHub] [kafka] cmccabe commented on a change in pull request #11054: KAFKA-13090: Improve kraft snapshot integration test

2021-07-14 Thread GitBox


cmccabe commented on a change in pull request #11054:
URL: https://github.com/apache/kafka/pull/11054#discussion_r670005690



##
File path: raft/src/test/java/org/apache/kafka/raft/MockLog.java
##
@@ -489,6 +489,11 @@ public void initializeLeaderEpoch(int epoch) {
 return Optional.ofNullable(snapshots.get(snapshotId));
 }
 
+@Override
+public Optional<RawSnapshotReader> latestSnapshot() {
+return latestSnapshotId().flatMap(this::readSnapshot);

Review comment:
   I didn't realize they implemented flatMap on Java's Optional, 
interesting.
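   
   For reference, a tiny sketch of what it buys us here: `flatMap` chains two 
Optional-returning calls without a nested `isPresent` check (names taken from 
the diff above).
   
   Optional<OffsetAndEpoch> snapshotId = latestSnapshotId();
   // OffsetAndEpoch -> Optional<RawSnapshotReader>, flattened to a single Optional
   Optional<RawSnapshotReader> reader = snapshotId.flatMap(this::readSnapshot);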




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #11054: KAFKA-13090: Improve kraft snapshot integration test

2021-07-14 Thread GitBox


cmccabe commented on a change in pull request #11054:
URL: https://github.com/apache/kafka/pull/11054#discussion_r670005192



##
File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotReader.java
##
@@ -84,7 +84,7 @@ public int lastContainedLogEpoch() {
  */
 public long lastContainedLogTimestamp() {
 if (!lastContainedLogTimestamp.isPresent()) {
-// nextBatch is expected to be empty
+// nextBatch is expected to be equal to Optional.empty() so just replace it

Review comment:
   Can we check this and throw an exception if it's not true? I don't think 
this method is called often so performance should not be an issue.
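   
   Something along these lines before the replacement, just as a sketch:
   
   if (nextBatch.isPresent()) {
       throw new IllegalStateException("nextBatch was expected to be empty");
   }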




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #11054: KAFKA-13090: Improve kraft snapshot integration test

2021-07-14 Thread GitBox


cmccabe commented on a change in pull request #11054:
URL: https://github.com/apache/kafka/pull/11054#discussion_r670004898



##
File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java
##
@@ -280,11 +280,19 @@ default long truncateToEndOffset(OffsetAndEpoch 
endOffset) {
 Optional<RawSnapshotReader> readSnapshot(OffsetAndEpoch snapshotId);
 
 /**
- * Returns the latest snapshot id if one exists.
+ * Returns the latest readable snapshot if one exists.
  *
- * @return an Optional snapshot id of the latest snapshot if one exists, otherwise returns an
- * empty Optional
+ * @return an Optional with the latest readable snapshot, if one exists, otherwise
+ * returns an empty Optional
  */
+Optional<RawSnapshotReader> latestSnapshot();

Review comment:
   would "openLatestSnapshot" be a better name, given that the snapshot 
reader will need to be closed later?
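   
   With that name the close obligation reads naturally at the call site, e.g. 
(sketch only, assuming the reader is AutoCloseable and the rename goes in):
   
   try (RawSnapshotReader reader = log.openLatestSnapshot()
            .orElseThrow(() -> new IllegalStateException("no snapshot"))) {
       // consume the snapshot; close() happens automatically on exit
   }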




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-13080) Fetch snapshot requests are not directed to kraft in controller

2021-07-14 Thread Jose Armando Garcia Sancio (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13080?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jose Armando Garcia Sancio resolved KAFKA-13080.

Resolution: Fixed

> Fetch snapshot requests are not directed to kraft in controller
> --
>
> Key: KAFKA-13080
> URL: https://issues.apache.org/jira/browse/KAFKA-13080
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, kraft
>Reporter: Jose Armando Garcia Sancio
>Assignee: Jose Armando Garcia Sancio
>Priority: Blocker
>  Labels: kip-500
> Fix For: 3.0.0
>
>
> Kraft followers and observers are seeing the following error
> {code:java}
> [2021-07-13 18:15:47,289] ERROR [RaftManager nodeId=2] Unexpected error 
> UNKNOWN_SERVER_ERROR in FETCH_SNAPSHOT response: 
> InboundResponse(correlationId=29862, 
> data=FetchSnapshotResponseData(throttleTimeMs=0, errorCode=-1, topics=[]), 
> sourceId=3001) (org.apache.kafka.raft.KafkaRaftClient) {code}
> This is because ControllerApis is not directing FetchSnapshot requests to the 
> raft manager.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-13078) Closing FileRawSnapshotWriter too early

2021-07-14 Thread Jose Armando Garcia Sancio (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13078?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jose Armando Garcia Sancio resolved KAFKA-13078.

Resolution: Fixed

> Closing FileRawSnapshotWriter too early
> ---
>
> Key: KAFKA-13078
> URL: https://issues.apache.org/jira/browse/KAFKA-13078
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.0.0
>Reporter: Jose Armando Garcia Sancio
>Assignee: Jose Armando Garcia Sancio
>Priority: Blocker
>  Labels: kip-500
> Fix For: 3.0.0
>
>
> We are getting the following error
> {code:java}
>   [2021-07-13 17:23:42,174] ERROR [kafka-raft-io-thread]: Error due to 
> (kafka.raft.KafkaRaftManager$RaftIoThread)
>   java.io.UncheckedIOException: Error calculating snapshot size. temp path = 
> /mnt/kafka/kafka-metadata-logs/@metadata-0/0062-02-3249768281228588378.checkpoint.part,
>  snapshotId = OffsetAndEpoch(offset=62, epoch=2).
>   at 
> org.apache.kafka.snapshot.FileRawSnapshotWriter.sizeInBytes(FileRawSnapshotWriter.java:63)
>   at 
> org.apache.kafka.raft.KafkaRaftClient.maybeSendFetchOrFetchSnapshot(KafkaRaftClient.java:2044)
>   at 
> org.apache.kafka.raft.KafkaRaftClient.pollFollowerAsObserver(KafkaRaftClient.java:2032)
>   at 
> org.apache.kafka.raft.KafkaRaftClient.pollFollower(KafkaRaftClient.java:1995)
>   at 
> org.apache.kafka.raft.KafkaRaftClient.pollCurrentState(KafkaRaftClient.java:2104)
>   at org.apache.kafka.raft.KafkaRaftClient.poll(KafkaRaftClient.java:2217)
>   at kafka.raft.KafkaRaftManager$RaftIoThread.doWork(RaftManager.scala:52)
>   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
>   Caused by: java.nio.channels.ClosedChannelException
>   at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:110)
>   at sun.nio.ch.FileChannelImpl.size(FileChannelImpl.java:300)
>   at 
> org.apache.kafka.snapshot.FileRawSnapshotWriter.sizeInBytes(FileRawSnapshotWriter.java:60)
>   ... 7 more
>  {code}
> This is because {{FollowerState}} closes the snapshot writer passed in as the 
> argument instead of the one being replaced.
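> In other words, the replace path should close the writer it is discarding, not 
> the one handed in. A minimal sketch of the intended pattern (field and method 
> names assumed, not the actual FollowerState code):
> {code:java}
> void setFetchingSnapshot(Optional<RawSnapshotWriter> newSnapshot) {
>     // Close the writer being replaced, never the replacement we were just given.
>     fetchingSnapshot.ifPresent(RawSnapshotWriter::close);
>     fetchingSnapshot = newSnapshot;
> }
> {code}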



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman commented on pull request #10877: KAFKA-12925: adding presfixScan operation for missed implementations

2021-07-14 Thread GitBox


ableegoldman commented on pull request #10877:
URL: https://github.com/apache/kafka/pull/10877#issuecomment-880255698


   > I have added a test for IQ. Some reason, I wasn't able to run the test on 
my local as I was getting a build failure due to scala test classes. I would 
watch out for the status of the tests here.
   
   Apparently there was an actual issue with a scala class on trunk; if you 
pull/rebase you should be able to build locally again. Looks like the test you 
added did pass though, so nice.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio opened a new pull request #11054: KAFKA-13090: Improve kraft snapshot integration test

2021-07-14 Thread GitBox


jsancio opened a new pull request #11054:
URL: https://github.com/apache/kafka/pull/11054


   Check and verify generated snapshots for the controllers and the
   brokers.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-13090) Improve cluster snapshot integration test

2021-07-14 Thread Jose Armando Garcia Sancio (Jira)
Jose Armando Garcia Sancio created KAFKA-13090:
--

 Summary: Improve cluster snapshot integration test
 Key: KAFKA-13090
 URL: https://issues.apache.org/jira/browse/KAFKA-13090
 Project: Kafka
  Issue Type: Sub-task
Reporter: Jose Armando Garcia Sancio
Assignee: Jose Armando Garcia Sancio
 Fix For: 3.0.0


Extends the test in RaftClusterSnapshotTest to verify that both the controllers 
and brokers are generating snapshots.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-13073) Simulation test fails due to inconsistency in MockLog's implementation

2021-07-14 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis resolved KAFKA-13073.

Resolution: Fixed

> Simulation test fails due to inconsistency in MockLog's implementation
> --
>
> Key: KAFKA-13073
> URL: https://issues.apache.org/jira/browse/KAFKA-13073
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, replication
>Affects Versions: 3.0.0
>Reporter: Jose Armando Garcia Sancio
>Assignee: Jose Armando Garcia Sancio
>Priority: Major
>  Labels: kip-500
> Fix For: 3.0.0
>
>
> We are getting the following error on trunk
> {code:java}
> RaftEventSimulationTest > canRecoverAfterAllNodesKilled STANDARD_OUT
> timestamp = 2021-07-12T16:26:55.663, 
> RaftEventSimulationTest:canRecoverAfterAllNodesKilled =
>   java.lang.RuntimeException:
> Uncaught exception during poll of node 1  
> |---jqwik---
> tries = 25| # of calls to property
> checks = 25   | # of not rejected calls
> generation = RANDOMIZED   | parameters are randomly generated
> after-failure = PREVIOUS_SEED | use the previous seed
> when-fixed-seed = ALLOW   | fixing the random seed is allowed
> edge-cases#mode = MIXIN   | edge cases are mixed in
> edge-cases#total = 108| # of all combined edge cases
> edge-cases#tried = 4  | # of edge cases tried in current run
> seed = 8079861963960994566| random seed to reproduce generated values 
>Sample
> --
>   arg0: 4002
>   arg1: 2
>   arg2: 4{code}
> I think there are a couple of issues here:
>  # The {{ListenerContext}} for {{KafkaRaftClient}} uses the value returned by 
> {{ReplicatedLog::startOffset()}} to determine the log start offset and when to 
> load a snapshot, while the {{MockLog}} implementation uses {{logStartOffset}}, 
> which could be a different value.
>  # {{MockLog}} doesn't implement {{ReplicatedLog::maybeClean}}, so the log 
> start offset is always 0.
>  # The snapshot id validation for {{MockLog}} and {{KafkaMetadataLog}}'s 
> {{createNewSnapshot}} throws an exception when the snapshot id is less than 
> the log start offset.
> Solutions:
> To fix the error quoted above we only need to fix bullet point 3, but I think 
> we should fix all of the issues enumerated in this Jira.
> For 1. we should change the {{MockLog}} implementation so that it uses 
> {{startOffset}} both externally and internally.
> For 2. I will file another issue to track this implementation.
> For 3. I think this validation is too strict. It is safe to simply ignore any 
> attempt by the state machine to create a snapshot with an id less than the log 
> start offset. We should return an {{Optional.empty()}} when the snapshot id is 
> less than the log start offset. This tells the user that it doesn't need to 
> generate a snapshot for that offset. 
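> A sketch of the relaxed validation for 3. (names taken from the description; 
> the helper is hypothetical):
> {code:java}
> public Optional<RawSnapshotWriter> createNewSnapshot(OffsetAndEpoch snapshotId) {
>     if (snapshotId.offset < startOffset()) {
>         // Nothing to do: the caller no longer needs a snapshot at this offset.
>         return Optional.empty();
>     }
>     return Optional.of(createSnapshotWriter(snapshotId)); // hypothetical helper
> }
> {code}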



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-7632) Support Compression Level

2021-07-14 Thread Konstantine Karantasis (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17380866#comment-17380866
 ] 

Konstantine Karantasis edited comment on KAFKA-7632 at 7/14/21, 9:50 PM:
-

This feature was not approved on time for 3.0. Pushing the target version to 3.1


was (Author: kkonstantine):
This feature was not approved in time for 3.0. Pushing the target version to 3.1

> Support Compression Level
> -
>
> Key: KAFKA-7632
> URL: https://issues.apache.org/jira/browse/KAFKA-7632
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 2.1.0
> Environment: all
>Reporter: Dave Waters
>Assignee: Dongjin Lee
>Priority: Blocker
>  Labels: needs-kip
> Fix For: 3.1.0
>
>
> The compression level for ZSTD is currently set to use the default level (3), 
> which is a conservative setting that in some use cases eliminates the value 
> that ZSTD provides with improved compression. Each use case will vary, so 
> exposing the level as a broker configuration setting will allow the user to 
> adjust the level.
> Since it applies to the other compression codecs, we should add the same 
> functionalities to them.
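> Since the KIP is still pending, the exact property names are not settled; 
> producer-side usage might hypothetically look like this once a level knob 
> exists:
> {code:java}
> Properties props = new Properties();
> props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd");
> // Hypothetical per-codec level key; the final config name depends on the KIP.
> props.put("compression.zstd.level", "10");
> {code}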



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9366) Upgrade log4j to log4j2

2021-07-14 Thread Konstantine Karantasis (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17380874#comment-17380874
 ] 

Konstantine Karantasis commented on KAFKA-9366:
---

This feature was not approved on time for 3.0. Pushing the target version to 3.1

> Upgrade log4j to log4j2
> ---
>
> Key: KAFKA-9366
> URL: https://issues.apache.org/jira/browse/KAFKA-9366
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.2.0, 2.1.1, 2.3.0, 2.4.0
>Reporter: leibo
>Assignee: Dongjin Lee
>Priority: Critical
>  Labels: needs-kip
> Fix For: 3.0.0
>
>
> h2. CVE-2019-17571 Detail
> Included in Log4j 1.2 is a SocketServer class that is vulnerable to 
> deserialization of untrusted data which can be exploited to remotely execute 
> arbitrary code when combined with a deserialization gadget when listening to 
> untrusted network traffic for log data. This affects Log4j versions up to 1.2 
> up to 1.2.17.
>  
> [https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2019-17571]
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9366) Upgrade log4j to log4j2

2021-07-14 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis updated KAFKA-9366:
--
Fix Version/s: (was: 3.0.0)
   3.1.0

> Upgrade log4j to log4j2
> ---
>
> Key: KAFKA-9366
> URL: https://issues.apache.org/jira/browse/KAFKA-9366
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.2.0, 2.1.1, 2.3.0, 2.4.0
>Reporter: leibo
>Assignee: Dongjin Lee
>Priority: Critical
>  Labels: needs-kip
> Fix For: 3.1.0
>
>
> h2. CVE-2019-17571 Detail
> Included in Log4j 1.2 is a SocketServer class that is vulnerable to 
> deserialization of untrusted data which can be exploited to remotely execute 
> arbitrary code when combined with a deserialization gadget when listening to 
> untrusted network traffic for log data. This affects Log4j versions up to 1.2 
> up to 1.2.17.
>  
> [https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2019-17571]
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] niket-goel opened a new pull request #11053: [KAFKA-13015] Ducktape System Tests to for Metadata Snapshots

2021-07-14 Thread GitBox


niket-goel opened a new pull request #11053:
URL: https://github.com/apache/kafka/pull/11053


   [WIP]
   This PR implements system tests in ducktape to test the ability of brokers 
and controllers to generate and consume snapshots and catch up with the 
metadata log.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12495) Unbalanced connectors/tasks distribution will happen in Connect's incremental cooperative assignor

2021-07-14 Thread Konstantine Karantasis (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17380872#comment-17380872
 ] 

Konstantine Karantasis commented on KAFKA-12495:


This issue corresponds to a corner case that does not seem to appear in 
practice often. The current suggestion to allow for consecutive revocations 
carries some risk. I have another fix in mind that I'd like to explore. In the 
meantime I'm punting this issue to the next release. 

> Unbalanced connectors/tasks distribution will happen in Connect's incremental 
> cooperative assignor
> --
>
> Key: KAFKA-12495
> URL: https://issues.apache.org/jira/browse/KAFKA-12495
> Project: Kafka
>  Issue Type: Bug
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
> Attachments: image-2021-03-18-15-04-57-854.png, 
> image-2021-03-18-15-05-52-557.png, image-2021-03-18-15-07-27-103.png
>
>
> In Kafka Connect, we implement incremental cooperative rebalance algorithm 
> based on KIP-415 
> ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect)|https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect].
>  However, we have a bad assumption in the algorithm implementation, which is: 
> after the revoking rebalance completes, the member (worker) count will be the 
> same as in the previous round of rebalance.
>  
> Let's take a look at the example in the KIP-415:
> !image-2021-03-18-15-07-27-103.png|width=441,height=556!
> It works well for most cases. But what if W4 is added after the 1st rebalance 
> completes and before the 2nd rebalance starts? Let's walk through this example 
> (we'll use 10 tasks here):
>  
> {code:java}
> Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, 
> BT2, BT4, BT4, BT5])
> Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, BT4, 
> BT5
> W1 is current leader
> W2 joins with assignment: []
> Rebalance is triggered
> W3 joins while rebalance is still active with assignment: []
> W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, 
> BT4, BT5]
> W1 becomes leader
> W1 computes and sends assignments:
> W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, 
> BT2, BT4, BT4, BT5])
> W2(delay: 0, assigned: [], revoked: [])
> W3(delay: 0, assigned: [], revoked: [])
> W1 stops revoked resources
> W1 rejoins with assignment: [AC0, AT1, AT2, AT3]
> Rebalance is triggered
> W2 joins with assignment: []
> W3 joins with assignment: []
> // one more member joined
> W4 joins with assignment: []
> W1 becomes leader
> W1 computes and sends assignments:
> // We assigned all the previously revoked Connectors/Tasks to the new members, 
> but we didn't revoke any more C/T in this round, which causes an unbalanced 
> distribution
> W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [])
> W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: [])
> W3(delay: 0, assigned: [BT1, BT2, BT4], revoked: [])
> W4(delay: 0, assigned: [BT4, BT5], revoked: [])
> {code}
> Because we don't allow consecutive revocations in two consecutive rebalances 
> (under the same leader), we end up with this uneven distribution in this 
> situation. We should allow a consecutive rebalance to perform another round of 
> revocation that moves the C/T to the other members in this case.
> expected:
> {code:java}
> Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, 
> BT2, BT4, BT4, BT5])
> Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, BT4, 
> BT5
> W1 is current leader
> W2 joins with assignment: []
> Rebalance is triggered
> W3 joins while rebalance is still active with assignment: []
> W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, 
> BT4, BT5]
> W1 becomes leader
> W1 computes and sends assignments:
> W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, 
> BT2, BT4, BT4, BT5])
> W2(delay: 0, assigned: [], revoked: [])
> W3(delay: 0, assigned: [], revoked: [])
> W1 stops revoked resources
> W1 rejoins with assignment: [AC0, AT1, AT2, AT3]
> Rebalance is triggered
> W2 joins with assignment: []
> W3 joins with assignment: []
> // one more member joined
> W4 joins with assignment: []
> W1 becomes leader
> W1 computes and sends assignments:
> // We assigned all the previous revoked Connectors/Tasks to the new member, 
> **and also revoke some C/T** 
> W1(delay: 0, assigned: [AC0, AT1, AT2], revoked: [AT3])
> W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: [])
> W3(delay: 0, assigned: [BT1, BT2, BT4], revoked: [])
> W4(delay: 0, assigned: [BT4, BT5], revoked: [])
> // another round of rebalance to 

[jira] [Updated] (KAFKA-12495) Unbalanced connectors/tasks distribution will happen in Connect's incremental cooperative assignor

2021-07-14 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12495?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis updated KAFKA-12495:
---
Fix Version/s: 3.1.0

> Unbalanced connectors/tasks distribution will happen in Connect's incremental 
> cooperative assignor
> --
>
> Key: KAFKA-12495
> URL: https://issues.apache.org/jira/browse/KAFKA-12495
> Project: Kafka
>  Issue Type: Bug
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
> Fix For: 3.1.0
>
> Attachments: image-2021-03-18-15-04-57-854.png, 
> image-2021-03-18-15-05-52-557.png, image-2021-03-18-15-07-27-103.png
>
>
> In Kafka Connect, we implement incremental cooperative rebalance algorithm 
> based on KIP-415 
> ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect)|https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect].
>  However, we have a bad assumption in the algorithm implementation, which is: 
> after the revoking rebalance completes, the member (worker) count will be the 
> same as in the previous round of rebalance.
>  
> Let's take a look at the example in the KIP-415:
> !image-2021-03-18-15-07-27-103.png|width=441,height=556!
> It works well for most cases. But what if W4 is added after the 1st rebalance 
> completes and before the 2nd rebalance starts? Let's walk through this example 
> (we'll use 10 tasks here):
>  
> {code:java}
> Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, 
> BT2, BT4, BT4, BT5])
> Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, BT4, 
> BT5
> W1 is current leader
> W2 joins with assignment: []
> Rebalance is triggered
> W3 joins while rebalance is still active with assignment: []
> W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, 
> BT4, BT5]
> W1 becomes leader
> W1 computes and sends assignments:
> W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, 
> BT2, BT4, BT4, BT5])
> W2(delay: 0, assigned: [], revoked: [])
> W3(delay: 0, assigned: [], revoked: [])
> W1 stops revoked resources
> W1 rejoins with assignment: [AC0, AT1, AT2, AT3]
> Rebalance is triggered
> W2 joins with assignment: []
> W3 joins with assignment: []
> // one more member joined
> W4 joins with assignment: []
> W1 becomes leader
> W1 computes and sends assignments:
> // We assigned all the previously revoked Connectors/Tasks to the new members, 
> but we didn't revoke any more C/T in this round, which causes an unbalanced 
> distribution
> W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [])
> W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: [])
> W3(delay: 0, assigned: [BT1, BT2, BT4], revoked: [])
> W4(delay: 0, assigned: [BT4, BT5], revoked: [])
> {code}
> Because we don't allow consecutive revocations in two consecutive rebalances 
> (under the same leader), we end up with this uneven distribution in this 
> situation. We should allow a consecutive rebalance to perform another round of 
> revocation that moves the C/T to the other members in this case.
> expected:
> {code:java}
> Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, 
> BT2, BT4, BT4, BT5])
> Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, BT4, 
> BT5
> W1 is current leader
> W2 joins with assignment: []
> Rebalance is triggered
> W3 joins while rebalance is still active with assignment: []
> W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, 
> BT4, BT5]
> W1 becomes leader
> W1 computes and sends assignments:
> W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, 
> BT2, BT4, BT4, BT5])
> W2(delay: 0, assigned: [], revoked: [])
> W3(delay: 0, assigned: [], revoked: [])
> W1 stops revoked resources
> W1 rejoins with assignment: [AC0, AT1, AT2, AT3]
> Rebalance is triggered
> W2 joins with assignment: []
> W3 joins with assignment: []
> // one more member joined
> W4 joins with assignment: []
> W1 becomes leader
> W1 computes and sends assignments:
> // We assigned all the previous revoked Connectors/Tasks to the new member, 
> **and also revoke some C/T** 
> W1(delay: 0, assigned: [AC0, AT1, AT2], revoked: [AT3])
> W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: [])
> W3(delay: 0, assigned: [BT1, BT2, BT4], revoked: [])
> W4(delay: 0, assigned: [BT4, BT5], revoked: [])
> // another round of rebalance to assign the new revoked C/T to the other 
> members
> W1 rejoins with assignment: [AC0, AT1, AT2] 
> Rebalance is triggered 
> W2 joins with assignment: [AT4, AT5, BC0] 
> W3 joins with assignment: [BT1, BT2, BT4]
> W4 joins with assignment: [BT4, BT5]
> W1 becomes leader 
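> {code}
> The fix boils down to: on each rebalance, recompute the per-worker quota from 
> the current member count and revoke any excess, even if the previous round 
> already revoked. A rough illustration (not the actual assignor code):
> {code:java}
> import java.util.HashMap;
> import java.util.Map;
> 
> // Workers above ceil(total / workers) must give up their excess so a follow-up
> // rebalance can hand it to the under-loaded members.
> static Map<String, Integer> excessToRevoke(Map<String, Integer> assigned, int totalTasks) {
>     int workers = assigned.size();
>     int quota = (totalTasks + workers - 1) / workers; // ceiling division
>     Map<String, Integer> revoke = new HashMap<>();
>     assigned.forEach((worker, count) -> {
>         if (count > quota) {
>             revoke.put(worker, count - quota);
>         }
>     });
>     return revoke;
> }
> {code}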

[jira] [Commented] (KAFKA-12283) Flaky Test RebalanceSourceConnectorsIntegrationTest#testMultipleWorkersRejoining

2021-07-14 Thread Konstantine Karantasis (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17380871#comment-17380871
 ] 

Konstantine Karantasis commented on KAFKA-12283:


As mentioned above, this failure corresponds to a corner case that does not 
seem to appear in practice often. The current suggestion to allow for 
consecutive revocations carries some risk. I have another fix in mind that I'd 
like to explore. In the meantime I'm punting this issue to the next release. 

> Flaky Test 
> RebalanceSourceConnectorsIntegrationTest#testMultipleWorkersRejoining
> 
>
> Key: KAFKA-12283
> URL: https://issues.apache.org/jira/browse/KAFKA-12283
> Project: Kafka
>  Issue Type: Test
>  Components: KafkaConnect, unit tests
>Reporter: Matthias J. Sax
>Assignee: Luke Chen
>Priority: Blocker
>  Labels: flaky-test
> Fix For: 3.0.0
>
>
> https://github.com/apache/kafka/pull/1/checks?check_run_id=1820092809
> {quote} {{java.lang.AssertionError: Tasks are imbalanced: 
> localhost:36037=[seq-source13-0, seq-source13-1, seq-source13-2, 
> seq-source13-3, seq-source12-0, seq-source12-1, seq-source12-2, 
> seq-source12-3]
> localhost:43563=[seq-source11-0, seq-source11-2, seq-source10-0, 
> seq-source10-2]
> localhost:46539=[seq-source11-1, seq-source11-3, seq-source10-1, 
> seq-source10-3]
>   at org.junit.Assert.fail(Assert.java:89)
>   at org.junit.Assert.assertTrue(Assert.java:42)
>   at 
> org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.assertConnectorAndTasksAreUniqueAndBalanced(RebalanceSourceConnectorsIntegrationTest.java:362)
>   at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:303)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:351)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:319)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:300)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:290)
>   at 
> org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testMultipleWorkersRejoining(RebalanceSourceConnectorsIntegrationTest.java:313)}}
> {quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12283) Flaky Test RebalanceSourceConnectorsIntegrationTest#testMultipleWorkersRejoining

2021-07-14 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12283?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis updated KAFKA-12283:
---
Fix Version/s: (was: 3.0.0)
   3.1.0

> Flaky Test 
> RebalanceSourceConnectorsIntegrationTest#testMultipleWorkersRejoining
> 
>
> Key: KAFKA-12283
> URL: https://issues.apache.org/jira/browse/KAFKA-12283
> Project: Kafka
>  Issue Type: Test
>  Components: KafkaConnect, unit tests
>Reporter: Matthias J. Sax
>Assignee: Luke Chen
>Priority: Blocker
>  Labels: flaky-test
> Fix For: 3.1.0
>
>
> https://github.com/apache/kafka/pull/1/checks?check_run_id=1820092809
> {quote} {{java.lang.AssertionError: Tasks are imbalanced: 
> localhost:36037=[seq-source13-0, seq-source13-1, seq-source13-2, 
> seq-source13-3, seq-source12-0, seq-source12-1, seq-source12-2, 
> seq-source12-3]
> localhost:43563=[seq-source11-0, seq-source11-2, seq-source10-0, 
> seq-source10-2]
> localhost:46539=[seq-source11-1, seq-source11-3, seq-source10-1, 
> seq-source10-3]
>   at org.junit.Assert.fail(Assert.java:89)
>   at org.junit.Assert.assertTrue(Assert.java:42)
>   at 
> org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.assertConnectorAndTasksAreUniqueAndBalanced(RebalanceSourceConnectorsIntegrationTest.java:362)
>   at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:303)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:351)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:319)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:300)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:290)
>   at 
> org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testMultipleWorkersRejoining(RebalanceSourceConnectorsIntegrationTest.java:313)}}
> {quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-7632) Support Compression Level

2021-07-14 Thread Konstantine Karantasis (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17380866#comment-17380866
 ] 

Konstantine Karantasis commented on KAFKA-7632:
---

This feature was not approved in time for 3.0. Pushing the target version to 3.1

> Support Compression Level
> -
>
> Key: KAFKA-7632
> URL: https://issues.apache.org/jira/browse/KAFKA-7632
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 2.1.0
> Environment: all
>Reporter: Dave Waters
>Assignee: Dongjin Lee
>Priority: Blocker
>  Labels: needs-kip
> Fix For: 3.0.0
>
>
> The compression level for ZSTD is currently set to use the default level (3), 
> which is a conservative setting that in some use cases eliminates the value 
> that ZSTD provides with improved compression. Each use case will vary, so 
> exposing the level as a broker configuration setting will allow the user to 
> adjust the level.
> Since it applies to the other compression codecs, we should add the same 
> functionalities to them.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-7632) Support Compression Level

2021-07-14 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis updated KAFKA-7632:
--
Fix Version/s: (was: 3.0.0)
   3.1.0

> Support Compression Level
> -
>
> Key: KAFKA-7632
> URL: https://issues.apache.org/jira/browse/KAFKA-7632
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 2.1.0
> Environment: all
>Reporter: Dave Waters
>Assignee: Dongjin Lee
>Priority: Blocker
>  Labels: needs-kip
> Fix For: 3.1.0
>
>
> The compression level for ZSTD is currently set to use the default level (3), 
> which is a conservative setting that in some use cases eliminates the value 
> that ZSTD provides with improved compression. Each use case will vary, so 
> exposing the level as a broker configuration setting will allow the user to 
> adjust the level.
> Since it applies to the other compression codecs, we should add the same 
> functionalities to them.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13089) Revisit the usage of BufferSuppliers in Kraft

2021-07-14 Thread Jose Armando Garcia Sancio (Jira)
Jose Armando Garcia Sancio created KAFKA-13089:
--

 Summary: Revisit the usage of BufferSuppliers in Kraft
 Key: KAFKA-13089
 URL: https://issues.apache.org/jira/browse/KAFKA-13089
 Project: Kafka
  Issue Type: Sub-task
  Components: kraft
Reporter: Jose Armando Garcia Sancio
Assignee: Jose Armando Garcia Sancio


The latest KafkaRaftClient creates a new BufferSupplier every time it is 
needed. A buffer supplier is needed when reading from the log and when reading 
from a snapshot.

It would be good to investigate whether there is a performance and memory usage 
advantage to sharing the buffer supplier across those use cases and across 
repeated reads of the log or snapshot.

If the BufferSupplier is shared, it is very likely that the implementation will 
have to be thread-safe, because we need to support multiple Listeners and each 
Listener would be using a different thread.
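
For illustration only, a minimal sketch of what a thread-safe shared supplier 
could look like (not the current implementation; a real one would also bound the 
pool size):

{code:java}
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentLinkedDeque;
import org.apache.kafka.common.utils.BufferSupplier;

class ThreadSafeBufferSupplier extends BufferSupplier {
    private final ConcurrentLinkedDeque<ByteBuffer> pool = new ConcurrentLinkedDeque<>();

    @Override
    public ByteBuffer get(int capacity) {
        ByteBuffer buffer = pool.pollFirst();
        if (buffer == null || buffer.capacity() < capacity) {
            return ByteBuffer.allocate(capacity); // pool miss or buffer too small
        }
        buffer.clear();
        return buffer;
    }

    @Override
    public void release(ByteBuffer buffer) {
        pool.addFirst(buffer); // safe to call from any Listener thread
    }

    @Override
    public void close() {
        pool.clear();
    }
}
{code}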



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vvcephei commented on a change in pull request #10994: KAFKA-8410: Update the docs to reference the new PAPI

2021-07-14 Thread GitBox


vvcephei commented on a change in pull request #10994:
URL: https://github.com/apache/kafka/pull/10994#discussion_r669947367



##
File path: docs/streams/developer-guide/processor-api.html
##
@@ -86,12 +86,48 @@ Overviewclose() method. Note that Kafka Streams may re-use a 
single
   Processor object by calling
   init() on it again after close().
-When records are forwarded via downstream processors they also 
get a timestamp assigned. There are two different default behaviors:
-  (1) If #forward() is called within #process() the output record inherits 
the input record timestamp.
-  (2) If #forward() is called within punctuate() the output record 
inherits the current punctuation timestamp (either current 'stream time' or 
system wall-clock time).
-  Note, that #forward() also allows to change the default behavior 
by passing a custom timestamp for the output record.
-Specifically, ProcessorContext#schedule() accepts a user Punctuator callback 
interface, which triggers its punctuate()
-API method periodically based on the PunctuationType. The PunctuationType 
determines what notion of time is used
+  
+The Processor interface takes two sets of generic 
parameters:
+KIn, VIn, KOut, 
VOut. These define the input and output types
+that the processor implementation can handle. KIn and
+VIn 
define the key and value types that will be passed
+to process().
+Likewise, KOut and VOut
+define the forwarded key and value types that ProcessorContext#forward()
+will accept. If your processor does not forward any records at all 
(or if it only forwards
+null keys or values),
+a best practice is to set the output generic type argument to
+Void.
+If it needs to forward multiple types that don't share a common 
superclass, you will
+have to set the output generic type argument to Object.
+  
+  
+Both the Processor#process()
+and the ProcessorContext#forward()
+methods handle records in the form of the Record<K, V>
+data class. This class gives you access to the key components of a 
Kafka record:
+the key, value, timestamp and headers. When forwarding records, 
you can use the
+constructor to create a new Record
+from scratch, or you can use the convenience builder methods to 
replace one of the
+Record's properties
+and copy over the rest. For example,
+inputRecord.withValue(newValue)
+would copy the key, timestamp, and headers from
+inputRecord while
+setting the output record's value to newValue.
+Note that this does not mutate inputRecord,
+but instead creates a shallow copy. Beware that this is only a 
shallow copy, so if you
+plan to mutate the key, value, or headers elsewhere in the 
program, you will want to
+create a deep copy of those fields yourself.
+  
+
+  In addition to handling incoming records via
+  Processor#process(),
+  you have the option to schedule periodic invocation (called 
"punctuation")
+  in your processor's init()

Review comment:
   Hmm, that actually does sounds really useful. I never thought of it 
before. I'll file a ticket to document this use case. Thanks!
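   
   For readers skimming the thread, a compact sketch of the API this doc section 
describes (the typed PAPI, with punctuation scheduled in init() as discussed):
   
   import java.time.Duration;
   import org.apache.kafka.streams.processor.PunctuationType;
   import org.apache.kafka.streams.processor.api.Processor;
   import org.apache.kafka.streams.processor.api.ProcessorContext;
   import org.apache.kafka.streams.processor.api.Record;
   
   public class UppercaseProcessor implements Processor<String, String, String, String> {
       private ProcessorContext<String, String> context;
   
       @Override
       public void init(final ProcessorContext<String, String> context) {
           this.context = context;
           // Punctuation is scheduled here, in init(), as the docs describe.
           context.schedule(Duration.ofSeconds(10), PunctuationType.WALL_CLOCK_TIME,
               timestamp -> context.commit());
       }
   
       @Override
       public void process(final Record<String, String> record) {
           // withValue() makes a shallow copy; key, timestamp, and headers carry over.
           context.forward(record.withValue(record.value().toUpperCase()));
       }
   }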




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode

2021-07-14 Thread GitBox


cmccabe commented on a change in pull request #10753:
URL: https://github.com/apache/kafka/pull/10753#discussion_r669944074



##
File path: core/src/test/scala/integration/kafka/server/RaftClusterTest.scala
##
@@ -422,4 +351,75 @@ class RaftClusterTest {
   listenerName = listenerName
 )
 
+  @Test
+  def testCreateClusterAndPerformReassignment(): Unit = {
+val cluster = new KafkaClusterTestKit.Builder(
+  new TestKitNodes.Builder().
+setNumBrokerNodes(4).
+setNumControllerNodes(3).build()).build()
+try {
+  cluster.format()
+  cluster.startup()
+  cluster.waitForReadyBrokers()
+  val admin = Admin.create(cluster.clientProperties())
+  try {
+// Create the topic.
+val assignments = new util.HashMap[Integer, util.List[Integer]]
+assignments.put(0, Arrays.asList(0, 1, 2))
+assignments.put(1, Arrays.asList(1, 2, 3))
+assignments.put(2, Arrays.asList(2, 3, 0))
+val createTopicResult = admin.createTopics(Collections.singletonList(
+  new NewTopic("foo", assignments)))
+createTopicResult.all().get()
+waitForTopicListing(admin, Seq("foo"), Seq())
+
+// Start some reassignments.
+assertEquals(Collections.emptyMap(), 
admin.listPartitionReassignments().reassignments().get())
+val reassignments = new util.HashMap[TopicPartition, 
Optional[NewPartitionReassignment]]
+reassignments.put(new TopicPartition("foo", 0),
+  Optional.of(new NewPartitionReassignment(Arrays.asList(2, 1, 0
+reassignments.put(new TopicPartition("foo", 1),
+  Optional.of(new NewPartitionReassignment(Arrays.asList(0, 1, 2

Review comment:
   Good idea




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] xvrl opened a new pull request #11052: Use ByteBuffers for LZ4 OutputStream

2021-07-14 Thread GitBox


xvrl opened a new pull request #11052:
URL: https://github.com/apache/kafka/pull/11052


   Our current LZ4 OutputStream implementation allocates compression buffers
   internally and relies on intermediate byte arrays for input and output 
buffers.
   
   With this change we now use ByteBuffers internally, and as a result:
   * we write directly to the target ByteBuffer, avoiding an additional copy
   * we no longer allocate an output compression buffer, reducing allocations 
by half
   * we pave the way to make compression buffers reusable, similar to what we do
 for decompression
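   
   Conceptually, the direct-to-ByteBuffer path looks like this (sketch using the 
lz4-java API, not the PR's actual code):
   
   import java.nio.ByteBuffer;
   import net.jpountz.lz4.LZ4Compressor;
   import net.jpountz.lz4.LZ4Factory;
   
   static ByteBuffer compress(ByteBuffer input) {
       LZ4Compressor compressor = LZ4Factory.fastestInstance().fastCompressor();
       ByteBuffer output =
           ByteBuffer.allocate(compressor.maxCompressedLength(input.remaining()));
       compressor.compress(input, output); // writes straight into `output`, no byte[] copy
       output.flip();
       return output;
   }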


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode

2021-07-14 Thread GitBox


cmccabe commented on a change in pull request #10753:
URL: https://github.com/apache/kafka/pull/10753#discussion_r669926069



##
File path: 
metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java
##
@@ -105,4 +105,42 @@ public void testToLeaderAndIsrPartitionState() {
 setIsNew(false).toString(),
 b.toLeaderAndIsrPartitionState(new TopicPartition("bar", 0), 
false).toString());
 }
+
+@Test
+public void testMergePartitionChangeRecordWithReassignmentData() {
+PartitionRegistration partition0 = new PartitionRegistration(new int[] 
{1, 2, 3},
+new int[] {1, 2, 3}, Replicas.NONE, Replicas.NONE, 1, 100, 200);
+PartitionRegistration partition1 = partition0.merge(new 
PartitionChangeRecord().
+setRemovingReplicas(Collections.singletonList(3)).
+setAddingReplicas(Collections.singletonList(4)).
+setReplicas(Arrays.asList(1, 2, 3, 4)));
+assertEquals(new PartitionRegistration(new int[] {1, 2, 3, 4},
+new int[] {1, 2, 3}, new int[] {3}, new int[] {4}, 1, 100, 201), 
partition1);
+PartitionRegistration partition2 = partition1.merge(new 
PartitionChangeRecord().
+setIsr(Arrays.asList(1, 2, 4)).
+setRemovingReplicas(Collections.emptyList()).
+setAddingReplicas(Collections.emptyList()).
+setReplicas(Arrays.asList(1, 2, 4)));
+assertEquals(new PartitionRegistration(new int[] {1, 2, 4},
+new int[] {1, 2, 4}, Replicas.NONE, Replicas.NONE, 1, 100, 202), 
partition2);
+assertFalse(partition2.isReassigning());
+}
+
+@Test
+public void testPartitionControlInfoIsrChangeCompletesReassignment() {
+PartitionRegistration partition0 = new PartitionRegistration(
+new int[]{1, 2, 3, 4}, new int[]{3}, new int[]{3}, new int[] {}, 
1, 0, 0);

Review comment:
   Good idea. But looking again, we can remove this test because the 
function has been shifted out of PartitionRegistration




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode

2021-07-14 Thread GitBox


cmccabe commented on a change in pull request #10753:
URL: https://github.com/apache/kafka/pull/10753#discussion_r669925103



##
File path: 
metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
##
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
+import org.apache.kafka.controller.PartitionChangeBuilder.BestLeader;
+import org.apache.kafka.metadata.PartitionRegistration;
+import org.apache.kafka.metadata.Replicas;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
+
+import static 
org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_CHANGE_RECORD;
+import static 
org.apache.kafka.controller.PartitionChangeBuilder.changeRecordIsNoOp;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+
+@Timeout(value = 40)
+public class PartitionChangeBuilderTest {
+@Test
+public void testChangeRecordIsNoOp() {
+assertTrue(changeRecordIsNoOp(new PartitionChangeRecord()));
+assertFalse(changeRecordIsNoOp(new PartitionChangeRecord().setLeader(1)));
+assertFalse(changeRecordIsNoOp(new PartitionChangeRecord().
+setIsr(Arrays.asList(1, 2, 3))));
+assertFalse(changeRecordIsNoOp(new PartitionChangeRecord().
+setRemovingReplicas(Arrays.asList(1))));
+assertFalse(changeRecordIsNoOp(new PartitionChangeRecord().
+setAddingReplicas(Arrays.asList(4))));
+}
+
+private final static PartitionRegistration FOO = new PartitionRegistration(
+new int[] {2, 1, 3}, new int[] {2, 1, 3}, Replicas.NONE, Replicas.NONE,
+1, 100, 200);
+
+private final static Uuid FOO_ID = 
Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg");
+
+private static PartitionChangeBuilder createFooBuilder(boolean 
allowUnclean) {
+return new PartitionChangeBuilder(FOO, FOO_ID, 0, r -> r != 3, () -> 
allowUnclean);
+}
+
+private final static PartitionRegistration BAR = new PartitionRegistration(
+new int[] {1, 2, 3, 4}, new int[] {1, 2, 3}, new int[] {1}, new int[] 
{4},
+1, 100, 200);
+
+private final static Uuid BAR_ID = 
Uuid.fromString("LKfUsCBnQKekvL9O5dY9nw");
+
+private static PartitionChangeBuilder createBarBuilder(boolean 
allowUnclean) {
+return new PartitionChangeBuilder(BAR, BAR_ID, 0, r -> r != 3, () -> 
allowUnclean);
+}
+
+private static void assertBestLeaderEquals(PartitionChangeBuilder builder,
+   int expectedNode,
+   boolean expectedUnclean) {
+BestLeader bestLeader = builder.new BestLeader();
+assertEquals(expectedNode, bestLeader.node);
+assertEquals(expectedUnclean, bestLeader.unclean);
+}
+
+@Test
+public void testBestLeader() {
+assertBestLeaderEquals(createFooBuilder(false), 2, false);
+assertBestLeaderEquals(createFooBuilder(true), 2, false);
+assertBestLeaderEquals(createFooBuilder(false).
+setTargetIsr(Arrays.asList(1, 3)), 1, false);
+assertBestLeaderEquals(createFooBuilder(true).
+setTargetIsr(Arrays.asList(1, 3)), 1, false);
+assertBestLeaderEquals(createFooBuilder(false).
+setTargetIsr(Arrays.asList(3)), NO_LEADER, false);
+assertBestLeaderEquals(createFooBuilder(true).
+setTargetIsr(Arrays.asList(3)), 2, true);
+assertBestLeaderEquals(createFooBuilder(true).
+
setTargetIsr(Arrays.asList(4)).setTargetReplicas(Arrays.asList(2, 1, 3, 4)),
+4, false);
+}
+
+@Test
+public void testShouldTryElection() {
+

[jira] [Commented] (KAFKA-13010) Flaky test org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectCommittedOffsetInformation()

2021-07-14 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17380838#comment-17380838
 ] 

A. Sophie Blee-Goldman commented on KAFKA-13010:


[~wcarl...@confluent.io] I'm guessing you wrote this test so you have the most 
context, can you reproduce this locally and take a minute or two to look 
through the logs and see if anything jumps out at you? 

> Flaky test 
> org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectCommittedOffsetInformation()
> ---
>
> Key: KAFKA-13010
> URL: https://issues.apache.org/jira/browse/KAFKA-13010
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Bruno Cadonna
>Priority: Major
>  Labels: flaky-test
>
> Integration test {{test 
> org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectCommittedOffsetInformation()}}
>  sometimes fails with
> {code:java}
> java.lang.AssertionError: only one task
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
>   at 
> org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.getTaskMetadata(TaskMetadataIntegrationTest.java:162)
>   at 
> org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectCommittedOffsetInformation(TaskMetadataIntegrationTest.java:117)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cmccabe commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode

2021-07-14 Thread GitBox


cmccabe commented on a change in pull request #10753:
URL: https://github.com/apache/kafka/pull/10753#discussion_r669911056



##
File path: metadata/src/main/java/org/apache/kafka/image/TopicDelta.java
##
@@ -103,7 +103,8 @@ public TopicImage apply() {
 for (Entry<Integer, PartitionRegistration> entry : partitionChanges.entrySet()) {
 if (entry.getValue().leader == brokerId) {
 PartitionRegistration prevPartition = 
image.partitions().get(entry.getKey());
-if (prevPartition == null || prevPartition.leader != brokerId) 
{
+if (prevPartition == null ||
+prevPartition.leaderEpoch != 
entry.getValue().leaderEpoch) {

Review comment:
   Good point. Thinking about it more, we can just check for partition 
epoch here, since LE can't change without PE changing.
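   
   i.e. the condition would become something like:
   
   if (prevPartition == null ||
       prevPartition.partitionEpoch != entry.getValue().partitionEpoch) {
       // this broker became leader as part of this change
   }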




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11035: KAFKA-13072: refactor RemoveMembersFromConsumerGroupHandler and tests

2021-07-14 Thread GitBox


dajac commented on a change in pull request #11035:
URL: https://github.com/apache/kafka/pull/11035#discussion_r669905082



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandler.java
##
@@ -79,55 +90,82 @@ public String apiName() {
 Set<CoordinatorKey> groupIds,
 AbstractResponse abstractResponse
 ) {
+validateKeys(groupIds);
+
 final LeaveGroupResponse response = (LeaveGroupResponse) abstractResponse;
-Map<CoordinatorKey, Map<MemberIdentity, Errors>> completed = new HashMap<>();
-Map<CoordinatorKey, Throwable> failed = new HashMap<>();
-List<CoordinatorKey> unmapped = new ArrayList<>();
+final Map<CoordinatorKey, Map<MemberIdentity, Errors>> completed = new HashMap<>();
+final Map<CoordinatorKey, Throwable> failed = new HashMap<>();
+final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
+final Set<CoordinatorKey> groupsToRetry = new HashSet<>();
 
-final Errors error = Errors.forCode(response.data().errorCode());
+final Errors error = response.topLevelError();
 if (error != Errors.NONE) {
-handleError(groupId, error, failed, unmapped);
+handleGroupError(groupId, error, failed, groupsToUnmap, groupsToRetry);
 } else {
 final Map<MemberIdentity, Errors> memberErrors = new HashMap<>();
 for (MemberResponse memberResponse : response.memberResponses()) {
+Errors memberError = Errors.forCode(memberResponse.errorCode());
+String memberId = memberResponse.memberId();
+
 memberErrors.put(new MemberIdentity()
- .setMemberId(memberResponse.memberId())
+ .setMemberId(memberId)
  .setGroupInstanceId(memberResponse.groupInstanceId()),
- Errors.forCode(memberResponse.errorCode()));
+memberError);

Review comment:
   nit: We could revert this change as it does not bring much and re-align 
like it was before.

##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandler.java
##
@@ -79,55 +90,82 @@ public String apiName() {
 Set<CoordinatorKey> groupIds,
 AbstractResponse abstractResponse
 ) {
+validateKeys(groupIds);
+
 final LeaveGroupResponse response = (LeaveGroupResponse) abstractResponse;
-Map<CoordinatorKey, Map<MemberIdentity, Errors>> completed = new HashMap<>();
-Map<CoordinatorKey, Throwable> failed = new HashMap<>();
-List<CoordinatorKey> unmapped = new ArrayList<>();
+final Map<CoordinatorKey, Map<MemberIdentity, Errors>> completed = new HashMap<>();
+final Map<CoordinatorKey, Throwable> failed = new HashMap<>();
+final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
+final Set<CoordinatorKey> groupsToRetry = new HashSet<>();
 
-final Errors error = Errors.forCode(response.data().errorCode());
+final Errors error = response.topLevelError();
 if (error != Errors.NONE) {
-handleError(groupId, error, failed, unmapped);
+handleGroupError(groupId, error, failed, groupsToUnmap, groupsToRetry);
 } else {
 final Map<MemberIdentity, Errors> memberErrors = new HashMap<>();
 for (MemberResponse memberResponse : response.memberResponses()) {
+Errors memberError = Errors.forCode(memberResponse.errorCode());
+String memberId = memberResponse.memberId();
+
 memberErrors.put(new MemberIdentity()
- .setMemberId(memberResponse.memberId())
+ .setMemberId(memberId)
  .setGroupInstanceId(memberResponse.groupInstanceId()),
- Errors.forCode(memberResponse.errorCode()));
+memberError);
 
 }
 completed.put(groupId, memberErrors);
 }
-return new ApiResult<>(completed, failed, unmapped);
+
+if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) {
+return new ApiResult<>(
+completed,
+failed,
+Collections.emptyList()
+);
+} else {
+// retry the request, so don't send completed/failed results back
+return new ApiResult<>(
+Collections.emptyMap(),
+Collections.emptyMap(),
+new ArrayList<>(groupsToUnmap)
+);
+}
 }
 
-private void handleError(
+private void handleGroupError(
 CoordinatorKey groupId,
-Errors error, Map<CoordinatorKey, Throwable> failed,
-List<CoordinatorKey> unmapped
+Errors error,
+Map<CoordinatorKey, Throwable> failed,
+Set<CoordinatorKey> groupsToUnmap,
+Set<CoordinatorKey> groupsToRetry
 ) {
 switch (error) {
 case GROUP_AUTHORIZATION_FAILED:
-log.error("Received authorization failure for group {} in `LeaveGroup` response", groupId,
-error.exception());
+log.debug("`LeaveGroup` request for group id {} failed due to error {}", groupId.idValue, error);
 failed.put(groupId, 

[GitHub] [kafka] jsancio closed pull request #6777: MINOR: Run the Java and Scala documentation in Jenkins

2021-07-14 Thread GitBox


jsancio closed pull request #6777:
URL: https://github.com/apache/kafka/pull/6777


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #10953: MINOR: Default GRACE with Old API should set as 24H minus window-size / inactivity-gap

2021-07-14 Thread GitBox


mjsax commented on pull request #10953:
URL: https://github.com/apache/kafka/pull/10953#issuecomment-880159699


   > but when users upgrade the Streams apps to 3.0 we guarantee retention time 
T+X
   
   I don't agree with this statement. It's a bug in the implementation (that this 
PR fixes) that we keep data for T+X; the contract, which guarantees that data is 
preserved (only) up to T, does not change IMHO.
   
   Overall, the PR LGTM. If I read the code of this PR and the previous PRs 
correctly, it should put us back to the 2.8 behavior. We need to include the 
fix for JoinWindows though.
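   
   For illustration, the default grace computation described by the PR title can be 
sketched as follows (a minimal example assuming the 24-hour default retention; this 
is not the actual Streams code):
   
       import java.time.Duration;
   
       public class DefaultGraceExample {
           // Assumed constant: the old API's default retention of 24 hours.
           static final Duration DEFAULT_RETENTION = Duration.ofHours(24);
   
           // Old-API default grace: 24h minus the window size (or inactivity gap),
           // floored at zero so very large windows do not yield a negative grace.
           static Duration defaultGrace(final Duration windowSizeOrGap) {
               final Duration grace = DEFAULT_RETENTION.minus(windowSizeOrGap);
               return grace.isNegative() ? Duration.ZERO : grace;
           }
   
           public static void main(final String[] args) {
               System.out.println(defaultGrace(Duration.ofMinutes(5))); // PT23H55M
           }
       }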


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #10952: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name

2021-07-14 Thread GitBox


jolshan commented on a change in pull request #10952:
URL: https://github.com/apache/kafka/pull/10952#discussion_r669899334



##
File path: clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
##
@@ -130,13 +150,30 @@ MetadataCache mergeWith(String newClusterId,
 Set<String> addInvalidTopics,
 Set<String> addInternalTopics,
 Node newController,
+Map<String, Uuid> topicIds,
 BiPredicate<String, Boolean> retainTopic) {
 
 Predicate<String> shouldRetainTopic = topic -> retainTopic.test(topic, internalTopics.contains(topic));
 
 Map<TopicPartition, PartitionMetadata> newMetadataByPartition = new HashMap<>(addPartitions.size());
+Map<String, Uuid> newTopicIds = new HashMap<>(topicIds.size());
+
+// We want the most recent topic ID. We add the old one here for retained topics, then update with the newest information from the MetadataResponse:
+// we add if a new topic ID is present, or remove if the request did not support topic IDs for this topic.
+for (Map.Entry<String, Uuid> entry : this.topicIds.entrySet()) {
+if (shouldRetainTopic.test(entry.getKey())) {
+newTopicIds.put(entry.getKey(), entry.getValue());
+}
+}
+
 for (PartitionMetadata partition : addPartitions) {
 newMetadataByPartition.put(partition.topicPartition, partition);
+Uuid id = topicIds.get(partition.topic());
+if (id != null)
+newTopicIds.put(partition.topic(), id);
+else
+// Remove if the latest metadata does not have a topic ID

Review comment:
   Yeah. That was my reasoning. I thought the upgrade/downgrade case would 
be rare and the guarantees harder to reason about there.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode

2021-07-14 Thread GitBox


cmccabe commented on a change in pull request #10753:
URL: https://github.com/apache/kafka/pull/10753#discussion_r669896829



##
File path: 
metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java
##
@@ -180,6 +187,44 @@ public LeaderAndIsrPartitionState 
toLeaderAndIsrPartitionState(TopicPartition tp
 setIsNew(isNew);
 }
 
+/**
+ * Returns true if this partition is reassigning.
+ */
+public boolean isReassigning() {
+return removingReplicas.length > 0 | addingReplicas.length > 0;
+}
+
+/**
+ * Check if an ISR change completes this partition's reassignment.
+ *
+ * @param newIsrThe new ISR.
+ * @return  True if the reassignment is complete.
+ */
+public boolean isrChangeCompletesReassignment(int[] newIsr) {

Review comment:
   Yes, this is unused now. I'll remove it




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11022: KAFKA-13063: refactor DescribeConsumerGroupsHandler and tests

2021-07-14 Thread GitBox


dajac commented on a change in pull request #11022:
URL: https://github.com/apache/kafka/pull/11022#discussion_r669797588



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java
##
@@ -109,16 +109,19 @@ public String apiName() {
 Set<CoordinatorKey> groupIds,
 AbstractResponse abstractResponse
 ) {
-DescribeGroupsResponse response = (DescribeGroupsResponse) 
abstractResponse;
-        Map<CoordinatorKey, ConsumerGroupDescription> completed = new HashMap<>();
-        Map<CoordinatorKey, Throwable> failed = new HashMap<>();
-        List<CoordinatorKey> unmapped = new ArrayList<>();
+        final DescribeGroupsResponse response = (DescribeGroupsResponse) abstractResponse;
+        final Map<CoordinatorKey, ConsumerGroupDescription> completed = new HashMap<>();
+        final Map<CoordinatorKey, Throwable> failed = new HashMap<>();
+        final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
+        final Set<CoordinatorKey> groupsToRetry = new HashSet<>();
 
-for (DescribedGroup describedGroup : response.data().groups()) {
+        List<DescribedGroup> describedGroups = response.data().groups();
+
+for (DescribedGroup describedGroup : describedGroups) {
 CoordinatorKey groupIdKey = 
CoordinatorKey.byGroupId(describedGroup.groupId());
 Errors error = Errors.forCode(describedGroup.errorCode());
 if (error != Errors.NONE) {
-handleError(groupIdKey, error, failed, unmapped);
+handleError(groupIdKey, error, failed, groupsToUnmap, 
groupsToRetry);

Review comment:
   `groupsToRetry` is not really necessary in this case. We don't even use it 
later. Could we remove it?

##
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##
@@ -2688,8 +2688,7 @@ public void testDescribeConsumerGroups() throws Exception 
{
 try (AdminClientUnitTestEnv env = new 
AdminClientUnitTestEnv(mockCluster(1, 0))) {
 env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
 
-//Retriable FindCoordinatorResponse errors should be retried
-
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE,
  Node.noNode()));

Review comment:
   Do we really need to remove this one? It seems to me that the changes in 
the PR do not change how the find coordinator response is handled, no?

##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java
##
@@ -151,38 +154,45 @@ public String apiName() {
 completed.put(groupIdKey, consumerGroupDescription);
 } else {
 failed.put(groupIdKey, new IllegalArgumentException(
-String.format("GroupId %s is not a consumer group 
(%s).",
-groupIdKey.idValue, protocolType)));
+String.format("GroupId %s is not a consumer group (%s).",
+groupIdKey.idValue, protocolType)));
 }
 }
-return new ApiResult<>(completed, failed, unmapped);
+
+return new ApiResult<>(completed, failed,  new 
ArrayList<>(groupsToUnmap));

Review comment:
   nit: There is an extra space before `new`.

##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java
##
@@ -151,38 +154,45 @@ public String apiName() {
 completed.put(groupIdKey, consumerGroupDescription);
 } else {
 failed.put(groupIdKey, new IllegalArgumentException(
-String.format("GroupId %s is not a consumer group 
(%s).",
-groupIdKey.idValue, protocolType)));
+String.format("GroupId %s is not a consumer group (%s).",
+groupIdKey.idValue, protocolType)));
 }
 }
-return new ApiResult<>(completed, failed, unmapped);
+
+return new ApiResult<>(completed, failed,  new 
ArrayList<>(groupsToUnmap));
 }
 
 private void handleError(
 CoordinatorKey groupId,
 Errors error,
     Map<CoordinatorKey, Throwable> failed,
-    List<CoordinatorKey> unmapped
+    Set<CoordinatorKey> groupsToUnmap,
+    Set<CoordinatorKey> groupsToRetry
 ) {
 switch (error) {
 case GROUP_AUTHORIZATION_FAILED:
-log.error("Received authorization failure for group {} in 
`DescribeGroups` response", groupId,
-error.exception());
+log.debug("`DescribeGroups` request for group id {} failed due 
to error {}", groupId.idValue, error);
 failed.put(groupId, error.exception());
 break;
 case COORDINATOR_LOAD_IN_PROGRESS:
-case COORDINATOR_NOT_AVAILABLE:
+// If the coordinator is in the middle of loading, then we 
just need to retry
+log.debug("`DescribeGroups` request for group id {} failed 
because the coordinator " +
+"is still in the 

[GitHub] [kafka] cmccabe commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode

2021-07-14 Thread GitBox


cmccabe commented on a change in pull request #10753:
URL: https://github.com/apache/kafka/pull/10753#discussion_r669891898



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentRevert.java
##
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.metadata.PartitionRegistration;
+import org.apache.kafka.metadata.Replicas;
+
+import java.util.ArrayList;
+import java.util.Set;
+import java.util.List;
+import java.util.Objects;
+
+
+class PartitionReassignmentRevert {
+    private final List<Integer> replicas;
+    private final List<Integer> isr;
+
+PartitionReassignmentRevert(PartitionRegistration registration) {
+// Figure out the replica list and ISR that we will have after 
reverting the
+// reassignment. In general, we want to take out any replica that the 
reassignment
+// was adding, but keep the ones the reassignment was removing. (But 
see the
+// special case below.)
+        Set<Integer> adding = Replicas.toSet(registration.addingReplicas);
+this.replicas = new ArrayList<>(registration.replicas.length);
+this.isr = new ArrayList<>(registration.isr.length);
+for (int i = 0; i < registration.isr.length; i++) {
+int replica = registration.isr[i];
+if (!adding.contains(replica)) {
+this.isr.add(replica);
+} else if (i == registration.isr.length - 1 && isr.isEmpty()) {
+// This is a special case where taking out all the "adding" 
replicas is
+// not possible. The reason it is not possible is that doing 
so would
+// create an empty ISR, which is not allowed.
+//
+// In this case, we leave in one of the adding replicas 
permanently.

Review comment:
   I guess we can just not allow the reassignment to be cancelled in this 
situation. It should still be possible to undo the reassignment by creating a 
new reassignment, so it doesn't totally trap the user, I suppose...
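   
   To make that alternative concrete, a hypothetical guard in the cancellation path 
could look like the following (the isr() accessor and the exception choice are 
assumptions, not the actual controller code):
   
       PartitionReassignmentRevert revert = new PartitionReassignmentRevert(registration);
       if (revert.isr().isEmpty()) {
           // Refuse the cancel rather than leaving an "adding" replica in place forever.
           throw new InvalidReplicaAssignmentException("Cannot cancel the reassignment " +
               "of this partition: reverting it would leave an empty ISR.");
       }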




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10753: KAFKA-12803: Support reassigning partitions when in KRaft mode

2021-07-14 Thread GitBox


cmccabe commented on a change in pull request #10753:
URL: https://github.com/apache/kafka/pull/10753#discussion_r669215608



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
##
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
+import org.apache.kafka.metadata.PartitionRegistration;
+import org.apache.kafka.metadata.Replicas;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+import static 
org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_CHANGE_RECORD;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE;
+
+/**
+ * PartitionMutator handles changing partition registrations.
+ */
+public class PartitionChangeBuilder {
+public static boolean changeRecordIsNoOp(PartitionChangeRecord record) {
+if (record.isr() != null) return false;
+if (record.leader() != NO_LEADER_CHANGE) return false;
+if (record.replicas() != null) return false;
+if (record.removingReplicas() != null) return false;
+if (record.addingReplicas() != null) return false;
+return true;
+}
+
+private final PartitionRegistration partition;
+private final Uuid topicId;
+private final int partitionId;
+    private final Function<Integer, Boolean> isAcceptableLeader;
+    private final Supplier<Boolean> uncleanElectionOk;
+    private List<Integer> targetIsr;
+    private List<Integer> targetReplicas;
+    private List<Integer> targetRemoving;
+    private List<Integer> targetAdding;
+private boolean alwaysElectPreferredIfPossible;
+
+public PartitionChangeBuilder(PartitionRegistration partition,
+  Uuid topicId,
+  int partitionId,
+                                  Function<Integer, Boolean> isAcceptableLeader,
+                                  Supplier<Boolean> uncleanElectionOk) {
+this.partition = partition;
+this.topicId = topicId;
+this.partitionId = partitionId;
+this.isAcceptableLeader = isAcceptableLeader;
+this.uncleanElectionOk = uncleanElectionOk;
+this.targetIsr = Replicas.toList(partition.isr);
+this.targetReplicas = Replicas.toList(partition.replicas);
+this.targetRemoving = Replicas.toList(partition.removingReplicas);
+this.targetAdding = Replicas.toList(partition.addingReplicas);
+this.alwaysElectPreferredIfPossible = false;
+}
+
+    public PartitionChangeBuilder setTargetIsr(List<Integer> targetIsr) {
+this.targetIsr = targetIsr;
+return this;
+}
+
+    public PartitionChangeBuilder setTargetReplicas(List<Integer> targetReplicas) {
+this.targetReplicas = targetReplicas;
+return this;
+}
+
+public PartitionChangeBuilder setAlwaysElectPreferredIfPossible(boolean 
alwaysElectPreferredIfPossible) {

Review comment:
   I also added a unit test for `electLeaders`
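   
   For readers following along, such a test can exercise the builder shown above 
roughly like this (an illustrative sketch; the final build step and the assertions 
are assumptions, not the actual test code):
   
       PartitionChangeBuilder builder = new PartitionChangeBuilder(
           partition,            // an existing PartitionRegistration
           topicId,
           0,                    // partition id
           replica -> true,      // every replica is an acceptable leader
           () -> false);         // unclean leader election disabled
       builder.setTargetIsr(Arrays.asList(1, 2, 3))
              .setAlwaysElectPreferredIfPossible(true);
       // The builder is then expected to emit a PartitionChangeRecord (or nothing,
       // per changeRecordIsNoOp) describing the resulting leader/ISR change.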




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10952: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name

2021-07-14 Thread GitBox


hachikuji commented on a change in pull request #10952:
URL: https://github.com/apache/kafka/pull/10952#discussion_r669881047



##
File path: clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
##
@@ -130,13 +150,30 @@ MetadataCache mergeWith(String newClusterId,
 Set<String> addInvalidTopics,
 Set<String> addInternalTopics,
 Node newController,
+Map<String, Uuid> topicIds,
 BiPredicate<String, Boolean> retainTopic) {
 
 Predicate<String> shouldRetainTopic = topic -> retainTopic.test(topic, internalTopics.contains(topic));
 
 Map<TopicPartition, PartitionMetadata> newMetadataByPartition = new HashMap<>(addPartitions.size());
+Map<String, Uuid> newTopicIds = new HashMap<>(topicIds.size());
+
+// We want the most recent topic ID. We add the old one here for retained topics, then update with the newest information from the MetadataResponse:
+// we add if a new topic ID is present, or remove if the request did not support topic IDs for this topic.
+for (Map.Entry<String, Uuid> entry : this.topicIds.entrySet()) {
+if (shouldRetainTopic.test(entry.getKey())) {
+newTopicIds.put(entry.getKey(), entry.getValue());
+}
+}
+
 for (PartitionMetadata partition : addPartitions) {
 newMetadataByPartition.put(partition.topicPartition, partition);
+Uuid id = topicIds.get(partition.topic());
+if (id != null)
+newTopicIds.put(partition.topic(), id);
+else
+// Remove if the latest metadata does not have a topic ID

Review comment:
   We can leave it as is I guess since I can't think of a strong case to 
remove it. It is a rare situation that we would hit this case and the 
consequence of losing the topic ID is probably not too bad. Worst case, we 
might miss a recreation which occurred while the cluster was rolling to upgrade 
or downgrade. On the other hand, it could lead to other kinds of problems if we 
allow updates to the epoch information tied to a topic ID without being able to 
validate that the topic ID is correct, so maybe this logic is for the best.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lkokhreidze commented on pull request #10851: KAFKA-6718 / Rack aware standby task assignor

2021-07-14 Thread GitBox


lkokhreidze commented on pull request #10851:
URL: https://github.com/apache/kafka/pull/10851#issuecomment-880127165


   Thanks @cadonna for the feedback.
   I've replied/addressed all of your comments.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lkokhreidze commented on a change in pull request #10851: KAFKA-6718 / Rack aware standby task assignor

2021-07-14 Thread GitBox


lkokhreidze commented on a change in pull request #10851:
URL: https://github.com/apache/kafka/pull/10851#discussion_r669865915



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
##
@@ -51,12 +57,12 @@ public boolean assign(final Map<UUID, ClientState> clients,
     final SortedSet<TaskId> statefulTasks = new TreeSet<>(statefulTaskIds);
     final TreeMap<UUID, ClientState> clientStates = new TreeMap<>(clients);
 
-assignActiveStatefulTasks(clientStates, statefulTasks);
+        final Map<TaskId, UUID> statefulTasksClientMappings = assignActiveStatefulTasks(clientStates, statefulTasks);
 
 assignStandbyReplicaTasks(
 clientStates,
-statefulTasks,
-configs.numStandbyReplicas
+statefulTasksClientMappings,
+configs
 );

Review comment:
   Sure! Done.
   It was a personal preference: having all the standby task assignment strategies 
in a single class makes unit testing a bit easier. But I do agree that removing one 
extra class is indeed a good idea.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-13033) coordinator not available error should cause add into unmap list to do a new lookup

2021-07-14 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13033?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot updated KAFKA-13033:

Priority: Blocker  (was: Major)

> coordinator not available error should cause add into unmap list to do a new 
> lookup
> ---
>
> Key: KAFKA-13033
> URL: https://issues.apache.org/jira/browse/KAFKA-13033
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.0.0
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Blocker
> Fix For: 3.0.0
>
>
> In KIP-699, we added handlers for different types of operations. In 
> `handleError`, we didn't treat `COORDINATOR_NOT_AVAILABLE` as 
> unmapped, to trigger a re-lookup. In 
> [DescribeTransactionsHandler|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeTransactionsHandler.java#L172-L186],
>  there's already explained by [~hachikuji] why `COORDINATOR_NOT_AVAILABLE` 
> and `NOT_COORDINATOR` should be listed in unmapped, and 
> `COORDINATOR_LOAD_IN_PROGRESS` should not.
>  
> {code:java}
> case COORDINATOR_LOAD_IN_PROGRESS:
> // If the coordinator is in the middle of loading, then we just need to 
> retry
> log.debug("DescribeTransactions request for transactionalId `{}` failed 
> because the " +
> "coordinator is still in the process of loading state. Will 
> retry",
> transactionalIdKey.idValue);
> break;
> case NOT_COORDINATOR:
> case COORDINATOR_NOT_AVAILABLE:
> // If the coordinator is unavailable or there was a coordinator change, 
> then we unmap
> // the key so that we retry the `FindCoordinator` request
> unmapped.add(transactionalIdKey);
> log.debug("DescribeTransactions request for transactionalId `{}` returned 
> error {}. Will attempt " +
> "to find the coordinator again and retry", 
> transactionalIdKey.idValue, error);
> break;
> {code}
> We should be consistent with it. Fix it, add logs and comments, and also 
> update the tests.
>  
>  
>  
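A sketch of the consistent handling this issue asks for, transplanted into one of 
the other handlers (the handler name and surrounding identifiers follow the snippet 
above; this is not the committed fix):

{code:java}
case NOT_COORDINATOR:
case COORDINATOR_NOT_AVAILABLE:
    // The coordinator moved or is gone: unmap the key so that we retry
    // the `FindCoordinator` request before resending `DescribeGroups`.
    groupsToUnmap.add(groupId);
    log.debug("`DescribeGroups` request for group id {} returned error {}. Will attempt " +
        "to find the coordinator again and retry", groupId.idValue, error);
    break;
{code}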



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] lkokhreidze commented on a change in pull request #10851: KAFKA-6718 / Rack aware standby task assignor

2021-07-14 Thread GitBox


lkokhreidze commented on a change in pull request #10851:
URL: https://github.com/apache/kafka/pull/10851#discussion_r669853519



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StandbyTaskAssignor.java
##
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import org.apache.kafka.streams.processor.TaskId;
+import 
org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.UUID;
+
+abstract class StandbyTaskAssignor {
+protected final AssignmentConfigs configs;
+
+StandbyTaskAssignor(final AssignmentConfigs configs) {
+this.configs = configs;
+}
+
+    abstract void assignStandbyTasks(final Map<TaskId, UUID> statefulTasksWithClients,
+                                     final TreeMap<UUID, ClientState> clientStates);

Review comment:
   I didn't give it much thought, to be honest.
   
   `TreeMap` for the `clientStates` was already used in the 
`HighAvailabilityTaskAssignor`, so I went with the same signature here. I think 
it makes sense to change the contract to be a `SortedMap`. I will do that; see the 
sketch below.
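   
   A sketch of the loosened contract (type names as in the diff above):
   
       abstract void assignStandbyTasks(final Map<TaskId, UUID> statefulTasksWithClients,
                                        final SortedMap<UUID, ClientState> clientStates);
   
   Callers holding a `TreeMap` can pass it unchanged, since `TreeMap` implements `SortedMap`.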




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lkokhreidze commented on a change in pull request #10851: KAFKA-6718 / Rack aware standby task assignor

2021-07-14 Thread GitBox


lkokhreidze commented on a change in pull request #10851:
URL: https://github.com/apache/kafka/pull/10851#discussion_r669851397



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
##
@@ -51,12 +57,12 @@ public boolean assign(final Map<UUID, ClientState> clients,
     final SortedSet<TaskId> statefulTasks = new TreeSet<>(statefulTaskIds);
     final TreeMap<UUID, ClientState> clientStates = new TreeMap<>(clients);
 
-assignActiveStatefulTasks(clientStates, statefulTasks);
+        final Map<TaskId, UUID> statefulTasksClientMappings = assignActiveStatefulTasks(clientStates, statefulTasks);

Review comment:
   I tried to avoid unnecessary iterations. With that change we would have to do 
a separate iteration in the `ClientTagAwareStandbyTaskAssignor`, which felt 
redundant, since `assignActiveStatefulTasks` can return the necessary mapping, as 
it has to iterate over the client states either way.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lkokhreidze commented on a change in pull request #10851: KAFKA-6718 / Rack aware standby task assignor

2021-07-14 Thread GitBox


lkokhreidze commented on a change in pull request #10851:
URL: https://github.com/apache/kafka/pull/10851#discussion_r669849371



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignor.java
##
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import org.apache.kafka.streams.processor.TaskId;
+import 
org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.function.Function;
+
+import static java.util.stream.Collectors.toMap;
+
+/**
+ * Distributes standby tasks over different tag dimensions.
+ * Only tags specified via {@link AssignmentConfigs#rackAwareAssignmentTags} 
are taken into account.
+ * Standby task distribution is on a best-effort basis. For example, if there 
are not enough clients available
+ * on different tag dimensions compared to an active and corresponding standby 
task,
+ * in that case, the algorithm will fall back to distributing tasks on 
least-loaded clients.
+ */
+class ClientTagAwareStandbyTaskAssignor extends StandbyTaskAssignor {
+private static final Logger log = 
LoggerFactory.getLogger(ClientTagAwareStandbyTaskAssignor.class);
+
+ClientTagAwareStandbyTaskAssignor(final AssignmentConfigs configs) {
+super(configs);
+}
+
+@Override
+    public void assignStandbyTasks(final Map<TaskId, UUID> statefulTasksWithClients,
+                                   final TreeMap<UUID, ClientState> clientStates) {
+final int numStandbyReplicas = configs.numStandbyReplicas;
+        final Set<String> rackAwareAssignmentTags = new HashSet<>(configs.rackAwareAssignmentTags);
+
+final StandbyTaskDistributor standbyTaskDistributor = new 
StandbyTaskDistributor(
+numStandbyReplicas,
+clientStates,
+rackAwareAssignmentTags,
+statefulTasksWithClients
+);
+
+
statefulTasksWithClients.forEach(standbyTaskDistributor::assignStandbyTasksForActiveTask);
+}
+
+@Override
+public boolean isValidTaskMovement(final TaskMovementAttempt 
taskMovementAttempt) {
+        final Map<String, String> sourceClientTags = taskMovementAttempt.sourceClient().clientTags();
+        final Map<String, String> destinationClientTags = taskMovementAttempt.destinationClient().clientTags();
+
+        for (final Entry<String, String> sourceClientTagEntry : sourceClientTags.entrySet()) {
+            if (!sourceClientTagEntry.getValue().equals(destinationClientTags.get(sourceClientTagEntry.getKey()))) {
+return false;
+}
+}
+
+return true;
+}
+
+private static final class StandbyTaskDistributor {

Review comment:
   Thanks for the feedback Bruno.
   
   I reasoned that, since internal state like `clientsPerTagValue`, 
`standbyTaskClientsByTaskLoad`, etc., has to be allocated per invocation of 
the `assignStandbyTasks` method, it felt easier and more readable to create a 
single internal object rather than invalidating local caches in 
`ClientTagAwareStandbyTaskAssignor`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12308) ConfigDef.parseType deadlock

2021-07-14 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12308?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis updated KAFKA-12308:
---
Fix Version/s: 3.0.0

> ConfigDef.parseType deadlock
> 
>
> Key: KAFKA-12308
> URL: https://issues.apache.org/jira/browse/KAFKA-12308
> Project: Kafka
>  Issue Type: Bug
>  Components: config, KafkaConnect
>Affects Versions: 2.5.0
> Environment: kafka 2.5.0
> centos7
> java version "1.8.0_231"
>Reporter: cosmozhu
>Priority: Major
> Fix For: 3.0.0
>
> Attachments: deadlock.log
>
>
> Hi,
>  the problem appeared when I restarted *ConnectDistributed*.
> I restarted ConnectDistributed on a single node for the test, without deleting 
> connectors.
>  Sometimes the process stopped while creating connectors.
> I added some logging and found a deadlock in `ConfigDef.parseType`. My 
> connectors always have the same transforms. I guess that when connectors start up 
> (in startAndStopExecutor, which defaults to 8 threads) and load the same class 
> file, something goes wrong.
> I attached the jstack log file.
> Thanks for any help.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12308) ConfigDef.parseType deadlock

2021-07-14 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12308?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis resolved KAFKA-12308.

Resolution: Fixed

> ConfigDef.parseType deadlock
> 
>
> Key: KAFKA-12308
> URL: https://issues.apache.org/jira/browse/KAFKA-12308
> Project: Kafka
>  Issue Type: Bug
>  Components: config, KafkaConnect
>Affects Versions: 2.5.0
> Environment: kafka 2.5.0
> centos7
> java version "1.8.0_231"
>Reporter: cosmozhu
>Priority: Major
> Attachments: deadlock.log
>
>
> Hi,
>  the problem appeared when I restarted *ConnectDistributed*.
> I restarted ConnectDistributed on a single node for the test, without deleting 
> connectors.
>  Sometimes the process stopped while creating connectors.
> I added some logging and found a deadlock in `ConfigDef.parseType`. My 
> connectors always have the same transforms. I guess that when connectors start up 
> (in startAndStopExecutor, which defaults to 8 threads) and load the same class 
> file, something goes wrong.
> I attached the jstack log file.
> Thanks for any help.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-7421) Deadlock in Kafka Connect during class loading

2021-07-14 Thread Konstantine Karantasis (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17380773#comment-17380773
 ] 

Konstantine Karantasis commented on KAFKA-7421:
---

The fix has now been merged. Let's keep track of it and report any new issues if 
they appear. Some context exists on 
https://issues.apache.org/jira/browse/KAFKA-12308 as well, which reported a 
similar issue. 

> Deadlock in Kafka Connect during class loading
> --
>
> Key: KAFKA-7421
> URL: https://issues.apache.org/jira/browse/KAFKA-7421
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.0.0
>Reporter: Maciej Bryński
>Assignee: Konstantine Karantasis
>Priority: Major
> Fix For: 3.0.0
>
>
> I'm getting this deadlock on half of Kafka Connect runs when having two 
> different types connectors (in this configuration it's debezium and hdfs).
> Thread 1:
> {code}
> "pool-22-thread-2@4748" prio=5 tid=0x4d nid=NA waiting for monitor entry
>   java.lang.Thread.State: BLOCKED
>waiting for pool-22-thread-1@4747 to release lock on <0x1423> (a 
> org.apache.kafka.connect.runtime.isolation.PluginClassLoader)
> at 
> org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:91)
> at 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.loadClass(DelegatingClassLoader.java:367)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Class.java:-1)
> at java.lang.Class.forName(Class.java:348)
> at 
> org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715)
> at 
> org.apache.kafka.connect.runtime.ConnectorConfig.enrich(ConnectorConfig.java:295)
> at 
> org.apache.kafka.connect.runtime.ConnectorConfig.(ConnectorConfig.java:200)
> at 
> org.apache.kafka.connect.runtime.ConnectorConfig.(ConnectorConfig.java:194)
> at 
> org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:233)
> at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:916)
> at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1300(DistributedHerder.java:111)
> at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:932)
> at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:928)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> {code}
> Thread 2:
> {code}
> "pool-22-thread-1@4747" prio=5 tid=0x4c nid=NA waiting for monitor entry
>   java.lang.Thread.State: BLOCKED
>blocks pool-22-thread-2@4748
>waiting for pool-22-thread-2@4748 to release lock on <0x1421> (a 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:406)
> at 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.loadClass(DelegatingClassLoader.java:358)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:411)
> - locked <0x1424> (a java.lang.Object)
> at 
> org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
> - locked <0x1423> (a 
> org.apache.kafka.connect.runtime.isolation.PluginClassLoader)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at 
> io.debezium.transforms.ByLogicalTableRouter.<clinit>(ByLogicalTableRouter.java:57)
> at java.lang.Class.forName0(Class.java:-1)
> at java.lang.Class.forName(Class.java:348)
> at 
> org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715)
> at 
> org.apache.kafka.connect.runtime.ConnectorConfig.enrich(ConnectorConfig.java:295)
> at 
> org.apache.kafka.connect.runtime.ConnectorConfig.<init>(ConnectorConfig.java:200)
> at 
> org.apache.kafka.connect.runtime.ConnectorConfig.<init>(ConnectorConfig.java:194)
> at 
> org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:233)
> at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:916)
> at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1300(DistributedHerder.java:111)
> at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:932)
> at 
> 

[jira] [Resolved] (KAFKA-7421) Deadlock in Kafka Connect during class loading

2021-07-14 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7421?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis resolved KAFKA-7421.
---
Resolution: Fixed

> Deadlock in Kafka Connect during class loading
> --
>
> Key: KAFKA-7421
> URL: https://issues.apache.org/jira/browse/KAFKA-7421
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.0.0
>Reporter: Maciej Bryński
>Assignee: Konstantine Karantasis
>Priority: Major
> Fix For: 3.0.0
>
>
> I'm getting this deadlock on half of Kafka Connect runs when having two 
> different types connectors (in this configuration it's debezium and hdfs).
> Thread 1:
> {code}
> "pool-22-thread-2@4748" prio=5 tid=0x4d nid=NA waiting for monitor entry
>   java.lang.Thread.State: BLOCKED
>waiting for pool-22-thread-1@4747 to release lock on <0x1423> (a 
> org.apache.kafka.connect.runtime.isolation.PluginClassLoader)
> at 
> org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:91)
> at 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.loadClass(DelegatingClassLoader.java:367)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Class.java:-1)
> at java.lang.Class.forName(Class.java:348)
> at 
> org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715)
> at 
> org.apache.kafka.connect.runtime.ConnectorConfig.enrich(ConnectorConfig.java:295)
> at 
> org.apache.kafka.connect.runtime.ConnectorConfig.<init>(ConnectorConfig.java:200)
> at 
> org.apache.kafka.connect.runtime.ConnectorConfig.<init>(ConnectorConfig.java:194)
> at 
> org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:233)
> at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:916)
> at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1300(DistributedHerder.java:111)
> at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:932)
> at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:928)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> {code}
> Thread 2:
> {code}
> "pool-22-thread-1@4747" prio=5 tid=0x4c nid=NA waiting for monitor entry
>   java.lang.Thread.State: BLOCKED
>blocks pool-22-thread-2@4748
>waiting for pool-22-thread-2@4748 to release lock on <0x1421> (a 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:406)
> at 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.loadClass(DelegatingClassLoader.java:358)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:411)
> - locked <0x1424> (a java.lang.Object)
> at 
> org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
> - locked <0x1423> (a 
> org.apache.kafka.connect.runtime.isolation.PluginClassLoader)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at 
> io.debezium.transforms.ByLogicalTableRouter.<clinit>(ByLogicalTableRouter.java:57)
> at java.lang.Class.forName0(Class.java:-1)
> at java.lang.Class.forName(Class.java:348)
> at 
> org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715)
> at 
> org.apache.kafka.connect.runtime.ConnectorConfig.enrich(ConnectorConfig.java:295)
> at 
> org.apache.kafka.connect.runtime.ConnectorConfig.<init>(ConnectorConfig.java:200)
> at 
> org.apache.kafka.connect.runtime.ConnectorConfig.<init>(ConnectorConfig.java:194)
> at 
> org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:233)
> at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:916)
> at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1300(DistributedHerder.java:111)
> at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:932)
> at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:928)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 

[jira] [Updated] (KAFKA-7421) Deadlock in Kafka Connect during class loading

2021-07-14 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7421?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis updated KAFKA-7421:
--
Fix Version/s: 3.0.0

> Deadlock in Kafka Connect during class loading
> --
>
> Key: KAFKA-7421
> URL: https://issues.apache.org/jira/browse/KAFKA-7421
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.0.0
>Reporter: Maciej Bryński
>Assignee: Konstantine Karantasis
>Priority: Major
> Fix For: 3.0.0
>
>
> I'm getting this deadlock on half of Kafka Connect runs when having two 
> different types connectors (in this configuration it's debezium and hdfs).
> Thread 1:
> {code}
> "pool-22-thread-2@4748" prio=5 tid=0x4d nid=NA waiting for monitor entry
>   java.lang.Thread.State: BLOCKED
>waiting for pool-22-thread-1@4747 to release lock on <0x1423> (a 
> org.apache.kafka.connect.runtime.isolation.PluginClassLoader)
> at 
> org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:91)
> at 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.loadClass(DelegatingClassLoader.java:367)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Class.java:-1)
> at java.lang.Class.forName(Class.java:348)
> at 
> org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715)
> at 
> org.apache.kafka.connect.runtime.ConnectorConfig.enrich(ConnectorConfig.java:295)
> at 
> org.apache.kafka.connect.runtime.ConnectorConfig.<init>(ConnectorConfig.java:200)
> at 
> org.apache.kafka.connect.runtime.ConnectorConfig.<init>(ConnectorConfig.java:194)
> at 
> org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:233)
> at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:916)
> at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1300(DistributedHerder.java:111)
> at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:932)
> at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:928)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> {code}
> Thread 2:
> {code}
> "pool-22-thread-1@4747" prio=5 tid=0x4c nid=NA waiting for monitor entry
>   java.lang.Thread.State: BLOCKED
>blocks pool-22-thread-2@4748
>waiting for pool-22-thread-2@4748 to release lock on <0x1421> (a 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:406)
> at 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.loadClass(DelegatingClassLoader.java:358)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:411)
> - locked <0x1424> (a java.lang.Object)
> at 
> org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
> - locked <0x1423> (a 
> org.apache.kafka.connect.runtime.isolation.PluginClassLoader)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at 
> io.debezium.transforms.ByLogicalTableRouter.<clinit>(ByLogicalTableRouter.java:57)
> at java.lang.Class.forName0(Class.java:-1)
> at java.lang.Class.forName(Class.java:348)
> at 
> org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715)
> at 
> org.apache.kafka.connect.runtime.ConnectorConfig.enrich(ConnectorConfig.java:295)
> at 
> org.apache.kafka.connect.runtime.ConnectorConfig.<init>(ConnectorConfig.java:200)
> at 
> org.apache.kafka.connect.runtime.ConnectorConfig.<init>(ConnectorConfig.java:194)
> at 
> org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:233)
> at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:916)
> at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1300(DistributedHerder.java:111)
> at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:932)
> at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:928)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 

[jira] [Commented] (KAFKA-12308) ConfigDef.parseType deadlock

2021-07-14 Thread Konstantine Karantasis (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17380772#comment-17380772
 ] 

Konstantine Karantasis commented on KAFKA-12308:


Adding the comment that I added in the PR here as well: 



The idea that the {{DelegatingClassLoader}} did not have to be parallel capable 
originated from the fact that it doesn't load classes directly. It delegates 
loading either to the appropriate PluginClassLoader directly via composition, 
or to the parent by calling {{super.loadClass}}.

The latter is the key point of why we need to make the {{DelegatingClassLoader}} 
parallel capable as well, even though it doesn't load a class itself. Inheritance 
is used (via a call to {{super.loadClass}}) rather than composition (via a 
hypothetical call to {{parent.loadClass}}, which is not possible because 
{{parent}} is a private member of the base abstract class {{ClassLoader}}). So 
when {{getClassLoadingLock}} is called in {{super.loadClass}}, it sees that the 
derived class (here an instance of {{DelegatingClassLoader}}) is not parallel 
capable and therefore ends up not applying fine-grained locking during 
classloading, even though the parent classloader is what actually loads the 
classes.

Based on the above, the {{DelegatingClassLoader}} needs to be parallel capable 
too in order for the parent loader to load classes in parallel. 

I've tested both classloader types being parallel capable in a variety of 
scenarios with multiple connectors, SMTs, and converters, and the deadlock did 
not reproduce. Of course, reproducing the issue is difficult without the 
specifics of the jar layout to begin with. The possibility of a deadlock is 
still not zero, but it is probably not exacerbated compared to the current code. 
The only plugin type that depends on other plugins being loaded while it is 
loading its own classes is the connector plugin, and there are no 
inter-connector dependencies (a connector requiring another connector's classes 
to be loaded while loading its own). With that in mind, a deadlock should be 
even less likely now. In the future we could consider introducing deadlock 
recovery methods to get out of this type of situation if necessary.
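
For readers unfamiliar with the mechanism: a classloader opts in to 
per-class-name locking by registering itself as parallel capable in a static 
initializer. A minimal sketch (illustrative only, not the actual Connect code):

{code:java}
import java.net.URL;
import java.net.URLClassLoader;

public class ExamplePluginClassLoader extends URLClassLoader {
    static {
        // Without this call, ClassLoader.getClassLoadingLock(name) returns
        // the loader instance itself, so every class load serializes on one
        // monitor -- and two loaders that delegate to each other can deadlock.
        registerAsParallelCapable();
    }

    public ExamplePluginClassLoader(final URL[] urls, final ClassLoader parent) {
        super(urls, parent);
    }
}
{code}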

> ConfigDef.parseType deadlock
> 
>
> Key: KAFKA-12308
> URL: https://issues.apache.org/jira/browse/KAFKA-12308
> Project: Kafka
>  Issue Type: Bug
>  Components: config, KafkaConnect
>Affects Versions: 2.5.0
> Environment: kafka 2.5.0
> centos7
> java version "1.8.0_231"
>Reporter: cosmozhu
>Priority: Major
> Attachments: deadlock.log
>
>
> Hi,
>  the problem appeared when I restarted *ConnectDistributed*.
> I restarted ConnectDistributed on a single node for the test, without deleting 
> connectors.
>  Sometimes the process stopped while creating connectors.
> I added some logging and found a deadlock in `ConfigDef.parseType`. My 
> connectors always have the same transforms. I guess that when connectors start up 
> (in startAndStopExecutor, which defaults to 8 threads) and load the same class 
> file, something goes wrong.
> I attached the jstack log file.
> Thanks for any help.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cmccabe merged pull request #11048: KAFKA-13083: In KRaft mode, fix issue with ISR in manual partition assignments

2021-07-14 Thread GitBox


cmccabe merged pull request #11048:
URL: https://github.com/apache/kafka/pull/11048


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-7421) Deadlock in Kafka Connect during class loading

2021-07-14 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7421?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis updated KAFKA-7421:
--
Summary: Deadlock in Kafka Connect during class loading  (was: Deadlock in 
Kafka Connect)

> Deadlock in Kafka Connect during class loading
> --
>
> Key: KAFKA-7421
> URL: https://issues.apache.org/jira/browse/KAFKA-7421
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.0.0
>Reporter: Maciej Bryński
>Assignee: Konstantine Karantasis
>Priority: Major
>
> I'm getting this deadlock on half of Kafka Connect runs when having two 
> different types connectors (in this configuration it's debezium and hdfs).
> Thread 1:
> {code}
> "pool-22-thread-2@4748" prio=5 tid=0x4d nid=NA waiting for monitor entry
>   java.lang.Thread.State: BLOCKED
>waiting for pool-22-thread-1@4747 to release lock on <0x1423> (a 
> org.apache.kafka.connect.runtime.isolation.PluginClassLoader)
> at 
> org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:91)
> at 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.loadClass(DelegatingClassLoader.java:367)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Class.java:-1)
> at java.lang.Class.forName(Class.java:348)
> at 
> org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715)
> at 
> org.apache.kafka.connect.runtime.ConnectorConfig.enrich(ConnectorConfig.java:295)
> at 
> org.apache.kafka.connect.runtime.ConnectorConfig.<init>(ConnectorConfig.java:200)
> at 
> org.apache.kafka.connect.runtime.ConnectorConfig.<init>(ConnectorConfig.java:194)
> at 
> org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:233)
> at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:916)
> at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1300(DistributedHerder.java:111)
> at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:932)
> at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:928)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> {code}
> Thread 2:
> {code}
> "pool-22-thread-1@4747" prio=5 tid=0x4c nid=NA waiting for monitor entry
>   java.lang.Thread.State: BLOCKED
>blocks pool-22-thread-2@4748
>waiting for pool-22-thread-2@4748 to release lock on <0x1421> (a 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:406)
> at 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.loadClass(DelegatingClassLoader.java:358)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:411)
> - locked <0x1424> (a java.lang.Object)
> at 
> org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
> - locked <0x1423> (a 
> org.apache.kafka.connect.runtime.isolation.PluginClassLoader)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at 
> io.debezium.transforms.ByLogicalTableRouter.<clinit>(ByLogicalTableRouter.java:57)
> at java.lang.Class.forName0(Class.java:-1)
> at java.lang.Class.forName(Class.java:348)
> at 
> org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715)
> at 
> org.apache.kafka.connect.runtime.ConnectorConfig.enrich(ConnectorConfig.java:295)
> at 
> org.apache.kafka.connect.runtime.ConnectorConfig.<init>(ConnectorConfig.java:200)
> at 
> org.apache.kafka.connect.runtime.ConnectorConfig.<init>(ConnectorConfig.java:194)
> at 
> org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:233)
> at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:916)
> at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1300(DistributedHerder.java:111)
> at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:932)
> at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:928)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> 

[GitHub] [kafka] kkonstantine merged pull request #8259: KAFKA-7421: Ensure Connect's PluginClassLoader is truly parallel capable and resolve deadlock occurrences

2021-07-14 Thread GitBox


kkonstantine merged pull request #8259:
URL: https://github.com/apache/kafka/pull/8259


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kkonstantine commented on pull request #8259: KAFKA-7421: Ensure Connect's PluginClassLoader is truly parallel capable and resolve deadlock occurrences

2021-07-14 Thread GitBox


kkonstantine commented on pull request #8259:
URL: https://github.com/apache/kafka/pull/8259#issuecomment-880082421


   The test failures were not relevant. Merging to `trunk` and `3.0`


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #10952: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name

2021-07-14 Thread GitBox


jolshan commented on a change in pull request #10952:
URL: https://github.com/apache/kafka/pull/10952#discussion_r669815073



##
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##
@@ -391,10 +393,15 @@ else if (metadata.error() == Errors.TOPIC_AUTHORIZATION_FAILED)
 int newEpoch = partitionMetadata.leaderEpoch.get();
 // If the received leader epoch is at least the same as the previous one, update the metadata
 Integer currentEpoch = lastSeenLeaderEpochs.get(tp);
-if (currentEpoch == null || newEpoch >= currentEpoch || changedTopicId) {
+if (currentEpoch == null || newEpoch >= currentEpoch) {
 log.debug("Updating last seen epoch for partition {} from {} to epoch {} from new metadata", tp, currentEpoch, newEpoch);
 lastSeenLeaderEpochs.put(tp, newEpoch);
 return Optional.of(partitionMetadata);
+} else if (changedTopicId) {
+log.debug("Topic ID changed, so this topic must have been recreated. " +

Review comment:
   Yeah, I was thinking that too. I just have to be careful, when comparing, to remember the zero-UUID case.
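
To make the discussion concrete, here is a self-contained sketch of the epoch check with the zero-UUID guard mentioned above. The class and method names are hypothetical illustrations, not the exact code in the PR; only `Uuid.ZERO_UUID` is the real sentinel from `org.apache.kafka.common.Uuid`.

```java
import java.util.Objects;
import org.apache.kafka.common.Uuid;

// Hypothetical standalone version of the logic under review.
final class EpochCheckSketch {
    // Returns true when new partition metadata should replace the cached entry.
    static boolean shouldUpdate(Integer currentEpoch, int newEpoch,
                                Uuid oldTopicId, Uuid newTopicId) {
        // A newer (or first-seen) leader epoch always wins.
        if (currentEpoch == null || newEpoch >= currentEpoch) {
            return true;
        }
        // Zero-UUID guard: a missing topic ID must not count as a "change".
        boolean changedTopicId = oldTopicId != null && newTopicId != null
                && !oldTopicId.equals(Uuid.ZERO_UUID)
                && !newTopicId.equals(Uuid.ZERO_UUID)
                && !Objects.equals(oldTopicId, newTopicId);
        // A genuinely different topic ID means the topic was recreated, so the
        // stale epoch from the old incarnation should not block the update.
        return changedTopicId;
    }
}
```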




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10952: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name

2021-07-14 Thread GitBox


hachikuji commented on a change in pull request #10952:
URL: https://github.com/apache/kafka/pull/10952#discussion_r669812349



##
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##
@@ -377,6 +397,12 @@ else if (metadata.error() == Errors.TOPIC_AUTHORIZATION_FAILED)
 log.debug("Updating last seen epoch for partition {} from {} to epoch {} from new metadata", tp, currentEpoch, newEpoch);
 lastSeenLeaderEpochs.put(tp, newEpoch);
 return Optional.of(partitionMetadata);
+// If the topic ID changed, update the metadata
+} else if (changedTopicId) {
+log.debug("Topic ID changed, so this topic must have been recreated. " +
+"Removing last seen epoch {} for the old partition {} and adding epoch {} from new metadata", currentEpoch, tp, newEpoch);
+lastSeenLeaderEpochs.put(tp, newEpoch);
+return Optional.of(partitionMetadata);

Review comment:
   Yes, I was just pointing out that there is still a gap.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10952: KAFKA-12257: Consumer mishandles topics deleted and recreated with the same name

2021-07-14 Thread GitBox


hachikuji commented on a change in pull request #10952:
URL: https://github.com/apache/kafka/pull/10952#discussion_r669810537



##
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##
@@ -391,10 +393,15 @@ else if (metadata.error() == Errors.TOPIC_AUTHORIZATION_FAILED)
 int newEpoch = partitionMetadata.leaderEpoch.get();
 // If the received leader epoch is at least the same as the previous one, update the metadata
 Integer currentEpoch = lastSeenLeaderEpochs.get(tp);
-if (currentEpoch == null || newEpoch >= currentEpoch || changedTopicId) {
+if (currentEpoch == null || newEpoch >= currentEpoch) {
 log.debug("Updating last seen epoch for partition {} from {} to epoch {} from new metadata", tp, currentEpoch, newEpoch);
 lastSeenLeaderEpochs.put(tp, newEpoch);
 return Optional.of(partitionMetadata);
+} else if (changedTopicId) {
+log.debug("Topic ID changed, so this topic must have been recreated. " +

Review comment:
   If you pass the new one, then you can probably get rid of `changedTopicId`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kkonstantine commented on a change in pull request #8259: KAFKA-7421: Ensure Connect's PluginClassLoader is truly parallel capable and resolve deadlock occurrences

2021-07-14 Thread GitBox


kkonstantine commented on a change in pull request #8259:
URL: https://github.com/apache/kafka/pull/8259#discussion_r669806313



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
##
@@ -82,11 +93,19 @@
 Arrays.stream(SERVICE_LOADER_PLUGINS).map(serviceLoaderPlugin -> MANIFEST_PREFIX + serviceLoaderPlugin.getName())
 .collect(Collectors.toSet());
 
+// Although this classloader does not load classes directly but rather delegates loading to a
+// PluginClassLoader or its parent through its base class, because of the use of inheritance in
+// the latter case, this classloader needs to also be declared as parallel capable to use
+// fine-grained locking when loading classes.
+static {

Review comment:
   Keeping the static block here, because it's a block and that's what we have in `PluginClassLoader`. Our style is not too strict with respect to this ordering.
   
   The class overrides the `loadClass` method, but it delegates the loading to different classloaders, and the locking is embedded in those classes.
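
To make the locking point concrete, here is a minimal sketch of the registration pattern being discussed. The class name is made up for illustration; `ClassLoader.registerAsParallelCapable()` is the real JDK hook, and it must run in a static initializer of each subclass before any instance is created.

```java
import java.net.URL;
import java.net.URLClassLoader;

// Hypothetical delegating loader, shown only to illustrate the
// parallel-capable registration pattern.
public class ExampleDelegatingLoader extends URLClassLoader {
    static {
        // Registers this class as parallel capable, so loadClass()
        // synchronizes on a per-class-name lock instead of a single
        // coarse loader-wide lock, avoiding cross-loader deadlocks.
        ClassLoader.registerAsParallelCapable();
    }

    public ExampleDelegatingLoader(URL[] urls, ClassLoader parent) {
        super(urls, parent);
    }
}
```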




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on pull request #11048: KAFKA-13083: In KRaft mode, fix issue with ISR in manual partition assignments

2021-07-14 Thread GitBox


cmccabe commented on pull request #11048:
URL: https://github.com/apache/kafka/pull/11048#issuecomment-880053828


   I updated this and fixed a case in createPartitions where the same issue existed (added a test for that as well).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #11048: KAFKA-13083: In KRaft mode, fix issue with ISR in manual partition assignments

2021-07-14 Thread GitBox


cmccabe commented on a change in pull request #11048:
URL: https://github.com/apache/kafka/pull/11048#discussion_r669790479



##
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -377,8 +377,20 @@ private ApiError createTopic(CreatableTopic topic,
 validateManualPartitionAssignment(assignment.brokerIds(), replicationFactor);
 replicationFactor = OptionalInt.of(assignment.brokerIds().size());
 int[] replicas = Replicas.toArray(assignment.brokerIds());
+List<Integer> isr = new ArrayList<>();

Review comment:
   Yeah, we can use a stream.
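
For illustration, a standalone sketch of that suggestion follows. The names here are illustrative rather than the controller's actual fields, and a real implementation would presumably also handle the case where every assigned broker is fenced.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

final class IsrSketch {
    // Sketch of building the initial ISR from only unfenced brokers,
    // using a stream as suggested above.
    static List<Integer> initialIsr(List<Integer> assignedReplicas,
                                    Set<Integer> unfencedBrokers) {
        return assignedReplicas.stream()
                .filter(unfencedBrokers::contains)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Broker 2 is fenced, so it is excluded from the initial ISR: [1, 3]
        System.out.println(initialIsr(Arrays.asList(1, 2, 3), Set.of(1, 3)));
    }
}
```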




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on pull request #10280: KAFKA-12554: Refactor Log layer

2021-07-14 Thread GitBox


kowshik commented on pull request #10280:
URL: https://github.com/apache/kafka/pull/10280#issuecomment-880044495


   @ijuma Discussed with @satishd. We are not planning to include this in 3.0.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-13088) Replace EasyMock with Mockito for ForwardingDisabledProcessorContextTest

2021-07-14 Thread Bruno Cadonna (Jira)


 [ https://issues.apache.org/jira/browse/KAFKA-13088?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bruno Cadonna updated KAFKA-13088:
--
Component/s: streams

> Replace EasyMock with Mockito for ForwardingDisabledProcessorContextTest
> 
>
> Key: KAFKA-13088
> URL: https://issues.apache.org/jira/browse/KAFKA-13088
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Chun-Hao Tang
>Assignee: Chun-Hao Tang
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-3539) KafkaProducer.send() may block even though it returns the Future

2021-07-14 Thread Ewen Cheslack-Postava (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-3539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17380730#comment-17380730 ]

Ewen Cheslack-Postava commented on KAFKA-3539:
--

[~moses.nakamura] I spent a bunch of time on the clients in the past, but I've barely been involved for the past few years, so I'm not really even sure of the current state of the implementation and tests (e.g. I wasn't working on the clients when EoS was implemented).

What tests fail due to minor changes? If they are unit tests, that should be 
unexpected unless you are changing public API, which would require a KIP 
anyway. You might also just be seeing flakiness in integration tests, which 
unfortunately is expected. Providing a list of the tests that break and whether 
it's compilation or runtime issues would probably help, but someone more active 
can probably provide better guidance.

> KafkaProducer.send() may block even though it returns the Future
> 
>
> Key: KAFKA-3539
> URL: https://issues.apache.org/jira/browse/KAFKA-3539
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Reporter: Oleg Zhurakousky
>Priority: Critical
>  Labels: needs-discussion, needs-kip
>
> You can get more details from the us...@kafka.apache.org by searching on the 
> thread with the subject "KafkaProducer block on send".
> The bottom line is that a method that returns a Future must never block, since 
> blocking essentially violates the Future contract: the call was specifically designed 
> to return immediately, passing control back to the user to check for completion, 
> cancel, etc.
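
As a hedged illustration of the mitigation available today (the broker address and topic name below are placeholders), `max.block.ms` bounds how long `send()` may block on metadata or on a full buffer, and the callback keeps completion handling asynchronous:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class BoundedBlockingProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        // Bound the time send() may block on metadata or a full buffer.
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "1000");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("demo-topic", "key", "value"),
                    (metadata, exception) -> {
                        // Completion (or failure) is reported asynchronously.
                        if (exception != null) {
                            exception.printStackTrace();
                        }
                    });
        }
    }
}
```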



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cmccabe commented on pull request #11048: KAFKA-13083: In KRaft mode, fix issue with ISR in manual partition assignments

2021-07-14 Thread GitBox


cmccabe commented on pull request #11048:
URL: https://github.com/apache/kafka/pull/11048#issuecomment-880035001


   > I'm assuming we don't need a similar check for the automatic assignment 
path since the replica placer will only select unfenced replicas.
   
   Right.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tang7526 opened a new pull request #11051: KAFKA-13088: Replace EasyMock with Mockito for ForwardingDisabledProcessorContextTest

2021-07-14 Thread GitBox


tang7526 opened a new pull request #11051:
URL: https://github.com/apache/kafka/pull/11051


   https://issues.apache.org/jira/browse/KAFKA-13088
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-13088) Replace EasyMock with Mockito for ForwardingDisabledProcessorContextTest

2021-07-14 Thread Chun-Hao Tang (Jira)
Chun-Hao Tang created KAFKA-13088:
-

 Summary: Replace EasyMock with Mockito for 
ForwardingDisabledProcessorContextTest
 Key: KAFKA-13088
 URL: https://issues.apache.org/jira/browse/KAFKA-13088
 Project: Kafka
  Issue Type: Sub-task
Reporter: Chun-Hao Tang
Assignee: Chun-Hao Tang






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ijuma commented on pull request #10280: KAFKA-12554: Refactor Log layer

2021-07-14 Thread GitBox


ijuma commented on pull request #10280:
URL: https://github.com/apache/kafka/pull/10280#issuecomment-879991378


   What is the reason for including a refactoring in 3.0 after the feature 
freeze?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #11048: KAFKA-13083: In KRaft mode, fix issue with ISR in manual partition assignments

2021-07-14 Thread GitBox


mumrah commented on a change in pull request #11048:
URL: https://github.com/apache/kafka/pull/11048#discussion_r669693620



##
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -377,8 +377,20 @@ private ApiError createTopic(CreatableTopic topic,
 validateManualPartitionAssignment(assignment.brokerIds(), replicationFactor);
 replicationFactor = OptionalInt.of(assignment.brokerIds().size());
 int[] replicas = Replicas.toArray(assignment.brokerIds());
+List<Integer> isr = new ArrayList<>();

Review comment:
   I think we can reduce the conversions between list and array here. We 
could also use a stream, e.g.,
   ```java
   assignment.brokerIds().stream().filter(clusterControl::unfenced)
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13052) Replace uses of SerDes in the docs with Serdes

2021-07-14 Thread James Galasyn (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-13052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17380643#comment-17380643 ]

James Galasyn commented on KAFKA-13052:
---

[~lct45], fixed in https://github.com/apache/kafka/pull/11050.

> Replace uses of SerDes in the docs with Serdes
> --
>
> Key: KAFKA-13052
> URL: https://issues.apache.org/jira/browse/KAFKA-13052
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: Leah Thomas
>Assignee: James Galasyn
>Priority: Minor
>  Labels: newbie
>
> Right now, we have scattered uses of `SerDes` throughout the docs. These 
> should be updated to be `Serdes`, as that's what we commonly use now.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

