C0urante commented on code in PR #12295:
URL: https://github.com/apache/kafka/pull/12295#discussion_r903123084
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##########
@@ -1904,6 +1906,78 @@ public void testConnectorConfigUpdate() throws Exception
{
PowerMock.verifyAll();
}
+ @Test
+ public void testConnectorConfigUpdateFailedTransformation() throws
Exception {
+ // Connector config can be applied without any rebalance
+
+ EasyMock.expect(member.memberId()).andStubReturn("member");
+
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+
EasyMock.expect(worker.connectorNames()).andStubReturn(Collections.singleton(CONN1));
+
+ WorkerConfigTransformer configTransformer =
EasyMock.mock(WorkerConfigTransformer.class);
+ ClusterConfigState snapshotWithTransform = new ClusterConfigState(1,
null, Collections.singletonMap(CONN1, 1),
+ Collections.singletonMap(CONN1, CONN1_CONFIG),
Collections.singletonMap(CONN1, TargetState.STARTED),
+ TASK_CONFIGS_MAP, Collections.emptyMap(), Collections.emptyMap(),
Collections.emptySet(), Collections.emptySet(), configTransformer);
+
+ // join
+ expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList());
+ EasyMock.expect(configTransformer.transform(EasyMock.eq(CONN1),
EasyMock.anyObject()))
+ .andReturn(CONN1_CONFIG).times(2); // once for the connector, once
for the single task
+ expectConfigRefreshAndSnapshot(snapshotWithTransform);
+
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+ Capture<Callback<TargetState>> onStart = newCapture();
+ worker.startConnector(EasyMock.eq(CONN1), EasyMock.anyObject(),
EasyMock.anyObject(),
+ EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED),
capture(onStart));
+ PowerMock.expectLastCall().andAnswer(() -> {
+ onStart.getValue().onCompletion(null, TargetState.STARTED);
+ return true;
+ });
+ member.wakeup();
+ PowerMock.expectLastCall();
+ EasyMock.expect(worker.getPlugins()).andReturn(plugins);
+ EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
+ EasyMock.expect(worker.connectorTaskConfigs(CONN1,
conn1SinkConfig)).andReturn(TASK_CONFIGS);
+ member.poll(EasyMock.anyInt());
+ PowerMock.expectLastCall();
+
+ // apply config
+ member.wakeup();
+ PowerMock.expectLastCall();
+ member.ensureActive();
+ PowerMock.expectLastCall();
+
EasyMock.expect(configBackingStore.snapshot()).andReturn(snapshotWithTransform);
// for this test, it doesn't matter if we use the same config snapshot
+ EasyMock.expect(configTransformer.transform(EasyMock.eq(CONN1),
EasyMock.anyObject()))
+ .andThrow(new AssertionError("Config transformation should not
crash herder"));
Review Comment:
Nit: I get that if this exception bubbles up to the surface of a unit test,
it does count as a failed assertion of sorts. But I usually associate an
`AssertionError` with a failed attempt to actively assert some state that we've
calculated during the test, like comparing an expected exception message to an
actual one, or counting the number of elements in a list. It's also a little
unrealistic for a config provider to throw this kind of exception.
I'd personally opt to replace this with a `ConnectException` or a
`ConfigException` with a self-describing message like "simulated exception thrown
from config provider", which might help illustrate the purpose of the test
better to fresh eyes.
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##########
@@ -1904,6 +1906,78 @@ public void testConnectorConfigUpdate() throws Exception
{
PowerMock.verifyAll();
}
+ @Test
+ public void testConnectorConfigUpdateFailedTransformation() throws
Exception {
+ // Connector config can be applied without any rebalance
+
+ EasyMock.expect(member.memberId()).andStubReturn("member");
+
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+
EasyMock.expect(worker.connectorNames()).andStubReturn(Collections.singleton(CONN1));
+
+ WorkerConfigTransformer configTransformer =
EasyMock.mock(WorkerConfigTransformer.class);
+ ClusterConfigState snapshotWithTransform = new ClusterConfigState(1,
null, Collections.singletonMap(CONN1, 1),
+ Collections.singletonMap(CONN1, CONN1_CONFIG),
Collections.singletonMap(CONN1, TargetState.STARTED),
+ TASK_CONFIGS_MAP, Collections.emptyMap(), Collections.emptyMap(),
Collections.emptySet(), Collections.emptySet(), configTransformer);
+
+ // join
+ expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList());
+ EasyMock.expect(configTransformer.transform(EasyMock.eq(CONN1),
EasyMock.anyObject()))
+ .andReturn(CONN1_CONFIG).times(2); // once for the connector, once
for the single task
+ expectConfigRefreshAndSnapshot(snapshotWithTransform);
+
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+ Capture<Callback<TargetState>> onStart = newCapture();
+ worker.startConnector(EasyMock.eq(CONN1), EasyMock.anyObject(),
EasyMock.anyObject(),
+ EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED),
capture(onStart));
+ PowerMock.expectLastCall().andAnswer(() -> {
+ onStart.getValue().onCompletion(null, TargetState.STARTED);
+ return true;
+ });
+ member.wakeup();
+ PowerMock.expectLastCall();
+ EasyMock.expect(worker.getPlugins()).andReturn(plugins);
+ EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
+ EasyMock.expect(worker.connectorTaskConfigs(CONN1,
conn1SinkConfig)).andReturn(TASK_CONFIGS);
+ member.poll(EasyMock.anyInt());
+ PowerMock.expectLastCall();
+
+ // apply config
+ member.wakeup();
+ PowerMock.expectLastCall();
+ member.ensureActive();
+ PowerMock.expectLastCall();
+
EasyMock.expect(configBackingStore.snapshot()).andReturn(snapshotWithTransform);
// for this test, it doesn't matter if we use the same config snapshot
+ EasyMock.expect(configTransformer.transform(EasyMock.eq(CONN1),
EasyMock.anyObject()))
+ .andThrow(new AssertionError("Config transformation should not
crash herder"));
+ worker.stopAndAwaitConnector(CONN1);
+ PowerMock.expectLastCall();
+ Capture<ConnectorStatus> failedStatus = newCapture();
+ statusBackingStore.putSafe(capture(failedStatus));
+ PowerMock.expectLastCall().andAnswer(() -> {
+ assertEquals(CONN1, failedStatus.getValue().id());
+ assertEquals(FAILED, failedStatus.getValue().state());
Review Comment:
Nit: it's a little strange to do these assertions in an answer to the method
invocation. If the herder has logic to handle exceptions thrown by
`statusBackingStore::putSafe`, or if the call to that method takes place on a
different thread, a failed assertion might get accidentally swallowed and
missed by the test.
It might be better to move this logic into the top-level body of the unit
test method, probably near the call to `PowerMock::verifyAll`. It'd be less
susceptible to false negatives, and IMO would be a little easier to follow.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]