yashmayya commented on code in PR #14966:
URL: https://github.com/apache/kafka/pull/14966#discussion_r1424910127


##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java:
##########
@@ -70,74 +80,115 @@ public class OffsetsApiIntegrationTest {
     private static final long OFFSET_COMMIT_INTERVAL_MS = 
TimeUnit.SECONDS.toMillis(1);
     private static final long OFFSET_READ_TIMEOUT_MS = 
TimeUnit.SECONDS.toMillis(30);
     private static final int NUM_WORKERS = 3;
-    private static final String CONNECTOR_NAME = "test-connector";
-    private static final String TOPIC = "test-topic";
     private static final int NUM_TASKS = 2;
     private static final int NUM_RECORDS_PER_PARTITION = 10;
-    private Map<String, String> workerProps;
-    private EmbeddedConnectCluster.Builder connectBuilder;
+    private static final Map<Map<String, String>, EmbeddedConnectCluster> 
CONNECT_CLUSTERS = new ConcurrentHashMap<>();
+    @Rule
+    public TestName currentTest = new TestName();
     private EmbeddedConnectCluster connect;
+    private String connectorName;
+    private String topic;
 
     @Before
     public void setup() {
-        Properties brokerProps = new Properties();
-        brokerProps.put("transaction.state.log.replication.factor", "1");
-        brokerProps.put("transaction.state.log.min.isr", "1");
-
-        // setup Connect worker properties
-        workerProps = new HashMap<>();
-        workerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, 
String.valueOf(OFFSET_COMMIT_INTERVAL_MS));
-
-        // build a Connect cluster backed by Kafka and Zk
-        connectBuilder = new EmbeddedConnectCluster.Builder()
-                .name("connect-cluster")
-                .numWorkers(NUM_WORKERS)
-                .brokerProps(brokerProps)
-                .workerProps(workerProps);
+        connectorName = currentTest.getMethodName();
+        topic = currentTest.getMethodName();
+        connect = defaultConnectCluster();
     }
 
     @After
     public void tearDown() {
-        connect.stop();
+        Set<String> remainingConnectors = new HashSet<>(connect.connectors());
+        if (remainingConnectors.remove(connectorName)) {
+            connect.deleteConnector(connectorName);
+        }
+        try {
+            assertEquals(
+                    "Some connectors were not properly cleaned up after this 
test",
+                    Collections.emptySet(),
+                    remainingConnectors
+            );
+        } finally {
+            // Make a last-ditch effort to clean up the leaked connectors
+            // so as not to interfere with other test cases
+            remainingConnectors.forEach(connect::deleteConnector);
+        }
+    }
+
+    @AfterClass
+    public static void close() {
+        // stop all Connect, Kafka and Zk threads.
+        CONNECT_CLUSTERS.values().forEach(EmbeddedConnectCluster::stop);
+    }
+
+    private static EmbeddedConnectCluster 
createOrReuseConnectWithWorkerProps(Map<String, String> workerProps) {
+        return CONNECT_CLUSTERS.computeIfAbsent(workerProps, props -> {
+            Properties brokerProps = new Properties();
+            brokerProps.put("transaction.state.log.replication.factor", "1");
+            brokerProps.put("transaction.state.log.min.isr", "1");
+
+            // Have to declare a new map since the passed-in one may be 
immutable
+            Map<String, String> workerPropsWithDefaults = new 
HashMap<>(workerProps);
+            // Enable fast offset commits by default
+            
workerPropsWithDefaults.putIfAbsent(OFFSET_COMMIT_INTERVAL_MS_CONFIG, 
String.valueOf(OFFSET_COMMIT_INTERVAL_MS));
+
+            EmbeddedConnectCluster result = new 
EmbeddedConnectCluster.Builder()
+                    .name("connect-cluster")
+                    .numWorkers(NUM_WORKERS)
+                    .brokerProps(brokerProps)
+                    .workerProps(workerPropsWithDefaults)
+                    .build();
+
+            result.start();
+
+            return result;
+        });
+    }
+
+    private static EmbeddedConnectCluster defaultConnectCluster() {
+        return createOrReuseConnectWithWorkerProps(Collections.emptyMap());
+    }
+
+    private static EmbeddedConnectCluster exactlyOnceSourceConnectCluster() {
+        Map<String, String> workerProps = Collections.singletonMap(
+                DistributedConfig.EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG,
+                "enabled"
+        );
+        return createOrReuseConnectWithWorkerProps(workerProps);
     }
 
     @Test
     public void testGetNonExistentConnectorOffsets() {
-        connect = connectBuilder.build();
-        connect.start();
         ConnectRestException e = assertThrows(ConnectRestException.class,
                 () -> connect.connectorOffsets("non-existent-connector"));
         assertEquals(404, e.errorCode());
     }
 
     @Test
     public void testGetSinkConnectorOffsets() throws Exception {
-        connect = connectBuilder.build();
-        connect.start();
         getAndVerifySinkConnectorOffsets(baseSinkConnectorConfigs(), 
connect.kafka());
     }
 
     @Test
     public void testGetSinkConnectorOffsetsOverriddenConsumerGroupId() throws 
Exception {
-        connect = connectBuilder.build();
-        connect.start();
         Map<String, String> connectorConfigs = baseSinkConnectorConfigs();
-        
connectorConfigs.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX 
+ CommonClientConfigs.GROUP_ID_CONFIG,
-                "overridden-group-id");
+        String overriddenGroupId = connectorName = "-overridden-group-id";

Review Comment:
   ```suggestion
           String overriddenGroupId = connectorName + "-overridden-group-id";
   ```



##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java:
##########
@@ -895,6 +909,55 @@ private Map<String, String> baseSourceConnectorConfigs() {
         return props;
     }
 
+    /**
+     * Modify (i.e., alter or reset) the offsets for a sink connector, with 
retry logic to
+     * handle cases where laggy task shutdown may have left a consumer in the 
group.
+     * @param offsetsToAlter the offsets to alter for the sink connector, or 
null if
+     *                       the connector's offets should be reset instead
+     * @return the response from the REST API, if the request was successful
+     * @throws InterruptedException if the thread is interrupted while waiting 
for a
+     * request to modify the connector's offsets to succeed
+     * @see <a 
href="https://issues.apache.org/jira/browse/KAFKA-15826">KAFKA-15826</a>
+     */
+    private String modifySinkConnectorOffsetsWithRetry(ConnectorOffsets 
offsetsToAlter) throws InterruptedException {
+        // Some retry logic is necessary to account for KAFKA-15826,
+        // where laggy sink task startup/shutdown can leave consumers running

Review Comment:
   I'm +1 for the retry logic here since I've seen the same occasional CI 
flakiness that you've indicated, but I don't quite get how it's related to 
`KAFKA-15826` given that the `MonitorableSinkConnector` / `MonitorableSinkTask` 
being used here isn't really expected to block long enough in its stop method 
to cause task cancellation to occur? Furthermore, given that the referenced 
ticket isn't resolved yet, retrying here wouldn't make sense since even task 
cancellation wouldn't cause the task's consumer to be closed right? Or is it 
linked here as more of a future reference (once the ticket is resolved)?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java:
##########
@@ -895,6 +909,55 @@ private Map<String, String> baseSourceConnectorConfigs() {
         return props;
     }
 
+    /**
+     * Modify (i.e., alter or reset) the offsets for a sink connector, with 
retry logic to
+     * handle cases where laggy task shutdown may have left a consumer in the 
group.
+     * @param offsetsToAlter the offsets to alter for the sink connector, or 
null if
+     *                       the connector's offets should be reset instead
+     * @return the response from the REST API, if the request was successful
+     * @throws InterruptedException if the thread is interrupted while waiting 
for a
+     * request to modify the connector's offsets to succeed
+     * @see <a 
href="https://issues.apache.org/jira/browse/KAFKA-15826">KAFKA-15826</a>
+     */
+    private String modifySinkConnectorOffsetsWithRetry(ConnectorOffsets 
offsetsToAlter) throws InterruptedException {
+        // Some retry logic is necessary to account for KAFKA-15826,
+        // where laggy sink task startup/shutdown can leave consumers running
+        String conditionDetails = "Failed to "
+                + (offsetsToAlter != null ?  "alter" : "reset")
+                + " sink connector offsets in time";
+        AtomicReference<String> responseReference = new AtomicReference<>();
+        waitForCondition(
+                () -> {
+                    try {
+                        if (offsetsToAlter == null) {
+                            
responseReference.set(connect.resetConnectorOffsets(connectorName));
+                        } else {
+                            
responseReference.set(connect.alterConnectorOffsets(connectorName, 
offsetsToAlter));
+                        }
+                        return true;
+                    } catch (ConnectRestException e) {
+                        boolean internalServerError = e.statusCode() == 
INTERNAL_SERVER_ERROR.getStatusCode();
+
+                        String message = 
Optional.of(e.getMessage()).orElse("");
+                        boolean failedToResetConsumerOffsets = 
message.contains("Failed to reset consumer group offsets for connector");

Review Comment:
   It looks like this method can also be used to alter a sink connector's 
offsets so shouldn't we account for that failure message as well?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java:
##########
@@ -895,6 +909,55 @@ private Map<String, String> baseSourceConnectorConfigs() {
         return props;
     }
 
+    /**
+     * Modify (i.e., alter or reset) the offsets for a sink connector, with 
retry logic to
+     * handle cases where laggy task shutdown may have left a consumer in the 
group.
+     * @param offsetsToAlter the offsets to alter for the sink connector, or 
null if
+     *                       the connector's offets should be reset instead

Review Comment:
   ```suggestion
        *                       the connector's offsets should be reset instead
   ```
   nit: typo



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to