chia7712 commented on code in PR #15841:
URL: https://github.com/apache/kafka/pull/15841#discussion_r1586954293
##
connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreMockitoTest.java:
##
@@ -347,6 +380,165 @@ public void testPutConnectorConfigWithTargetState()
throws Exception {
verify(configLog).stop();
}
+@Test
+public void testPutConnectorConfigProducerError() throws Exception {
+expectStart(Collections.emptyList(), Collections.emptyMap());
+expectPartitionCount(1);
+
+configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+verifyConfigure();
+configStorage.start();
+
+when(converter.fromConnectData(TOPIC,
KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0,
CONNECTOR_CONFIG_STRUCTS.get(0)))
+.thenReturn(CONFIGS_SERIALIZED.get(0));
+when(configLog.sendWithReceipt(anyString(),
any(byte[].class))).thenReturn(producerFuture);
+
+// Verify initial state
+ClusterConfigState configState = configStorage.snapshot();
+assertEquals(-1, configState.offset());
+assertEquals(0, configState.connectors().size());
+
+when(producerFuture.get(anyLong(), any(TimeUnit.class))).thenThrow(
+new ExecutionException(new
TopicAuthorizationException(Collections.singleton("test"))));
+
+// verify that the producer exception from KafkaBasedLog::send is
propagated
+ConnectException e = assertThrows(ConnectException.class, () ->
configStorage.putConnectorConfig(CONNECTOR_IDS.get(0),
+SAMPLE_CONFIGS.get(0), null));
+assertTrue(e.getMessage().contains("Error writing connector
configuration to Kafka"));
Review Comment:
Could we also verify `e.getCause()` to make sure the error is caused by the
exception we expect?
##
connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreMockitoTest.java:
##
@@ -347,6 +380,165 @@ public void testPutConnectorConfigWithTargetState()
throws Exception {
verify(configLog).stop();
}
+@Test
+public void testPutConnectorConfigProducerError() throws Exception {
+expectStart(Collections.emptyList(), Collections.emptyMap());
+expectPartitionCount(1);
+
+configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+verifyConfigure();
+configStorage.start();
+
+when(converter.fromConnectData(TOPIC,
KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0,
CONNECTOR_CONFIG_STRUCTS.get(0)))
+.thenReturn(CONFIGS_SERIALIZED.get(0));
+when(configLog.sendWithReceipt(anyString(),
any(byte[].class))).thenReturn(producerFuture);
+
+// Verify initial state
+ClusterConfigState configState = configStorage.snapshot();
+assertEquals(-1, configState.offset());
+assertEquals(0, configState.connectors().size());
+
+when(producerFuture.get(anyLong(), any(TimeUnit.class))).thenThrow(
+new ExecutionException(new
TopicAuthorizationException(Collections.singleton("test"))));
+
+// verify that the producer exception from KafkaBasedLog::send is
propagated
+ConnectException e = assertThrows(ConnectException.class, () ->
configStorage.putConnectorConfig(CONNECTOR_IDS.get(0),
+SAMPLE_CONFIGS.get(0), null));
+assertTrue(e.getMessage().contains("Error writing connector
configuration to Kafka"));
+
+configStorage.stop();
+verify(configLog).stop();
+}
+
+@Test
+public void testRemoveConnectorConfigSlowProducer() throws Exception {
+expectStart(Collections.emptyList(), Collections.emptyMap());
+expectPartitionCount(1);
+
+configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+verifyConfigure();
+configStorage.start();
+
+@SuppressWarnings("unchecked")
+Future connectorConfigProducerFuture =
mock(Future.class);
+
+@SuppressWarnings("unchecked")
+Future targetStateProducerFuture = mock(Future.class);
+
+when(configLog.sendWithReceipt(anyString(), isNull()))
+// tombstone for the connector config
+.thenReturn(connectorConfigProducerFuture)
+// tombstone for the connector target state
+.thenReturn(targetStateProducerFuture);
+
+
when(connectorConfigProducerFuture.get(eq(READ_WRITE_TOTAL_TIMEOUT_MS),
any(TimeUnit.class)))
+.thenAnswer((Answer) invocation -> {
+time.sleep(READ_WRITE_TOTAL_TIMEOUT_MS - 1000);
+return null;
+});
+
+// the future get timeout is expected to be reduced according to how
long the previous Future::get took
+when(targetStateProducerFuture.get(eq(1000L), any(TimeUnit.class)))
+.thenAnswer((Answer) invocation -> {
+time.sleep(1000);
+