C0urante commented on code in PR #14102: URL: https://github.com/apache/kafka/pull/14102#discussion_r1290659476
########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java: ########## @@ -3929,543 +3008,636 @@ public void testPollDurationOnSlowConnectorOperations() { final int rebalanceDelayMs = 20000; final long operationDelayMs = 10000; final long maxPollWaitMs = rebalanceDelayMs - operationDelayMs; - EasyMock.expect(member.memberId()).andStubReturn("member"); - EasyMock.expect(member.currentProtocolVersion()).andStubReturn(connectProtocolVersion); + when(member.memberId()).thenReturn("member"); + when(member.currentProtocolVersion()).thenReturn(connectProtocolVersion); // Assign the connector to this worker, and have it start expectRebalance(Collections.emptyList(), Collections.emptyList(), ConnectProtocol.Assignment.NO_ERROR, 1, singletonList(CONN1), Collections.emptyList(), rebalanceDelayMs); expectConfigRefreshAndSnapshot(SNAPSHOT); - Capture<Callback<TargetState>> onFirstStart = newCapture(); - worker.startConnector(EasyMock.eq(CONN1), EasyMock.anyObject(), EasyMock.anyObject(), - EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED), capture(onFirstStart)); - PowerMock.expectLastCall().andAnswer(() -> { + ArgumentCaptor<Callback<TargetState>> onFirstStart = ArgumentCaptor.forClass(Callback.class); + doAnswer(invocation -> { time.sleep(operationDelayMs); onFirstStart.getValue().onCompletion(null, TargetState.STARTED); return true; - }); - member.wakeup(); - PowerMock.expectLastCall(); - expectExecuteTaskReconfiguration(true, conn1SinkConfig, () -> TASK_CONFIGS); - // We should poll for less than the delay - time to start the connector, meaning that a long connector start - // does not delay the poll timeout - member.poll(leq(maxPollWaitMs)); - PowerMock.expectLastCall(); + }).when(worker).startConnector(eq(CONN1), eq(CONN1_CONFIG), any(), eq(herder), eq(TargetState.STARTED), onFirstStart.capture()); + expectExecuteTaskReconfiguration(true, conn1SinkConfig, invocation -> TASK_CONFIGS); + + herder.tick(); // Rebalance again due to config update - member.wakeup(); - PowerMock.expectLastCall(); expectRebalance(Collections.emptyList(), Collections.emptyList(), ConnectProtocol.Assignment.NO_ERROR, 1, singletonList(CONN1), Collections.emptyList(), rebalanceDelayMs); - EasyMock.expect(configBackingStore.snapshot()).andReturn(SNAPSHOT_UPDATED_CONN1_CONFIG); - - worker.stopAndAwaitConnector(CONN1); - PowerMock.expectLastCall(); - EasyMock.expect(member.currentProtocolVersion()).andStubReturn(connectProtocolVersion); - Capture<Callback<TargetState>> onSecondStart = newCapture(); - worker.startConnector(EasyMock.eq(CONN1), EasyMock.anyObject(), EasyMock.anyObject(), - EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED), capture(onSecondStart)); - PowerMock.expectLastCall().andAnswer(() -> { + when(configBackingStore.snapshot()).thenReturn(SNAPSHOT_UPDATED_CONN1_CONFIG); + doNothing().when(worker).stopAndAwaitConnector(CONN1); + + ArgumentCaptor<Callback<TargetState>> onSecondStart = ArgumentCaptor.forClass(Callback.class); + doAnswer(invocation -> { time.sleep(operationDelayMs); onSecondStart.getValue().onCompletion(null, TargetState.STARTED); return true; - }); - member.wakeup(); - PowerMock.expectLastCall(); - member.poll(leq(maxPollWaitMs)); - PowerMock.expectLastCall(); + }).when(worker).startConnector(eq(CONN1), eq(CONN1_CONFIG_UPDATED), any(), eq(herder), eq(TargetState.STARTED), onSecondStart.capture()); + expectExecuteTaskReconfiguration(true, conn1SinkConfigUpdated, invocation -> TASK_CONFIGS); + + configUpdateListener.onConnectorConfigUpdate(CONN1); // read updated config + herder.tick(); // Third tick should resolve all outstanding requests expectRebalance(Collections.emptyList(), Collections.emptyList(), ConnectProtocol.Assignment.NO_ERROR, 1, singletonList(CONN1), Collections.emptyList(), rebalanceDelayMs); // which includes querying the connector task configs after the update - expectExecuteTaskReconfiguration(true, conn1SinkConfigUpdated, () -> { + expectExecuteTaskReconfiguration(true, conn1SinkConfigUpdated, invocation -> { time.sleep(operationDelayMs); return TASK_CONFIGS; }); - member.poll(leq(maxPollWaitMs)); - PowerMock.expectLastCall(); - - PowerMock.replayAll(); - herder.tick(); - configUpdateListener.onConnectorConfigUpdate(CONN1); // read updated config - herder.tick(); herder.tick(); - PowerMock.verifyAll(); + + // We should poll for less than the delay - time to start the connector, meaning that a long connector start + // does not delay the poll timeout + verify(member, times(3)).poll(leq(maxPollWaitMs)); + verify(worker, times(2)).startConnector(eq(CONN1), any(), any(), eq(herder), eq(TargetState.STARTED), any()); + verifyNoMoreInteractions(member, worker, configBackingStore); } @Test public void shouldThrowWhenStartAndStopExecutorThrowsRejectedExecutionExceptionAndHerderNotStopping() { - EasyMock.expect(member.memberId()).andStubReturn("leader"); + when(member.memberId()).thenReturn("leader"); + when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0); expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList(), true); expectConfigRefreshAndSnapshot(SNAPSHOT); - EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0); - - PowerMock.replayAll(); herder.startAndStopExecutor.shutdown(); assertThrows(RejectedExecutionException.class, herder::tick); + } + + @Test + public void testTaskReconfigurationRetriesWithConnectorTaskConfigsException() { + when(member.memberId()).thenReturn("leader"); + when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0); + expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true); + expectConfigRefreshAndSnapshot(SNAPSHOT); + + when(worker.isRunning(CONN1)).thenReturn(true); + when(worker.getPlugins()).thenReturn(plugins); + + SinkConnectorConfig sinkConnectorConfig = new SinkConnectorConfig(plugins, CONN1_CONFIG); + + when(worker.connectorTaskConfigs(CONN1, sinkConnectorConfig)) + .thenThrow(new ConnectException("Failed to generate task configs")) + .thenThrow(new ConnectException("Failed to generate task configs")) + .thenReturn(TASK_CONFIGS); + + expectAndVerifyTaskReconfigurationRetries(); + } + + @Test + public void testTaskReconfigurationRetriesWithLeaderRequestForwardingException() { + herder = mock(DistributedHerder.class, withSettings().defaultAnswer(CALLS_REAL_METHODS).useConstructor(new DistributedConfig(HERDER_CONFIG), + worker, WORKER_ID, KAFKA_CLUSTER_ID, statusBackingStore, configBackingStore, member, MEMBER_URL, restClient, metrics, time, + noneConnectorClientConfigOverridePolicy, Collections.emptyList(), new MockSynchronousExecutor(), new AutoCloseable[]{})); + + rebalanceListener = herder.new RebalanceListener(time); + + when(member.memberId()).thenReturn("member"); + when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0); + expectRebalance(1, Collections.emptyList(), Collections.emptyList(), false); + expectConfigRefreshAndSnapshot(SNAPSHOT); + + when(worker.isRunning(CONN1)).thenReturn(true); + when(worker.getPlugins()).thenReturn(plugins); + + SinkConnectorConfig sinkConnectorConfig = new SinkConnectorConfig(plugins, CONN1_CONFIG); + + List<Map<String, String>> changedTaskConfigs = new ArrayList<>(TASK_CONFIGS); + changedTaskConfigs.add(TASK_CONFIG); + when(worker.connectorTaskConfigs(CONN1, sinkConnectorConfig)).thenReturn(changedTaskConfigs); + + when(restClient.httpRequest(any(), eq("POST"), any(), any(), any(), any(), any())) + .thenThrow(new ConnectException("Request to leader to reconfigure connector tasks failed")) + .thenThrow(new ConnectException("Request to leader to reconfigure connector tasks failed")) + .thenReturn(null); + + expectAndVerifyTaskReconfigurationRetries(); + } + + private void expectAndVerifyTaskReconfigurationRetries() { + // initial tick + herder.tick(); + herder.requestTaskReconfiguration(CONN1); + // process the task reconfiguration request in this tick + herder.tick(); + // advance the time by 250ms so that the task reconfiguration request with initial retry backoff is processed + time.sleep(250); + herder.tick(); + // advance the time by 500ms so that the task reconfiguration request with double the initial retry backoff is processed + time.sleep(500); + herder.tick(); + + // 1. end of initial tick when no request has been added to the herder queue yet + // 2. the third task reconfiguration request is expected to pass; so expect no more retries (a Long.MAX_VALUE poll + // timeout indicates that there is no herder request currently in the queue) + verify(member, times(2)).poll(eq(Long.MAX_VALUE)); + + // task reconfiguration herder request with initial retry backoff + verify(member).poll(eq(250L)); + + // task reconfiguration herder request with double the initial retry backoff + verify(member).poll(eq(500L)); + + verifyNoMoreInteractions(member, worker); + } + + @Test + public void processRestartRequestsFailureSuppression() { + doNothing().when(member).wakeup(); + + final String connectorName = "foo"; + RestartRequest restartRequest = new RestartRequest(connectorName, false, false); + doThrow(new RuntimeException()).when(herder).buildRestartPlan(restartRequest); + + configUpdateListener.onRestartRequest(restartRequest); + assertEquals(1, herder.pendingRestartRequests.size()); + herder.processRestartRequests(); + assertTrue(herder.pendingRestartRequests.isEmpty()); + + verifyNoMoreInteractions(worker, member, configBackingStore, statusBackingStore); + } + + @Test + public void processRestartRequestsDequeue() { + doNothing().when(member).wakeup(); + doReturn(Optional.empty()).when(herder).buildRestartPlan(any(RestartRequest.class)); + + RestartRequest restartRequest = new RestartRequest("foo", false, false); + configUpdateListener.onRestartRequest(restartRequest); + restartRequest = new RestartRequest("bar", false, false); + configUpdateListener.onRestartRequest(restartRequest); + assertEquals(2, herder.pendingRestartRequests.size()); + herder.processRestartRequests(); + assertTrue(herder.pendingRestartRequests.isEmpty()); + } + + @Test + public void preserveHighestImpactRestartRequest() { + doNothing().when(member).wakeup(); + + final String connectorName = "foo"; + RestartRequest restartRequest = new RestartRequest(connectorName, false, false); + configUpdateListener.onRestartRequest(restartRequest); + + // Will overwrite as this is higher impact + restartRequest = new RestartRequest(connectorName, false, true); + configUpdateListener.onRestartRequest(restartRequest); + assertEquals(1, herder.pendingRestartRequests.size()); + assertFalse(herder.pendingRestartRequests.get(connectorName).onlyFailed()); + assertTrue(herder.pendingRestartRequests.get(connectorName).includeTasks()); + + // Will be ignored as the existing request has higher impact + restartRequest = new RestartRequest(connectorName, true, false); + configUpdateListener.onRestartRequest(restartRequest); + assertEquals(1, herder.pendingRestartRequests.size()); + // Compare against existing request + assertFalse(herder.pendingRestartRequests.get(connectorName).onlyFailed()); + assertTrue(herder.pendingRestartRequests.get(connectorName).includeTasks()); + + verifyNoMoreInteractions(worker, member, configBackingStore, statusBackingStore); + } + + @Test + public void testExactlyOnceSourceSupportValidation() { + herder = exactlyOnceHerder(); + Map<String, String> config = new HashMap<>(); + config.put(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG, REQUIRED.toString()); + + SourceConnector connectorMock = mock(SourceConnector.class); + when(connectorMock.exactlyOnceSupport(eq(config))).thenReturn(ExactlyOnceSupport.SUPPORTED); + + Map<String, ConfigValue> validatedConfigs = herder.validateSourceConnectorConfig( + connectorMock, SourceConnectorConfig.configDef(), config); + + List<String> errors = validatedConfigs.get(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG).errorMessages(); + assertEquals(Collections.emptyList(), errors); + } + + @Test + public void testExactlyOnceSourceSupportValidationOnUnsupportedConnector() { + herder = exactlyOnceHerder(); + Map<String, String> config = new HashMap<>(); + config.put(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG, REQUIRED.toString()); + + SourceConnector connectorMock = mock(SourceConnector.class); + when(connectorMock.exactlyOnceSupport(eq(config))).thenReturn(ExactlyOnceSupport.UNSUPPORTED); + + Map<String, ConfigValue> validatedConfigs = herder.validateSourceConnectorConfig( + connectorMock, SourceConnectorConfig.configDef(), config); + + List<String> errors = validatedConfigs.get(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG).errorMessages(); + assertEquals( + Collections.singletonList("The connector does not support exactly-once semantics with the provided configuration."), + errors); + } + + @Test + public void testExactlyOnceSourceSupportValidationOnUnknownConnector() { + herder = exactlyOnceHerder(); + Map<String, String> config = new HashMap<>(); + config.put(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG, REQUIRED.toString()); + + SourceConnector connectorMock = mock(SourceConnector.class); + when(connectorMock.exactlyOnceSupport(eq(config))).thenReturn(null); + + Map<String, ConfigValue> validatedConfigs = herder.validateSourceConnectorConfig( + connectorMock, SourceConnectorConfig.configDef(), config); + + List<String> errors = validatedConfigs.get(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG).errorMessages(); + assertFalse(errors.isEmpty()); + assertTrue( + "Error message did not contain expected text: " + errors.get(0), + errors.get(0).contains("The connector does not implement the API required for preflight validation of exactly-once source support.")); + assertEquals(1, errors.size()); + } + + @Test + public void testExactlyOnceSourceSupportValidationHandlesConnectorErrorsGracefully() { + herder = exactlyOnceHerder(); + Map<String, String> config = new HashMap<>(); + config.put(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG, REQUIRED.toString()); + + SourceConnector connectorMock = mock(SourceConnector.class); + String errorMessage = "time to add a new unit test :)"; + when(connectorMock.exactlyOnceSupport(eq(config))).thenThrow(new NullPointerException(errorMessage)); + + Map<String, ConfigValue> validatedConfigs = herder.validateSourceConnectorConfig( + connectorMock, SourceConnectorConfig.configDef(), config); - PowerMock.verifyAll(); + List<String> errors = validatedConfigs.get(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG).errorMessages(); + assertFalse(errors.isEmpty()); + assertTrue( + "Error message did not contain expected text: " + errors.get(0), + errors.get(0).contains(errorMessage)); + assertEquals(1, errors.size()); } @Test - public void testTaskReconfigurationRetriesWithConnectorTaskConfigsException() { - EasyMock.expect(member.memberId()).andStubReturn("leader"); - EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0); - expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true); - expectConfigRefreshAndSnapshot(SNAPSHOT); + public void testExactlyOnceSourceSupportValidationWhenExactlyOnceNotEnabledOnWorker() { + Map<String, String> config = new HashMap<>(); + config.put(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG, REQUIRED.toString()); - // end of initial tick - member.poll(EasyMock.anyInt()); - PowerMock.expectLastCall(); + SourceConnector connectorMock = mock(SourceConnector.class); + when(connectorMock.exactlyOnceSupport(eq(config))).thenReturn(ExactlyOnceSupport.SUPPORTED); - member.wakeup(); - PowerMock.expectLastCall().anyTimes(); + Map<String, ConfigValue> validatedConfigs = herder.validateSourceConnectorConfig( + connectorMock, SourceConnectorConfig.configDef(), config); - member.ensureActive(); - PowerMock.expectLastCall().anyTimes(); + List<String> errors = validatedConfigs.get(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG).errorMessages(); + assertEquals( + Collections.singletonList("This worker does not have exactly-once source support enabled."), + errors); + } - EasyMock.expect(worker.isRunning(CONN1)).andReturn(true).anyTimes(); - EasyMock.expect(worker.getPlugins()).andReturn(plugins).anyTimes(); + @Test + public void testExactlyOnceSourceSupportValidationHandlesInvalidValuesGracefully() { + herder = exactlyOnceHerder(); + Map<String, String> config = new HashMap<>(); + config.put(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG, "invalid"); - SinkConnectorConfig sinkConnectorConfig = new SinkConnectorConfig(plugins, CONN1_CONFIG); - EasyMock.expect(worker.connectorTaskConfigs(CONN1, sinkConnectorConfig)) - .andThrow(new ConnectException("Failed to generate task configs")).times(2); + SourceConnector connectorMock = mock(SourceConnector.class); - EasyMock.expect(worker.connectorTaskConfigs(CONN1, sinkConnectorConfig)).andReturn(TASK_CONFIGS); + Map<String, ConfigValue> validatedConfigs = herder.validateSourceConnectorConfig( + connectorMock, SourceConnectorConfig.configDef(), config); - expectAndVerifyTaskReconfigurationRetries(); + List<String> errors = validatedConfigs.get(SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG).errorMessages(); + assertFalse(errors.isEmpty()); + assertTrue( + "Error message did not contain expected text: " + errors.get(0), + errors.get(0).contains("String must be one of (case insensitive): ")); + assertEquals(1, errors.size()); } @Test - public void testTaskReconfigurationRetriesWithLeaderRequestForwardingException() { - herder = PowerMock.createPartialMock(DistributedHerder.class, - new String[]{"connectorType", "updateDeletedConnectorStatus", "updateDeletedTaskStatus", "validateConnectorConfig", "buildRestartPlan", "recordRestarting"}, - new DistributedConfig(HERDER_CONFIG), worker, WORKER_ID, KAFKA_CLUSTER_ID, - statusBackingStore, configBackingStore, member, MEMBER_URL, restClient, metrics, time, noneConnectorClientConfigOverridePolicy, - Collections.emptyList(), new MockSynchronousExecutor(), new AutoCloseable[]{}); + public void testConnectorTransactionBoundaryValidation() { + herder = exactlyOnceHerder(); + Map<String, String> config = new HashMap<>(); + config.put(SourceConnectorConfig.TRANSACTION_BOUNDARY_CONFIG, CONNECTOR.toString()); - rebalanceListener = herder.new RebalanceListener(time); + SourceConnector connectorMock = mock(SourceConnector.class); + when(connectorMock.canDefineTransactionBoundaries(eq(config))) + .thenReturn(ConnectorTransactionBoundaries.SUPPORTED); - EasyMock.expect(member.memberId()).andStubReturn("member"); - EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0); - expectRebalance(1, Collections.emptyList(), Collections.emptyList(), false); - expectConfigRefreshAndSnapshot(SNAPSHOT); + Map<String, ConfigValue> validatedConfigs = herder.validateSourceConnectorConfig( + connectorMock, SourceConnectorConfig.configDef(), config); - // end of initial tick - member.poll(EasyMock.anyInt()); - PowerMock.expectLastCall(); + List<String> errors = validatedConfigs.get(SourceConnectorConfig.TRANSACTION_BOUNDARY_CONFIG).errorMessages(); + assertEquals(Collections.emptyList(), errors); + } - member.wakeup(); - PowerMock.expectLastCall().anyTimes(); + @Test + public void testConnectorTransactionBoundaryValidationOnUnsupportedConnector() { + herder = exactlyOnceHerder(); + Map<String, String> config = new HashMap<>(); + config.put(SourceConnectorConfig.TRANSACTION_BOUNDARY_CONFIG, CONNECTOR.toString()); - member.ensureActive(); - PowerMock.expectLastCall().anyTimes(); + SourceConnector connectorMock = mock(SourceConnector.class); + when(connectorMock.canDefineTransactionBoundaries(eq(config))) + .thenReturn(ConnectorTransactionBoundaries.UNSUPPORTED); - EasyMock.expect(worker.isRunning(CONN1)).andReturn(true).anyTimes(); - EasyMock.expect(worker.getPlugins()).andReturn(plugins).anyTimes(); + Map<String, ConfigValue> validatedConfigs = herder.validateSourceConnectorConfig( + connectorMock, SourceConnectorConfig.configDef(), config); - SinkConnectorConfig sinkConnectorConfig = new SinkConnectorConfig(plugins, CONN1_CONFIG); + List<String> errors = validatedConfigs.get(SourceConnectorConfig.TRANSACTION_BOUNDARY_CONFIG).errorMessages(); + assertFalse(errors.isEmpty()); + assertTrue( + "Error message did not contain expected text: " + errors.get(0), + errors.get(0).contains("The connector does not support connector-defined transaction boundaries with the given configuration.")); + assertEquals(1, errors.size()); + } - List<Map<String, String>> changedTaskConfigs = new ArrayList<>(TASK_CONFIGS); - changedTaskConfigs.add(TASK_CONFIG); - EasyMock.expect(worker.connectorTaskConfigs(CONN1, sinkConnectorConfig)).andReturn(changedTaskConfigs).anyTimes(); + @Test + public void testConnectorTransactionBoundaryValidationHandlesConnectorErrorsGracefully() { + herder = exactlyOnceHerder(); + Map<String, String> config = new HashMap<>(); + config.put(SourceConnectorConfig.TRANSACTION_BOUNDARY_CONFIG, CONNECTOR.toString()); - EasyMock.expect(restClient.httpRequest( - EasyMock.anyString(), EasyMock.eq("POST"), EasyMock.anyObject(HttpHeaders.class), - EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(SecretKey.class), EasyMock.anyString()) - ).andThrow(new ConnectException("Request to leader to reconfigure connector tasks failed")).times(2); + SourceConnector connectorMock = mock(SourceConnector.class); + String errorMessage = "Wait I thought we tested for this?"; + when(connectorMock.canDefineTransactionBoundaries(eq(config))).thenThrow(new ConnectException(errorMessage)); - EasyMock.expect(restClient.httpRequest( - EasyMock.anyString(), EasyMock.eq("POST"), EasyMock.anyObject(HttpHeaders.class), - EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(SecretKey.class), EasyMock.anyString()) - ).andReturn(null); + Map<String, ConfigValue> validatedConfigs = herder.validateSourceConnectorConfig( + connectorMock, SourceConnectorConfig.configDef(), config); - expectAndVerifyTaskReconfigurationRetries(); + List<String> errors = validatedConfigs.get(SourceConnectorConfig.TRANSACTION_BOUNDARY_CONFIG).errorMessages(); + assertFalse(errors.isEmpty()); + assertTrue( + "Error message did not contain expected text: " + errors.get(0), + errors.get(0).contains(errorMessage)); + assertEquals(1, errors.size()); } - private void expectAndVerifyTaskReconfigurationRetries() { - // task reconfiguration herder request with initial retry backoff - member.poll(EasyMock.eq(250L)); - PowerMock.expectLastCall(); - - // task reconfiguration herder request with double the initial retry backoff - member.poll(EasyMock.eq(500L)); - PowerMock.expectLastCall(); - - // the third task reconfiguration request is expected to pass; so expect no more retries (a Long.MAX_VALUE poll - // timeout indicates that there is no herder request currently in the queue) - member.poll(EasyMock.eq(Long.MAX_VALUE)); - PowerMock.expectLastCall(); + @Test + public void testConnectorTransactionBoundaryValidationHandlesInvalidValuesGracefully() { + herder = exactlyOnceHerder(); + Map<String, String> config = new HashMap<>(); + config.put(SourceConnectorConfig.TRANSACTION_BOUNDARY_CONFIG, "CONNECTOR.toString()"); - PowerMock.replayAll(); + SourceConnector connectorMock = mock(SourceConnector.class); - // initial tick - herder.tick(); - herder.requestTaskReconfiguration(CONN1); - // process the task reconfiguration request in this tick - herder.tick(); - // advance the time by 250ms so that the task reconfiguration request with initial retry backoff is processed - time.sleep(250); - herder.tick(); - // advance the time by 500ms so that the task reconfiguration request with double the initial retry backoff is processed - time.sleep(500); - herder.tick(); + Map<String, ConfigValue> validatedConfigs = herder.validateSourceConnectorConfig( + connectorMock, SourceConnectorConfig.configDef(), config); - PowerMock.verifyAll(); + List<String> errors = validatedConfigs.get(SourceConnectorConfig.TRANSACTION_BOUNDARY_CONFIG).errorMessages(); + assertFalse(errors.isEmpty()); + assertTrue( + "Error message did not contain expected text: " + errors.get(0), + errors.get(0).contains("String must be one of (case insensitive): ")); + assertEquals(1, errors.size()); } @Test public void testConnectorOffsets() throws Exception { - EasyMock.expect(member.memberId()).andStubReturn("leader"); - EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0); + when(member.memberId()).thenReturn("leader"); + when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0); expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true); expectConfigRefreshAndSnapshot(SNAPSHOT); - expectAnyTicks(); - member.wakeup(); - PowerMock.expectLastCall(); + when(statusBackingStore.connectors()).thenReturn(Collections.emptySet()); + doNothing().when(member).poll(anyLong()); - EasyMock.expect(configBackingStore.snapshot()).andReturn(SNAPSHOT); + herder.tick(); + + when(configBackingStore.snapshot()).thenReturn(SNAPSHOT); ConnectorOffsets offsets = new ConnectorOffsets(Collections.singletonList(new ConnectorOffset( Collections.singletonMap("partitionKey", "partitionValue"), Collections.singletonMap("offsetKey", "offsetValue")))); - Capture<Callback<ConnectorOffsets>> callbackCapture = newCapture(); - worker.connectorOffsets(EasyMock.eq(CONN1), EasyMock.eq(CONN1_CONFIG), capture(callbackCapture)); - PowerMock.expectLastCall().andAnswer(() -> { + + ArgumentCaptor<Callback<ConnectorOffsets>> callbackCapture = ArgumentCaptor.forClass(Callback.class); + doAnswer(invocation -> { callbackCapture.getValue().onCompletion(null, offsets); return null; - }); + }).when(worker).connectorOffsets(eq(CONN1), eq(CONN1_CONFIG), callbackCapture.capture()); - PowerMock.replayAll(); - - herder.tick(); FutureCallback<ConnectorOffsets> cb = new FutureCallback<>(); herder.connectorOffsets(CONN1, cb); herder.tick(); assertEquals(offsets, cb.get(1000, TimeUnit.MILLISECONDS)); - PowerMock.verifyAll(); + verifyNoMoreInteractions(worker, member, configBackingStore, statusBackingStore); } @Test public void testModifyConnectorOffsetsUnknownConnector() throws Exception { // Get the initial assignment - EasyMock.expect(member.memberId()).andStubReturn("leader"); - EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0); + when(member.memberId()).thenReturn("leader"); + when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0); expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true); expectConfigRefreshAndSnapshot(SNAPSHOT); - member.poll(EasyMock.anyInt()); - PowerMock.expectLastCall(); - - // Now handle the connector offsets modification request - member.wakeup(); - PowerMock.expectLastCall(); - member.ensureActive(); - PowerMock.expectLastCall(); - expectConfigRefreshAndSnapshot(SNAPSHOT); - member.poll(EasyMock.anyInt()); - PowerMock.expectLastCall(); - - PowerMock.replayAll(); herder.tick(); + + // Now handle the connector offsets modification request FutureCallback<Message> callback = new FutureCallback<>(); herder.modifyConnectorOffsets("connector-does-not-exist", new HashMap<>(), callback); herder.tick(); ExecutionException e = assertThrows(ExecutionException.class, () -> callback.get(1000L, TimeUnit.MILLISECONDS)); assertTrue(e.getCause() instanceof NotFoundException); - - PowerMock.verifyAll(); } @Test public void testModifyOffsetsConnectorNotInStoppedState() throws Exception { // Get the initial assignment - EasyMock.expect(member.memberId()).andStubReturn("leader"); - EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0); + when(member.memberId()).thenReturn("leader"); + when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0); expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true); expectConfigRefreshAndSnapshot(SNAPSHOT); - member.poll(EasyMock.anyInt()); - PowerMock.expectLastCall(); - - // Now handle the connector offsets modification request - member.wakeup(); - PowerMock.expectLastCall(); - member.ensureActive(); - PowerMock.expectLastCall(); - expectConfigRefreshAndSnapshot(SNAPSHOT); - member.poll(EasyMock.anyInt()); - PowerMock.expectLastCall(); - - PowerMock.replayAll(); herder.tick(); + + // Now handle the connector offsets modification request FutureCallback<Message> callback = new FutureCallback<>(); herder.modifyConnectorOffsets(CONN1, null, callback); herder.tick(); ExecutionException e = assertThrows(ExecutionException.class, () -> callback.get(1000L, TimeUnit.MILLISECONDS)); assertTrue(e.getCause() instanceof BadRequestException); - - PowerMock.verifyAll(); } @Test public void testModifyOffsetsNotLeader() throws Exception { // Get the initial assignment - EasyMock.expect(member.memberId()).andStubReturn("member"); - EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0); + when(member.memberId()).thenReturn("member"); + when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0); expectRebalance(1, Collections.emptyList(), Collections.emptyList(), false); expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1); - member.poll(EasyMock.anyInt()); - PowerMock.expectLastCall(); - - // Now handle the connector offsets modification request - member.wakeup(); - PowerMock.expectLastCall(); - member.ensureActive(); - PowerMock.expectLastCall(); - member.poll(EasyMock.anyInt()); - PowerMock.expectLastCall(); - - PowerMock.replayAll(); herder.tick(); + + // Now handle the connector offsets modification request FutureCallback<Message> callback = new FutureCallback<>(); herder.modifyConnectorOffsets(CONN1, new HashMap<>(), callback); herder.tick(); ExecutionException e = assertThrows(ExecutionException.class, () -> callback.get(1000L, TimeUnit.MILLISECONDS)); assertTrue(e.getCause() instanceof NotLeaderException); - - PowerMock.verifyAll(); } @Test public void testModifyOffsetsSinkConnector() throws Exception { - EasyMock.reset(herder); - EasyMock.expect(herder.connectorType(EasyMock.anyObject())).andReturn(ConnectorType.SINK).anyTimes(); - PowerMock.expectPrivate(herder, "updateDeletedConnectorStatus").andVoid().anyTimes(); - PowerMock.expectPrivate(herder, "updateDeletedTaskStatus").andVoid().anyTimes(); - + when(herder.connectorType(any())).thenReturn(ConnectorType.SINK); // Get the initial assignment - EasyMock.expect(member.memberId()).andStubReturn("leader"); - EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0); + when(member.memberId()).thenReturn("leader"); + when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0); expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true); expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1); - member.poll(EasyMock.anyInt()); - PowerMock.expectLastCall(); + + herder.tick(); // Now handle the alter connector offsets request Map<Map<String, ?>, Map<String, ?>> offsets = Collections.singletonMap( Collections.singletonMap("partitionKey", "partitionValue"), Collections.singletonMap("offsetKey", "offsetValue")); - member.wakeup(); - PowerMock.expectLastCall(); - member.ensureActive(); - PowerMock.expectLastCall(); - member.poll(EasyMock.anyInt()); - PowerMock.expectLastCall(); - expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1); - Capture<Callback<Message>> workerCallbackCapture = Capture.newInstance(); - worker.modifyConnectorOffsets(EasyMock.eq(CONN1), EasyMock.eq(CONN1_CONFIG), EasyMock.eq(offsets), capture(workerCallbackCapture)); + + ArgumentCaptor<Callback<Message>> workerCallbackCapture = ArgumentCaptor.forClass(Callback.class); Message msg = new Message("The offsets for this connector have been altered successfully"); - EasyMock.expectLastCall().andAnswer(() -> { + doAnswer(invocation -> { workerCallbackCapture.getValue().onCompletion(null, msg); return null; - }); + }).when(worker).modifyConnectorOffsets(eq(CONN1), eq(CONN1_CONFIG), eq(offsets), workerCallbackCapture.capture()); - PowerMock.replayAll(); - - herder.tick(); FutureCallback<Message> callback = new FutureCallback<>(); herder.alterConnectorOffsets(CONN1, offsets, callback); herder.tick(); assertEquals(msg, callback.get(1000L, TimeUnit.MILLISECONDS)); assertEquals("The offsets for this connector have been altered successfully", msg.message()); - - PowerMock.verifyAll(); } @Test public void testModifyOffsetsSourceConnectorExactlyOnceDisabled() throws Exception { // Get the initial assignment - EasyMock.expect(member.memberId()).andStubReturn("leader"); - EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0); + when(member.memberId()).thenReturn("leader"); + when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0); expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true); expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1); - member.poll(EasyMock.anyInt()); - PowerMock.expectLastCall(); + herder.tick(); // Now handle the reset connector offsets request - member.wakeup(); - PowerMock.expectLastCall(); - member.ensureActive(); - PowerMock.expectLastCall(); - member.poll(EasyMock.anyInt()); - PowerMock.expectLastCall(); - expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1); - Capture<Callback<Message>> workerCallbackCapture = Capture.newInstance(); - worker.modifyConnectorOffsets(EasyMock.eq(CONN1), EasyMock.eq(CONN1_CONFIG), isNull(), capture(workerCallbackCapture)); + ArgumentCaptor<Callback<Message>> workerCallbackCapture = ArgumentCaptor.forClass(Callback.class); Message msg = new Message("The offsets for this connector have been reset successfully"); - EasyMock.expectLastCall().andAnswer(() -> { + doAnswer(invocation -> { workerCallbackCapture.getValue().onCompletion(null, msg); return null; - }); + }).when(worker).modifyConnectorOffsets(eq(CONN1), eq(CONN1_CONFIG), isNull(), workerCallbackCapture.capture()); - PowerMock.replayAll(); - - herder.tick(); FutureCallback<Message> callback = new FutureCallback<>(); herder.resetConnectorOffsets(CONN1, callback); herder.tick(); assertEquals(msg, callback.get(1000L, TimeUnit.MILLISECONDS)); assertEquals("The offsets for this connector have been reset successfully", msg.message()); - - PowerMock.verifyAll(); } @Test public void testModifyOffsetsSourceConnectorExactlyOnceEnabled() throws Exception { // Setup herder with exactly-once support for source connectors enabled herder = exactlyOnceHerder(); rebalanceListener = herder.new RebalanceListener(time); - PowerMock.expectPrivate(herder, "updateDeletedConnectorStatus").andVoid().anyTimes(); - PowerMock.expectPrivate(herder, "updateDeletedTaskStatus").andVoid().anyTimes(); - // Get the initial assignment - EasyMock.expect(member.memberId()).andStubReturn("leader"); - EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0); + when(member.memberId()).thenReturn("leader"); + when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0); expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true); expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1); - member.poll(EasyMock.anyInt()); - PowerMock.expectLastCall().anyTimes(); + doNothing().when(member).poll(anyLong()); + + herder.tick(); // Now handle the alter connector offsets request - Map<Map<String, ?>, Map<String, ?>> offsets = Collections.singletonMap( - Collections.singletonMap("partitionKey", "partitionValue"), - Collections.singletonMap("offsetKey", "offsetValue")); - member.wakeup(); - PowerMock.expectLastCall().anyTimes(); - member.ensureActive(); - PowerMock.expectLastCall().anyTimes(); - expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1); - expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1); - EasyMock.expect(herder.connectorType(EasyMock.anyObject())).andReturn(ConnectorType.SOURCE).anyTimes(); + doNothing().when(member).ensureActive(); + when(herder.connectorType(any())).thenReturn(ConnectorType.SOURCE); // Expect a round of zombie fencing to occur - expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1); - KafkaFuture<Void> workerFencingFuture = EasyMock.mock(KafkaFuture.class); - KafkaFuture<Void> herderFencingFuture = EasyMock.mock(KafkaFuture.class); - EasyMock.expect(worker.fenceZombies(CONN1, SNAPSHOT_STOPPED_CONN1.taskCountRecord(CONN1), CONN1_CONFIG)).andReturn(workerFencingFuture); - EasyMock.expect(workerFencingFuture.thenApply(EasyMock.<KafkaFuture.BaseFunction<Void, Void>>anyObject())).andReturn(herderFencingFuture); - - // Two fencing callbacks are added - one is in ZombieFencing::start itself to remove the connector from the active - // fencing list. The other is the callback passed from DistributedHerder::modifyConnectorOffsets in order to - // queue up the actual alter offsets request if the zombie fencing succeeds. - for (int i = 0; i < 2; i++) { - Capture<KafkaFuture.BiConsumer<Void, Throwable>> herderFencingCallback = EasyMock.newCapture(); - EasyMock.expect(herderFencingFuture.whenComplete(EasyMock.capture(herderFencingCallback))).andAnswer(() -> { - herderFencingCallback.getValue().accept(null, null); - return null; - }); - } - - Capture<Callback<Message>> workerCallbackCapture = Capture.newInstance(); - Message msg = new Message("The offsets for this connector have been altered successfully"); - worker.modifyConnectorOffsets(EasyMock.eq(CONN1), EasyMock.eq(CONN1_CONFIG), EasyMock.eq(offsets), capture(workerCallbackCapture)); - EasyMock.expectLastCall().andAnswer(() -> { - workerCallbackCapture.getValue().onCompletion(null, msg); + KafkaFuture<Void> workerFencingFuture = mock(KafkaFuture.class); + KafkaFuture<Void> herderFencingFuture = mock(KafkaFuture.class); + when(worker.fenceZombies(CONN1, SNAPSHOT_STOPPED_CONN1.taskCountRecord(CONN1), CONN1_CONFIG)).thenReturn(workerFencingFuture); + when(workerFencingFuture.thenApply(any(KafkaFuture.BaseFunction.class))).thenReturn(herderFencingFuture); + + ArgumentCaptor<KafkaFuture.BiConsumer<Void, Throwable>> herderFencingCallback = ArgumentCaptor.forClass(KafkaFuture.BiConsumer.class); + when(herderFencingFuture.whenComplete(herderFencingCallback.capture())).thenAnswer(invocation -> { + herderFencingCallback.getValue().accept(null, null); return null; }); - // Handle the second alter connector offsets request. No zombie fencing request to the worker is expected now since we - // already did a round of zombie fencing last time and no new tasks came up in the meanwhile. The config snapshot is - // refreshed once at the beginning of the DistributedHerder::modifyConnectorOffsets method, once before checking - // whether zombie fencing is required, and once before actually proceeding to alter connector offsets. - expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1_FENCED); - expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1_FENCED); - expectConfigRefreshAndSnapshot(SNAPSHOT_STOPPED_CONN1_FENCED); - Capture<Callback<Message>> workerCallbackCapture2 = Capture.newInstance(); - worker.modifyConnectorOffsets(EasyMock.eq(CONN1), EasyMock.eq(CONN1_CONFIG), EasyMock.eq(offsets), capture(workerCallbackCapture2)); - EasyMock.expectLastCall().andAnswer(() -> { - workerCallbackCapture2.getValue().onCompletion(null, msg); - return null; - }); + ArgumentCaptor<Callback<Message>> workerCallbackCapture = ArgumentCaptor.forClass(Callback.class); + Message msg = new Message("The offsets for this connector have been altered successfully"); - PowerMock.replayAll(workerFencingFuture, herderFencingFuture); + Map<Map<String, ?>, Map<String, ?>> offsets = Collections.singletonMap( + Collections.singletonMap("partitionKey", "partitionValue"), + Collections.singletonMap("offsetKey", "offsetValue")); + doAnswer(invocation -> { + workerCallbackCapture.getValue().onCompletion(null, msg); + return null; + }).when(worker).modifyConnectorOffsets(eq(CONN1), eq(CONN1_CONFIG), eq(offsets), workerCallbackCapture.capture()); - herder.tick(); FutureCallback<Message> callback = new FutureCallback<>(); herder.alterConnectorOffsets(CONN1, offsets, callback); - // Process the zombie fencing request - herder.tick(); - // Process the alter offsets request + // Process the zombie fencing request that is queued up first followed by the actual alter offsets request Review Comment: Great, thanks for humoring me! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org