C0urante commented on code in PR #14966: URL: https://github.com/apache/kafka/pull/14966#discussion_r1423199462
########## connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java: ########## @@ -70,74 +80,115 @@ public class OffsetsApiIntegrationTest { private static final long OFFSET_COMMIT_INTERVAL_MS = TimeUnit.SECONDS.toMillis(1); private static final long OFFSET_READ_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(30); private static final int NUM_WORKERS = 3; - private static final String CONNECTOR_NAME = "test-connector"; - private static final String TOPIC = "test-topic"; private static final int NUM_TASKS = 2; private static final int NUM_RECORDS_PER_PARTITION = 10; - private Map<String, String> workerProps; - private EmbeddedConnectCluster.Builder connectBuilder; + private static final Map<Map<String, String>, EmbeddedConnectCluster> CONNECT_CLUSTERS = new ConcurrentHashMap<>(); + @Rule + public TestName currentTest = new TestName(); private EmbeddedConnectCluster connect; + private String connectorName; + private String topic; @Before public void setup() { - Properties brokerProps = new Properties(); - brokerProps.put("transaction.state.log.replication.factor", "1"); - brokerProps.put("transaction.state.log.min.isr", "1"); - - // setup Connect worker properties - workerProps = new HashMap<>(); - workerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(OFFSET_COMMIT_INTERVAL_MS)); - - // build a Connect cluster backed by Kafka and Zk - connectBuilder = new EmbeddedConnectCluster.Builder() - .name("connect-cluster") - .numWorkers(NUM_WORKERS) - .brokerProps(brokerProps) - .workerProps(workerProps); + connectorName = currentTest.getMethodName(); + topic = currentTest.getMethodName(); + connect = defaultConnectCluster(); } @After public void tearDown() { - connect.stop(); + Set<String> remainingConnectors = new HashSet<>(connect.connectors()); + if (remainingConnectors.remove(connectorName)) { + connect.deleteConnector(connectorName); + } + try { + assertEquals( + "Some connectors were not properly cleaned up after this test", 
+ Collections.emptySet(), + remainingConnectors + ); + } finally { + // Make a last-ditch effort to clean up the leaked connectors + // so as not to interfere with other test cases + remainingConnectors.forEach(connect::deleteConnector); + } + } + + @AfterClass + public static void close() { + // stop all Connect, Kafka and Zk threads. + CONNECT_CLUSTERS.values().forEach(EmbeddedConnectCluster::stop); + } + + private static EmbeddedConnectCluster createOrReuseConnectWithWorkerProps(Map<String, String> workerProps) { + return CONNECT_CLUSTERS.computeIfAbsent(workerProps, props -> { + Properties brokerProps = new Properties(); + brokerProps.put("transaction.state.log.replication.factor", "1"); + brokerProps.put("transaction.state.log.min.isr", "1"); + + // Have to declare a new map since the passed-in one may be immutable + Map<String, String> workerPropsWithDefaults = new HashMap<>(workerProps); + // Enable fast offset commits by default + workerPropsWithDefaults.putIfAbsent(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(OFFSET_COMMIT_INTERVAL_MS)); + + EmbeddedConnectCluster result = new EmbeddedConnectCluster.Builder() + .name("connect-cluster") + .numWorkers(NUM_WORKERS) + .brokerProps(brokerProps) + .workerProps(workerPropsWithDefaults) + .build(); + + result.start(); + + return result; + }); + } + + private static EmbeddedConnectCluster defaultConnectCluster() { + return createOrReuseConnectWithWorkerProps(Collections.emptyMap()); + } + + private static EmbeddedConnectCluster exactlyOnceSourceConnectCluster() { + Map<String, String> workerProps = Collections.singletonMap( + DistributedConfig.EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, + "enabled" + ); + return createOrReuseConnectWithWorkerProps(workerProps); } @Test public void testGetNonExistentConnectorOffsets() { - connect = connectBuilder.build(); - connect.start(); ConnectRestException e = assertThrows(ConnectRestException.class, () -> connect.connectorOffsets("non-existent-connector")); assertEquals(404, 
e.errorCode()); } @Test public void testGetSinkConnectorOffsets() throws Exception { - connect = connectBuilder.build(); - connect.start(); getAndVerifySinkConnectorOffsets(baseSinkConnectorConfigs(), connect.kafka()); } @Test public void testGetSinkConnectorOffsetsOverriddenConsumerGroupId() throws Exception { - connect = connectBuilder.build(); - connect.start(); Map<String, String> connectorConfigs = baseSinkConnectorConfigs(); - connectorConfigs.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + CommonClientConfigs.GROUP_ID_CONFIG, - "overridden-group-id"); + String overriddenGroupId = connectorName + "-overridden-group-id"; Review Comment: There's no need to use the same string in different test cases, so there's less benefit than usual for extracting this out to a class constant. We could just as easily change it to `"custom-group-id"`, `"other-group-id"`, etc. in a single test and things would work just as well. It also adds noise to the entire test suite to add class-level constants; since overridden group IDs are only used in three of the twenty-seven tests here, IMO it's cleaner to keep these strings isolated to the tests that actually need them. 
########## connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java: ########## @@ -70,74 +80,115 @@ public class OffsetsApiIntegrationTest { private static final long OFFSET_COMMIT_INTERVAL_MS = TimeUnit.SECONDS.toMillis(1); private static final long OFFSET_READ_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(30); private static final int NUM_WORKERS = 3; - private static final String CONNECTOR_NAME = "test-connector"; - private static final String TOPIC = "test-topic"; private static final int NUM_TASKS = 2; private static final int NUM_RECORDS_PER_PARTITION = 10; - private Map<String, String> workerProps; - private EmbeddedConnectCluster.Builder connectBuilder; + private static final Map<Map<String, String>, EmbeddedConnectCluster> CONNECT_CLUSTERS = new ConcurrentHashMap<>(); + @Rule + public TestName currentTest = new TestName(); private EmbeddedConnectCluster connect; + private String connectorName; + private String topic; @Before public void setup() { - Properties brokerProps = new Properties(); - brokerProps.put("transaction.state.log.replication.factor", "1"); - brokerProps.put("transaction.state.log.min.isr", "1"); - - // setup Connect worker properties - workerProps = new HashMap<>(); - workerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(OFFSET_COMMIT_INTERVAL_MS)); - - // build a Connect cluster backed by Kafka and Zk - connectBuilder = new EmbeddedConnectCluster.Builder() - .name("connect-cluster") - .numWorkers(NUM_WORKERS) - .brokerProps(brokerProps) - .workerProps(workerProps); + connectorName = currentTest.getMethodName(); + topic = currentTest.getMethodName(); + connect = defaultConnectCluster(); } @After public void tearDown() { - connect.stop(); + Set<String> remainingConnectors = new HashSet<>(connect.connectors()); + if (remainingConnectors.remove(connectorName)) { + connect.deleteConnector(connectorName); + } + try { + assertEquals( + "Some connectors were not properly cleaned up after this test", 
+ Collections.emptySet(), + remainingConnectors + ); + } finally { + // Make a last-ditch effort to clean up the leaked connectors + // so as not to interfere with other test cases + remainingConnectors.forEach(connect::deleteConnector); + } + } + + @AfterClass + public static void close() { + // stop all Connect, Kafka and Zk threads. + CONNECT_CLUSTERS.values().forEach(EmbeddedConnectCluster::stop); + } + + private static EmbeddedConnectCluster createOrReuseConnectWithWorkerProps(Map<String, String> workerProps) { + return CONNECT_CLUSTERS.computeIfAbsent(workerProps, props -> { + Properties brokerProps = new Properties(); + brokerProps.put("transaction.state.log.replication.factor", "1"); + brokerProps.put("transaction.state.log.min.isr", "1"); + + // Have to declare a new map since the passed-in one may be immutable + Map<String, String> workerPropsWithDefaults = new HashMap<>(workerProps); + // Enable fast offset commits by default + workerPropsWithDefaults.putIfAbsent(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(OFFSET_COMMIT_INTERVAL_MS)); + + EmbeddedConnectCluster result = new EmbeddedConnectCluster.Builder() + .name("connect-cluster") + .numWorkers(NUM_WORKERS) + .brokerProps(brokerProps) + .workerProps(workerPropsWithDefaults) + .build(); + + result.start(); + + return result; + }); + } + + private static EmbeddedConnectCluster defaultConnectCluster() { + return createOrReuseConnectWithWorkerProps(Collections.emptyMap()); + } + + private static EmbeddedConnectCluster exactlyOnceSourceConnectCluster() { + Map<String, String> workerProps = Collections.singletonMap( + DistributedConfig.EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, + "enabled" + ); + return createOrReuseConnectWithWorkerProps(workerProps); } @Test public void testGetNonExistentConnectorOffsets() { - connect = connectBuilder.build(); - connect.start(); ConnectRestException e = assertThrows(ConnectRestException.class, () -> connect.connectorOffsets("non-existent-connector")); assertEquals(404, 
e.errorCode()); } @Test public void testGetSinkConnectorOffsets() throws Exception { - connect = connectBuilder.build(); - connect.start(); getAndVerifySinkConnectorOffsets(baseSinkConnectorConfigs(), connect.kafka()); } @Test public void testGetSinkConnectorOffsetsOverriddenConsumerGroupId() throws Exception { - connect = connectBuilder.build(); - connect.start(); Map<String, String> connectorConfigs = baseSinkConnectorConfigs(); - connectorConfigs.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + CommonClientConfigs.GROUP_ID_CONFIG, - "overridden-group-id"); + String overriddenGroupId = connectorName + "-overridden-group-id"; Review Comment: There's no need to use the same string in different test cases, so there's less benefit than usual for extracting this out to a class constant. We could change it to `"custom-group-id"`, `"other-group-id"`, etc. in a single test and things would work just as well. It also adds noise to the entire test suite to add class-level constants; since overridden group IDs are only used in three of the twenty-seven tests here, IMO it's cleaner to keep these strings isolated to the tests that actually need them. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org