[GitHub] [kafka] C0urante commented on a diff in pull request #11983: KAFKA-13763 (2): Refactor IncrementalCooperativeAssignor for improved unit testing
C0urante commented on code in PR #11983:
URL: https://github.com/apache/kafka/pull/11983#discussion_r867521081

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeConnectProtocol.java: ##
@@ -148,18 +148,18 @@ public class IncrementalCooperativeConnectProtocol {
  * Current Assignment => [Byte]
  *
  */
-public static ByteBuffer serializeMetadata(ExtendedWorkerState workerState, boolean sessioned) {
+public static ByteBuffer serializeMetadata(ExtendedWorkerState workerState) {

Review Comment:
No worries, done! Sorry for the delay.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a diff in pull request #11983: KAFKA-13763 (2): Refactor IncrementalCooperativeAssignor for improved unit testing
C0urante commented on code in PR #11983:
URL: https://github.com/apache/kafka/pull/11983#discussion_r863740301

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeConnectProtocol.java: ##
@@ -148,18 +148,18 @@ public class IncrementalCooperativeConnectProtocol {
  * Current Assignment => [Byte]
  *
  */
-public static ByteBuffer serializeMetadata(ExtendedWorkerState workerState, boolean sessioned) {
+public static ByteBuffer serializeMetadata(ExtendedWorkerState workerState) {

Review Comment:
Ah, good catch! I've pushed a fix but tried to keep the improvement to the serialization API; if you'd prefer to go back to the `sessioned` boolean LMK.
[GitHub] [kafka] C0urante commented on a diff in pull request #11983: KAFKA-13763 (2): Refactor IncrementalCooperativeAssignor for improved unit testing
C0urante commented on code in PR #11983:
URL: https://github.com/apache/kafka/pull/11983#discussion_r863330742

## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/ConnectProtocolCompatibilityTest.java: ##
@@ -51,95 +41,58 @@ public class ConnectProtocolCompatibilityTest {
 private ConnectorTaskId taskId2x0 = new ConnectorTaskId(connectorId2, 0);
 private ConnectorTaskId taskId3x0 = new ConnectorTaskId(connectorId3, 0);
-@Rule
-public MockitoRule rule = MockitoJUnit.rule();
-
-@Mock
-private KafkaConfigBackingStore configStorage;
-private ClusterConfigState configState;
-
-@Before
-public void setup() {
-    configStorage = mock(KafkaConfigBackingStore.class);
-    configState = new ClusterConfigState(
-        1L,
-        null,
-        Collections.singletonMap(connectorId1, 1),
-        Collections.singletonMap(connectorId1, new HashMap<>()),
-        Collections.singletonMap(connectorId1, TargetState.STARTED),
-        Collections.singletonMap(taskId1x0, new HashMap<>()),
-        Collections.emptySet());
-}
-
-@After
-public void teardown() {
-    verifyNoMoreInteractions(configStorage);
-}

Review Comment:
All of this is completely unnecessary and can be removed without diminishing testing coverage.
[GitHub] [kafka] C0urante commented on a diff in pull request #11983: KAFKA-13763 (2): Refactor IncrementalCooperativeAssignor for improved unit testing
C0urante commented on code in PR #11983:
URL: https://github.com/apache/kafka/pull/11983#discussion_r863330276

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java: ##
@@ -336,13 +332,16 @@ ClusterAssignment performTaskAssignment(
 log.debug("Incremental connector assignments: {}", incrementalConnectorAssignments);
 log.debug("Incremental task assignments: {}", incrementalTaskAssignments);
+Map> revokedConnectors = transformValues(toRevoke, ConnectorsAndTasks::connectors);
+Map> revokedTasks = transformValues(toRevoke, ConnectorsAndTasks::tasks);
+
 return new ClusterAssignment(
     incrementalConnectorAssignments,
     incrementalTaskAssignments,
-    transformValues(toRevoke, ConnectorsAndTasks::connectors),
-    transformValues(toRevoke, ConnectorsAndTasks::tasks),
-    connectorAssignments,
-    taskAssignments
+    revokedConnectors,
+    revokedTasks,
+    diff(connectorAssignments, revokedConnectors),
+    diff(taskAssignments, revokedTasks)

Review Comment:
I don't think so; it looks like we compute load-balancing revocations later on, around line 300. At line 279, the `completeWorkerAssignment` that we derive `connectorAssignments` and `taskAssignments` from only has the `deleted` connectors and tasks removed from it; everything else (including load-balancing revocations and duplicated assignments) is still included.
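The subtraction described in the comment above can be sketched in isolation. This is a minimal illustration, not the PR's code: the class name `AssignmentDiffSketch` is hypothetical, and this `diff` is a stand-in written on the assumption that the real helper removes each worker's revoked items from that worker's assigned items.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; names and semantics are assumptions, not Kafka's implementation.
final class AssignmentDiffSketch {

    // For every worker in `base`, keep only the items that are not listed
    // for that same worker in `toSubtract`.
    static <T> Map<String, Collection<T>> diff(Map<String, Collection<T>> base,
                                               Map<String, Collection<T>> toSubtract) {
        Map<String, Collection<T>> result = new HashMap<>();
        base.forEach((worker, assigned) -> {
            List<T> remaining = new ArrayList<>(assigned);
            remaining.removeAll(toSubtract.getOrDefault(worker, List.of()));
            result.put(worker, remaining);
        });
        return result;
    }

    public static void main(String[] args) {
        // The complete assignment still contains the connector that is about to be revoked.
        Map<String, Collection<String>> assigned = new HashMap<>();
        assigned.put("worker-1", List.of("connector-a", "connector-b"));
        assigned.put("worker-2", List.of("connector-c"));

        Map<String, Collection<String>> revoked = new HashMap<>();
        revoked.put("worker-1", List.of("connector-b"));

        // After the subtraction, worker-1 is left with connector-a only.
        System.out.println(diff(assigned, revoked));
    }
}
```

Without the subtraction, the "all assigned" view handed to the coordinator would still list items that the same rebalance round is revoking.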
[GitHub] [kafka] C0urante commented on a diff in pull request #11983: KAFKA-13763 (2): Refactor IncrementalCooperativeAssignor for improved unit testing
C0urante commented on code in PR #11983:
URL: https://github.com/apache/kafka/pull/11983#discussion_r863329552

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeConnectProtocol.java: ##
@@ -230,15 +230,16 @@ public static ExtendedWorkerState deserializeMetadata(ByteBuffer buffer) {
  * ScheduledDelay => Int32
  *
  */
-public static ByteBuffer serializeAssignment(ExtendedAssignment assignment) {
+public static ByteBuffer serializeAssignment(ExtendedAssignment assignment, boolean sessioned) {
     // comparison depends on reference equality for now
     if (assignment == null || ExtendedAssignment.empty().equals(assignment)) {
         return null;
     }
     Struct struct = assignment.toStruct();
-    ByteBuffer buffer = ByteBuffer.allocate(CONNECT_PROTOCOL_HEADER_V1.sizeOf()
+    Struct protocolHeader = sessioned ? CONNECT_PROTOCOL_HEADER_V2 : CONNECT_PROTOCOL_HEADER_V1;
+    ByteBuffer buffer = ByteBuffer.allocate(protocolHeader.sizeOf()
         + ASSIGNMENT_V1.sizeOf(struct));
-    CONNECT_PROTOCOL_HEADER_V1.writeTo(buffer);
+    protocolHeader.writeTo(buffer);

Review Comment:
Ah yeah, good call! Much cleaner than what we had before. It was a little more involved than I initially thought to make this change but IMO the end result is cleaner and easier to read, so hopefully it's worth the inflation in the diff here.
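The header selection in the hunk above follows a simple pattern: pick the header once, then use the same object for both sizing and writing so the two cannot drift apart. A minimal sketch of that pattern with plain `java.nio`, assuming a header that is just a two-byte version number; `HeaderSelectionSketch`, its constants, and `readVersion` are hypothetical and stand in for Kafka's `Struct`-based headers.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch; the real code writes Struct headers, not a bare short.
final class HeaderSelectionSketch {
    static final short VERSION_V1 = 1; // assumed value for the non-sessioned header
    static final short VERSION_V2 = 2; // assumed value for the sessioned header

    // Serialize a payload behind a version header chosen by the `sessioned` flag.
    static ByteBuffer serialize(byte[] payload, boolean sessioned) {
        short version = sessioned ? VERSION_V2 : VERSION_V1;
        // The buffer is sized from the same header that is written below.
        ByteBuffer buffer = ByteBuffer.allocate(Short.BYTES + payload.length);
        buffer.putShort(version);
        buffer.put(payload);
        buffer.flip();
        return buffer;
    }

    // Read the version back without moving the caller's buffer position.
    static short readVersion(ByteBuffer buffer) {
        return buffer.duplicate().getShort();
    }

    public static void main(String[] args) {
        byte[] payload = {7, 8};
        System.out.println(readVersion(serialize(payload, false)));
        System.out.println(readVersion(serialize(payload, true)));
    }
}
```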
[GitHub] [kafka] C0urante commented on a diff in pull request #11983: KAFKA-13763 (2): Refactor IncrementalCooperativeAssignor for improved unit testing
C0urante commented on code in PR #11983:
URL: https://github.com/apache/kafka/pull/11983#discussion_r863308185

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java: ##
@@ -108,18 +107,15 @@ public Map performAssignment(String leaderId, String protoco
 log.debug("Max config offset root: {}, local snapshot config offsets root: {}",
     maxOffset, coordinator.configSnapshot().offset());
-short protocolVersion = memberConfigs.values().stream()
-    .allMatch(state -> state.assignment().version() == CONNECT_PROTOCOL_V2)
-    ? CONNECT_PROTOCOL_V2
-    : CONNECT_PROTOCOL_V1;
+short protocolVersion = ConnectProtocolCompatibility.fromProtocol(protocol).protocolVersion();

Review Comment:
Yep, exactly. Should've known that when I implemented KIP-507 originally but was still getting my bearings with the group coordinator logic. Better late than never!
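The replacement line in the hunk above derives the version from the protocol name chosen for the group instead of scanning every member's assignment. A minimal sketch of such a name-to-version lookup follows; `CompatibilitySketch` is hypothetical, and the protocol names and version numbers in it are assumptions made for illustration rather than values taken from this thread.

```java
import java.util.Arrays;

// Hypothetical stand-in for a compatibility enum; names and numbers are assumed.
enum CompatibilitySketch {
    EAGER("default", (short) 0),
    COMPATIBLE("compatible", (short) 1),
    SESSIONED("sessioned", (short) 2);

    private final String protocol;
    private final short protocolVersion;

    CompatibilitySketch(String protocol, short protocolVersion) {
        this.protocol = protocol;
        this.protocolVersion = protocolVersion;
    }

    short protocolVersion() {
        return protocolVersion;
    }

    // Resolve the mode from the protocol name selected for the whole group.
    static CompatibilitySketch fromProtocol(String protocol) {
        return Arrays.stream(values())
            .filter(mode -> mode.protocol.equalsIgnoreCase(protocol))
            .findFirst()
            .orElseThrow(() -> new IllegalArgumentException("Unknown protocol: " + protocol));
    }
}
```

A caller would then write something like `short version = CompatibilitySketch.fromProtocol(protocol).protocolVersion();`, which needs no per-member state at all.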
[GitHub] [kafka] C0urante commented on a diff in pull request #11983: KAFKA-13763 (2): Refactor IncrementalCooperativeAssignor for improved unit testing
C0urante commented on code in PR #11983:
URL: https://github.com/apache/kafka/pull/11983#discussion_r863308048

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java: ##
@@ -336,13 +332,16 @@ ClusterAssignment performTaskAssignment(
 log.debug("Incremental connector assignments: {}", incrementalConnectorAssignments);
 log.debug("Incremental task assignments: {}", incrementalTaskAssignments);
+Map> revokedConnectors = transformValues(toRevoke, ConnectorsAndTasks::connectors);
+Map> revokedTasks = transformValues(toRevoke, ConnectorsAndTasks::tasks);
+
 return new ClusterAssignment(
     incrementalConnectorAssignments,
     incrementalTaskAssignments,
-    transformValues(toRevoke, ConnectorsAndTasks::connectors),
-    transformValues(toRevoke, ConnectorsAndTasks::tasks),
-    connectorAssignments,
-    taskAssignments
+    revokedConnectors,
+    revokedTasks,
+    diff(connectorAssignments, revokedConnectors),
+    diff(taskAssignments, revokedTasks)

Review Comment:
Yep! There was a bug here. FWIW the same issue is already fixed by https://github.com/apache/kafka/pull/12019.
[GitHub] [kafka] C0urante commented on a diff in pull request #11983: KAFKA-13763 (2): Refactor IncrementalCooperativeAssignor for improved unit testing
C0urante commented on code in PR #11983:
URL: https://github.com/apache/kafka/pull/11983#discussion_r861366880

## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java: ##
@@ -44,93 +35,49 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
-import java.util.stream.Stream;
-import static org.apache.kafka.connect.runtime.distributed.ExtendedAssignment.duplicate;
-import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V1;
-import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V2;
+import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor.ClusterAssignment;
 import static org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.WorkerLoad;
+import static org.apache.kafka.connect.util.ConnectUtils.transformValues;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
-import static org.junit.runners.Parameterized.Parameter;
-import static org.junit.runners.Parameterized.Parameters;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
-
-@RunWith(Parameterized.class)
-public class IncrementalCooperativeAssignorTest {
-@Rule
-public MockitoRule rule = MockitoJUnit.rule();
-
-@Mock
-private WorkerCoordinator coordinator;
-
-@Captor
-ArgumentCaptor> assignmentsCapture;
-
-@Parameters
-public static Iterable mode() {
-    return Arrays.asList(CONNECT_PROTOCOL_V1, CONNECT_PROTOCOL_V2);
-}

Review Comment:
Turns out the logic there was both unnecessary (it's guaranteed that all metadata will have the same protocol) and incorrect (the serialization logic never propagated the correct protocol version). I've pushed a patch and a couple of tests to fix this; LMKWYT.

## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java: ##
@@ -1375,24 +1171,6 @@ private void assertCompleteAllocation() {
 });
 }
-private void verifyCoordinatorInteractions() {
-    verify(coordinator, times(rebalanceNum)).configSnapshot();
-    verify(coordinator, times(rebalanceNum)).leaderState(any());
-    verify(coordinator, times(2 * rebalanceNum)).generationId();
-    verify(coordinator, times(rebalanceNum)).memberId();
-    verify(coordinator, times(rebalanceNum)).lastCompletedGenerationId();
-}

Review Comment:
Ack, done. The new test does nothing to verify the content of the leader state, but it does verify that `WorkerCoordinator::leaderState` is invoked, which is the same coverage that the current tests provide. I've also augmented the existing test logic to check the content of the `ClusterAssignment` (specifically, the return value of the `allAssignedConnectors` and `allAssignedTasks` methods for each worker in the cluster) in `applyAssignments`. As long as we use the `ClusterAssignment` object correctly (and it's fairly trivial to verify that we do) by passing its state to the coordinator, then this should ensure that the proper leader state is recorded.
[GitHub] [kafka] C0urante commented on a diff in pull request #11983: KAFKA-13763 (2): Refactor IncrementalCooperativeAssignor for improved unit testing
C0urante commented on code in PR #11983:
URL: https://github.com/apache/kafka/pull/11983#discussion_r861366648

## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java: ##
@@ -1142,107 +982,65 @@ private void removeConnector(String connector) {
 );
 }
-private void updateConfigSnapshot() {
-    when(coordinator.configSnapshot()).thenReturn(configState());
-}
-
 private ClusterConfigState configState() {
     Map taskCounts = new HashMap<>(connectors);
-    Map> connectorConfigs = taskCounts.keySet().stream().collect(Collectors.toMap(
-        Function.identity(),
-        connector -> Collections.emptyMap()
-    ));
-    Map targetStates = taskCounts.keySet().stream().collect(Collectors.toMap(
-        Function.identity(),
-        connector -> TargetState.STARTED
-    ));
+    Map> connectorConfigs = transformValues(taskCounts, c -> Collections.emptyMap());
+    Map targetStates = transformValues(taskCounts, c -> TargetState.STARTED);
     Map> taskConfigs = taskCounts.entrySet().stream()
         .flatMap(e -> IntStream.range(0, e.getValue()).mapToObj(i -> new ConnectorTaskId(e.getKey(), i)))
         .collect(Collectors.toMap(
             Function.identity(),
             connectorTaskId -> Collections.emptyMap()
         ));
     return new ClusterConfigState(
-        offset,
+        16,

Review Comment:
sounds good, done
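The hunk above swaps two `Collectors.toMap(Function.identity(), ...)` pipelines over `keySet()` for calls to `transformValues`. A minimal sketch of what such a helper might look like, assuming it keeps the keys and maps each value through a function; `TransformValuesSketch` is hypothetical and is not the `ConnectUtils` implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical sketch of a value-mapping helper; not Kafka's ConnectUtils code.
final class TransformValuesSketch {

    // Build a new map with the same keys and each value replaced by fn(value).
    static <K, I, O> Map<K, O> transformValues(Map<K, I> map, Function<I, O> fn) {
        return map.entrySet().stream()
            .collect(Collectors.toMap(Map.Entry::getKey, e -> fn.apply(e.getValue())));
    }

    public static void main(String[] args) {
        Map<String, Integer> taskCounts = new HashMap<>();
        taskCounts.put("connector-a", 2);
        taskCounts.put("connector-b", 1);

        // Every connector gets the same target state, as in the test helper above.
        Map<String, String> targetStates = transformValues(taskCounts, count -> "STARTED");
        System.out.println(targetStates);
    }
}
```

In the test helper the mapping function ignores its input, so the call produces the same map as the older pipeline over the key set, in one line instead of four.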
[GitHub] [kafka] C0urante commented on a diff in pull request #11983: KAFKA-13763 (2): Refactor IncrementalCooperativeAssignor for improved unit testing
C0urante commented on code in PR #11983:
URL: https://github.com/apache/kafka/pull/11983#discussion_r857610529

## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java: ##
@@ -1375,24 +1171,6 @@ private void assertCompleteAllocation() {
 });
 }
-private void verifyCoordinatorInteractions() {
-    verify(coordinator, times(rebalanceNum)).configSnapshot();
-    verify(coordinator, times(rebalanceNum)).leaderState(any());
-    verify(coordinator, times(2 * rebalanceNum)).generationId();
-    verify(coordinator, times(rebalanceNum)).memberId();
-    verify(coordinator, times(rebalanceNum)).lastCompletedGenerationId();
-}

Review Comment:
Not at the moment, no. The existing tests don't give us that much anyways:
- It doesn't matter at all how many times we request the current config snapshot, generation ID, member ID, or last completed generation ID; these methods all have trivial implementations
- Although it does matter that we invoke `Coordinator::leaderState` during the rebalance, the existing tests only verify that we invoke it an expected number of times, but without any assertions about what the actual state given to the coordinator is

## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java: ##
@@ -44,93 +35,49 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
-import java.util.stream.Stream;
-import static org.apache.kafka.connect.runtime.distributed.ExtendedAssignment.duplicate;
-import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V1;
-import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V2;
+import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor.ClusterAssignment;
 import static org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.WorkerLoad;
+import static org.apache.kafka.connect.util.ConnectUtils.transformValues;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
-import static org.junit.runners.Parameterized.Parameter;
-import static org.junit.runners.Parameterized.Parameters;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
-
-@RunWith(Parameterized.class)
-public class IncrementalCooperativeAssignorTest {
-@Rule
-public MockitoRule rule = MockitoJUnit.rule();
-
-@Mock
-private WorkerCoordinator coordinator;
-
-@Captor
-ArgumentCaptor> assignmentsCapture;
-
-@Parameters
-public static Iterable mode() {
-    return Arrays.asList(CONNECT_PROTOCOL_V1, CONNECT_PROTOCOL_V2);
-}

Review Comment:
Yeah, the existing tests provide basically no guarantees for these cases, and the actual allocation logic (which is 95% of what should be tested for this class) doesn't vary at all based on the protocol version, which can be verified by the lack of a `protocolVersion` argument in the visible-for-testing `performTaskAssignment` method.

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java: ##
@@ -745,22 +779,108 @@ protected void assignTasks(List workerAssignment, Collection workerAssignment(Map memberConfigs,
+private static List workerAssignment(Map memberAssignments,
     ConnectorsAndTasks toExclude) {
     ConnectorsAndTasks ignore = new ConnectorsAndTasks.Builder()
         .with(new HashSet<>(toExclude.connectors()), new HashSet<>(toExclude.tasks()))
         .build();
-    return memberConfigs.entrySet().stream()
+    return memberAssignments.entrySet().stream()
         .map(e -> new WorkerLoad.Builder(e.getKey()).with(
-            e.getValue().assignment().connectors().stream()
+            e.getValue().connectors().stream()
                 .filter(v -> !ignore.connectors().contains(v))
                 .collect(Collectors.toList()),
-            e.getValue().assignment().tasks().stream()
+            e.getValue().tasks().stream()
                 .filter(v -> !ignore.tasks().contains(v))