yashmayya commented on code in PR #14279:
URL: https://github.com/apache/kafka/pull/14279#discussion_r1316932577


##########
connect/file/src/test/java/org/apache/kafka/connect/file/integration/FileStreamSinkConnectorIntegrationTest.java:
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.file.integration;
+
+import org.apache.kafka.connect.file.FileStreamSinkConnector;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
+import org.apache.kafka.connect.util.SinkUtils;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.InputStreamReader;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.kafka.connect.file.FileStreamSinkConnector.FILE_CONFIG;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
+import static org.apache.kafka.connect.sink.SinkConnector.TOPICS_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@Tag("integration")
+public class FileStreamSinkConnectorIntegrationTest {
+
+    private static final String CONNECTOR_NAME = "test-connector";
+    private static final String TOPIC = "test-topic";
+    private static final String MESSAGE_FORMAT = "Message %d";
+    private static final int NUM_MESSAGES = 5;
+    private static final String FILE_NAME = "test-file";
+    private final EmbeddedConnectCluster connect = new 
EmbeddedConnectCluster.Builder().build();
+
+    @BeforeEach
+    public void setup() {
+        connect.start();
+        connect.kafka().createTopic(TOPIC);
+        produceMessagesToTopic(TOPIC, NUM_MESSAGES);
+    }
+
+    @AfterEach
+    public void tearDown() {
+        connect.stop();
+    }
+
+    @Test
+    public void testSimpleSink() throws Exception {
+        File tempDir = TestUtils.tempDirectory();
+        Path tempFilePath = tempDir.toPath().resolve(FILE_NAME);
+        Map<String, String> connectorConfigs = baseConnectorConfigs(TOPIC, 
tempFilePath.toString());
+        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+        
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME,
 1,
+            "Connector and task did not start in time");
+
+        verifyLinesInFile(tempFilePath, NUM_MESSAGES, true);
+    }
+
+    @Test
+    public void testAlterOffsets() throws Exception {
+        File tempDir = TestUtils.tempDirectory();
+        Path tempFilePath = tempDir.toPath().resolve(FILE_NAME);
+        Map<String, String> connectorConfigs = baseConnectorConfigs(TOPIC, 
tempFilePath.toString());
+        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+        
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME,
 1,
+            "Connector and task did not start in time");
+
+        verifyLinesInFile(tempFilePath, NUM_MESSAGES, true);
+
+        connect.stopConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, 
"Connector did not stop in time");
+
+        // Alter the offsets to cause the last message in the topic to be 
re-processed
+        Map<String, Object> partition = new HashMap<>();
+        partition.put(SinkUtils.KAFKA_TOPIC_KEY, TOPIC);
+        partition.put(SinkUtils.KAFKA_PARTITION_KEY, 0);
+        Map<String, Object> offset = 
Collections.singletonMap(SinkUtils.KAFKA_OFFSET_KEY, 4);
+        List<ConnectorOffset> offsetsToAlter = Collections.singletonList(new 
ConnectorOffset(partition, offset));
+
+        connect.alterConnectorOffsets(CONNECTOR_NAME, new 
ConnectorOffsets(offsetsToAlter));
+
+        connect.resumeConnector(CONNECTOR_NAME);
+        
connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME,
 1,
+            "Connector and task did not resume in time");
+
+        // The last message should be re-processed when the connector is 
resumed after the offsets are altered
+        verifyLinesInFile(tempFilePath, NUM_MESSAGES + 1, false);
+    }
+
+    @Test
+    public void testResetOffsets() throws Exception {
+        File tempDir = TestUtils.tempDirectory();
+        Path tempFilePath = tempDir.toPath().resolve(FILE_NAME);
+        Map<String, String> connectorConfigs = baseConnectorConfigs(TOPIC, 
tempFilePath.toString());
+        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+        
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME,
 1,
+            "Connector and task did not start in time");
+
+        verifyLinesInFile(tempFilePath, NUM_MESSAGES, true);
+
+        connect.stopConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, 
"Connector did not stop in time");
+
+        // Reset the offsets to cause all the message in the topic to be 
re-processed
+        connect.resetConnectorOffsets(CONNECTOR_NAME);
+
+        connect.resumeConnector(CONNECTOR_NAME);
+        
connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME,
 1,
+            "Connector and task did not resume in time");
+
+        // All the messages should be re-processed when the connector is 
resumed after the offsets are reset
+        verifyLinesInFile(tempFilePath, 2 * NUM_MESSAGES, false);
+    }
+
+    @Test
+    public void testSinkMultipleTopicsWithMultipleTasks() throws Exception {
+        String topic2 = "test-topic-2";
+        connect.kafka().createTopic(topic2);
+        produceMessagesToTopic(topic2, NUM_MESSAGES);
+
+        File tempDir = TestUtils.tempDirectory();
+        Path tempFilePath = tempDir.toPath().resolve(FILE_NAME);
+        Map<String, String> connectorConfigs = baseConnectorConfigs(TOPIC + 
"," + topic2, tempFilePath.toString());
+        connectorConfigs.put(TASKS_MAX_CONFIG, "2");
+
+        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+        
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME,
 2,
+            "Connector and task did not start in time");
+
+        // Only verify the number of lines since the messages can be consumed 
in any order across the two topics
+        verifyLinesInFile(tempFilePath, 2 * NUM_MESSAGES, false);
+    }
+
+    private void produceMessagesToTopic(String topic, int numMessages) {
+        for (int i = 0; i < numMessages; i++) {
+            connect.kafka().produce(topic, String.format(MESSAGE_FORMAT, i));
+        }
+    }
+
+    private Map<String, String> baseConnectorConfigs(String topics, String 
filePath) {
+        Map<String, String> connectorConfigs = new HashMap<>();
+        connectorConfigs.put(CONNECTOR_CLASS_CONFIG, 
FileStreamSinkConnector.class.getName());
+        connectorConfigs.put(TOPICS_CONFIG, topics);
+        connectorConfigs.put(FILE_CONFIG, filePath);
+        return connectorConfigs;
+    }
+
+    /**
+     * Verify that the number of lines in the file at {@code filePath} is 
equal to {@code numLines}.
+     * If {@code verifyLinearity} is true, this method will also verify that 
the lines match {@link #MESSAGE_FORMAT}
+     * with a linearly increasing message number (beginning with 0).
+     *
+     * @param filePath the file path
+     * @param numLines the expected number of lines in the file
+     * @param verifyLinearity true if the line contents are to be verified
+     */
+    private void verifyLinesInFile(Path filePath, int numLines, boolean 
verifyLinearity) throws Exception {
+        TestUtils.waitForCondition(() -> {
+            try (BufferedReader reader = new BufferedReader(new 
InputStreamReader(Files.newInputStream(filePath)))) {
+                for (int i = 0; i < numLines; i++) {

Review Comment:
   Good point - I've added a check to verify the count of the total number of 
lines in the file after we've already read the expected number of lines from 
the file. This is similar to your suggestion for the source connector case and 
has the same benefits and (minor) pitfalls.



##########
connect/file/src/test/java/org/apache/kafka/connect/file/integration/FileStreamSinkConnectorIntegrationTest.java:
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.file.integration;
+
+import org.apache.kafka.connect.file.FileStreamSinkConnector;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
+import org.apache.kafka.connect.util.SinkUtils;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.InputStreamReader;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.kafka.connect.file.FileStreamSinkConnector.FILE_CONFIG;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
+import static org.apache.kafka.connect.sink.SinkConnector.TOPICS_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@Tag("integration")
+public class FileStreamSinkConnectorIntegrationTest {
+
+    private static final String CONNECTOR_NAME = "test-connector";
+    private static final String TOPIC = "test-topic";
+    private static final String MESSAGE_FORMAT = "Message %d";
+    private static final int NUM_MESSAGES = 5;
+    private static final String FILE_NAME = "test-file";
+    private final EmbeddedConnectCluster connect = new 
EmbeddedConnectCluster.Builder().build();
+
+    @BeforeEach
+    public void setup() {
+        connect.start();
+        connect.kafka().createTopic(TOPIC);
+        produceMessagesToTopic(TOPIC, NUM_MESSAGES);
+    }
+
+    @AfterEach
+    public void tearDown() {
+        connect.stop();
+    }
+
+    @Test
+    public void testSimpleSink() throws Exception {
+        File tempDir = TestUtils.tempDirectory();
+        Path tempFilePath = tempDir.toPath().resolve(FILE_NAME);
+        Map<String, String> connectorConfigs = baseConnectorConfigs(TOPIC, 
tempFilePath.toString());
+        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+        
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME,
 1,
+            "Connector and task did not start in time");
+
+        verifyLinesInFile(tempFilePath, NUM_MESSAGES, true);
+    }
+
+    @Test
+    public void testAlterOffsets() throws Exception {
+        File tempDir = TestUtils.tempDirectory();
+        Path tempFilePath = tempDir.toPath().resolve(FILE_NAME);
+        Map<String, String> connectorConfigs = baseConnectorConfigs(TOPIC, 
tempFilePath.toString());
+        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+        
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME,
 1,
+            "Connector and task did not start in time");
+
+        verifyLinesInFile(tempFilePath, NUM_MESSAGES, true);
+
+        connect.stopConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, 
"Connector did not stop in time");
+
+        // Alter the offsets to cause the last message in the topic to be 
re-processed
+        Map<String, Object> partition = new HashMap<>();
+        partition.put(SinkUtils.KAFKA_TOPIC_KEY, TOPIC);
+        partition.put(SinkUtils.KAFKA_PARTITION_KEY, 0);
+        Map<String, Object> offset = 
Collections.singletonMap(SinkUtils.KAFKA_OFFSET_KEY, 4);
+        List<ConnectorOffset> offsetsToAlter = Collections.singletonList(new 
ConnectorOffset(partition, offset));
+
+        connect.alterConnectorOffsets(CONNECTOR_NAME, new 
ConnectorOffsets(offsetsToAlter));
+
+        connect.resumeConnector(CONNECTOR_NAME);
+        
connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME,
 1,
+            "Connector and task did not resume in time");
+
+        // The last message should be re-processed when the connector is 
resumed after the offsets are altered
+        verifyLinesInFile(tempFilePath, NUM_MESSAGES + 1, false);
+    }
+
+    @Test
+    public void testResetOffsets() throws Exception {
+        File tempDir = TestUtils.tempDirectory();
+        Path tempFilePath = tempDir.toPath().resolve(FILE_NAME);
+        Map<String, String> connectorConfigs = baseConnectorConfigs(TOPIC, 
tempFilePath.toString());
+        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+        
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME,
 1,
+            "Connector and task did not start in time");
+
+        verifyLinesInFile(tempFilePath, NUM_MESSAGES, true);
+
+        connect.stopConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, 
"Connector did not stop in time");
+
+        // Reset the offsets to cause all the message in the topic to be 
re-processed
+        connect.resetConnectorOffsets(CONNECTOR_NAME);
+
+        connect.resumeConnector(CONNECTOR_NAME);
+        
connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME,
 1,
+            "Connector and task did not resume in time");
+
+        // All the messages should be re-processed when the connector is 
resumed after the offsets are reset
+        verifyLinesInFile(tempFilePath, 2 * NUM_MESSAGES, false);
+    }
+
+    @Test
+    public void testSinkMultipleTopicsWithMultipleTasks() throws Exception {
+        String topic2 = "test-topic-2";
+        connect.kafka().createTopic(topic2);
+        produceMessagesToTopic(topic2, NUM_MESSAGES);
+
+        File tempDir = TestUtils.tempDirectory();
+        Path tempFilePath = tempDir.toPath().resolve(FILE_NAME);
+        Map<String, String> connectorConfigs = baseConnectorConfigs(TOPIC + 
"," + topic2, tempFilePath.toString());
+        connectorConfigs.put(TASKS_MAX_CONFIG, "2");
+
+        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+        
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME,
 2,
+            "Connector and task did not start in time");
+
+        // Only verify the number of lines since the messages can be consumed 
in any order across the two topics
+        verifyLinesInFile(tempFilePath, 2 * NUM_MESSAGES, false);
+    }
+
+    private void produceMessagesToTopic(String topic, int numMessages) {
+        for (int i = 0; i < numMessages; i++) {
+            connect.kafka().produce(topic, String.format(MESSAGE_FORMAT, i));
+        }
+    }
+
+    private Map<String, String> baseConnectorConfigs(String topics, String 
filePath) {
+        Map<String, String> connectorConfigs = new HashMap<>();
+        connectorConfigs.put(CONNECTOR_CLASS_CONFIG, 
FileStreamSinkConnector.class.getName());
+        connectorConfigs.put(TOPICS_CONFIG, topics);
+        connectorConfigs.put(FILE_CONFIG, filePath);
+        return connectorConfigs;
+    }
+
+    /**
+     * Verify that the number of lines in the file at {@code filePath} is 
equal to {@code numLines}.
+     * If {@code verifyLinearity} is true, this method will also verify that 
the lines match {@link #MESSAGE_FORMAT}
+     * with a linearly increasing message number (beginning with 0).
+     *
+     * @param filePath the file path
+     * @param numLines the expected number of lines in the file
+     * @param verifyLinearity true if the line contents are to be verified
+     */
+    private void verifyLinesInFile(Path filePath, int numLines, boolean 
verifyLinearity) throws Exception {
+        TestUtils.waitForCondition(() -> {
+            try (BufferedReader reader = new BufferedReader(new 
InputStreamReader(Files.newInputStream(filePath)))) {
+                for (int i = 0; i < numLines; i++) {
+                    String line = reader.readLine();
+                    if (line == null) {

Review Comment:
   Makes sense, done. I also realized that the `verifyLinesInFile` 
implementation was suboptimal because we were unnecessarily creating a new 
stream and reading from the beginning in every `waitForCondition` iteration so 
I've fixed that too.



##########
connect/file/src/test/java/org/apache/kafka/connect/file/integration/FileStreamSinkConnectorIntegrationTest.java:
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.file.integration;
+
+import org.apache.kafka.connect.file.FileStreamSinkConnector;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
+import org.apache.kafka.connect.util.SinkUtils;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.InputStreamReader;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.kafka.connect.file.FileStreamSinkConnector.FILE_CONFIG;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
+import static org.apache.kafka.connect.sink.SinkConnector.TOPICS_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@Tag("integration")
+public class FileStreamSinkConnectorIntegrationTest {
+
+    private static final String CONNECTOR_NAME = "test-connector";
+    private static final String TOPIC = "test-topic";
+    private static final String MESSAGE_FORMAT = "Message %d";
+    private static final int NUM_MESSAGES = 5;
+    private static final String FILE_NAME = "test-file";
+    private final EmbeddedConnectCluster connect = new 
EmbeddedConnectCluster.Builder().build();
+
+    @BeforeEach
+    public void setup() {
+        connect.start();
+        connect.kafka().createTopic(TOPIC);
+        produceMessagesToTopic(TOPIC, NUM_MESSAGES);
+    }
+
+    @AfterEach
+    public void tearDown() {
+        connect.stop();
+    }
+
+    @Test
+    public void testSimpleSink() throws Exception {
+        File tempDir = TestUtils.tempDirectory();
+        Path tempFilePath = tempDir.toPath().resolve(FILE_NAME);
+        Map<String, String> connectorConfigs = baseConnectorConfigs(TOPIC, 
tempFilePath.toString());
+        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+        
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME,
 1,

Review Comment:
   Nope, copy pasting is bad 😞



##########
connect/file/src/test/java/org/apache/kafka/connect/file/integration/FileStreamSinkConnectorIntegrationTest.java:
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.file.integration;
+
+import org.apache.kafka.connect.file.FileStreamSinkConnector;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
+import org.apache.kafka.connect.util.SinkUtils;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.InputStreamReader;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.kafka.connect.file.FileStreamSinkConnector.FILE_CONFIG;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
+import static org.apache.kafka.connect.sink.SinkConnector.TOPICS_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@Tag("integration")
+public class FileStreamSinkConnectorIntegrationTest {
+
+    private static final String CONNECTOR_NAME = "test-connector";
+    private static final String TOPIC = "test-topic";
+    private static final String MESSAGE_FORMAT = "Message %d";
+    private static final int NUM_MESSAGES = 5;
+    private static final String FILE_NAME = "test-file";
+    private final EmbeddedConnectCluster connect = new 
EmbeddedConnectCluster.Builder().build();
+
+    @BeforeEach
+    public void setup() {
+        connect.start();
+        connect.kafka().createTopic(TOPIC);
+        produceMessagesToTopic(TOPIC, NUM_MESSAGES);
+    }
+
+    @AfterEach
+    public void tearDown() {
+        connect.stop();
+    }
+
+    @Test
+    public void testSimpleSink() throws Exception {
+        File tempDir = TestUtils.tempDirectory();
+        Path tempFilePath = tempDir.toPath().resolve(FILE_NAME);
+        Map<String, String> connectorConfigs = baseConnectorConfigs(TOPIC, 
tempFilePath.toString());
+        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+        
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME,
 1,
+            "Connector and task did not start in time");
+
+        verifyLinesInFile(tempFilePath, NUM_MESSAGES, true);
+    }
+
+    @Test
+    public void testAlterOffsets() throws Exception {
+        File tempDir = TestUtils.tempDirectory();
+        Path tempFilePath = tempDir.toPath().resolve(FILE_NAME);
+        Map<String, String> connectorConfigs = baseConnectorConfigs(TOPIC, 
tempFilePath.toString());
+        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+        
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME,
 1,
+            "Connector and task did not start in time");
+
+        verifyLinesInFile(tempFilePath, NUM_MESSAGES, true);
+
+        connect.stopConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, 
"Connector did not stop in time");
+
+        // Alter the offsets to cause the last message in the topic to be 
re-processed
+        Map<String, Object> partition = new HashMap<>();
+        partition.put(SinkUtils.KAFKA_TOPIC_KEY, TOPIC);
+        partition.put(SinkUtils.KAFKA_PARTITION_KEY, 0);
+        Map<String, Object> offset = 
Collections.singletonMap(SinkUtils.KAFKA_OFFSET_KEY, 4);
+        List<ConnectorOffset> offsetsToAlter = Collections.singletonList(new 
ConnectorOffset(partition, offset));
+
+        connect.alterConnectorOffsets(CONNECTOR_NAME, new 
ConnectorOffsets(offsetsToAlter));

Review Comment:
   Thanks, that's a great suggestion! I've added a couple of utility methods to 
alter the offset for a single partition for source and sink connectors. I can 
also add a couple more utility methods to allow altering offsets for multiple 
partitions and use them in `OffsetsApiIntegrationTest` and elsewhere but since 
it's an orthogonal concern to this PR, I think it'd make more sense to file a 
separate minor PR for that.



##########
connect/file/src/test/java/org/apache/kafka/connect/file/integration/FileStreamSourceConnectorIntegrationTest.java:
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.file.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.connect.file.FileStreamSourceConnector;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.PrintStream;
+import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.kafka.connect.file.FileStreamSourceConnector.FILE_CONFIG;
+import static 
org.apache.kafka.connect.file.FileStreamSourceConnector.TOPIC_CONFIG;
+import static 
org.apache.kafka.connect.file.FileStreamSourceTask.FILENAME_FIELD;
+import static 
org.apache.kafka.connect.file.FileStreamSourceTask.POSITION_FIELD;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+@Tag("integration")
+public class FileStreamSourceConnectorIntegrationTest {
+
+    private static final String CONNECTOR_NAME = "test-connector";
+    private static final String TOPIC = "test-topic";
+    private static final String LINE_FORMAT = "Line %d";
+    private static final int NUM_LINES = 5;
+    private static final long TIMEOUT_MS = TimeUnit.SECONDS.toMillis(15);
+    private final EmbeddedConnectCluster connect = new 
EmbeddedConnectCluster.Builder().build();
+    private File sourceFile;
+
+    @BeforeEach
+    public void setup() throws Exception {
+        connect.start();
+        sourceFile = createTempFile(NUM_LINES);
+        connect.kafka().createTopic(TOPIC);
+    }
+
+    @AfterEach
+    public void tearDown() {
+        connect.stop();
+    }
+
+    @Test
+    public void testSimpleSource() throws Exception {
+        Map<String, String> connectorConfigs = 
baseConnectorConfigs(sourceFile.getAbsolutePath());
+        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+        
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME,
 1,
+            "Connector and task did not start in time");
+
+        int i = 0;
+        for (ConsumerRecord<byte[], byte[]> record : 
connect.kafka().consume(NUM_LINES, TIMEOUT_MS, TOPIC)) {
+            assertEquals(String.format(LINE_FORMAT, i++), new 
String(record.value()));
+        }
+    }
+
+    @Test
+    public void testStopResumeSavedOffset() throws Exception {
+        Map<String, String> connectorConfigs = 
baseConnectorConfigs(sourceFile.getAbsolutePath());
+        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+        
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME,
 1,
+            "Connector and task did not start in time");
+
+        // Wait for the initially written records to be sourced by the 
connector and produced to the configured Kafka topic
+        connect.kafka().consume(NUM_LINES, TIMEOUT_MS, TOPIC);
+
+        connect.stopConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, 
"Connector did not stop in time");
+
+        // Append NUM_LINES more lines to the file
+        try (PrintStream printStream = new 
PrintStream(Files.newOutputStream(sourceFile.toPath(), 
StandardOpenOption.APPEND))) {
+            for (int i = NUM_LINES; i < 2 * NUM_LINES; i++) {
+                printStream.println(String.format(LINE_FORMAT, i));
+            }
+        }
+
+        connect.resumeConnector(CONNECTOR_NAME);
+        
connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME,
 1,
+            "Connector and task did not resume in time");
+
+        // We expect only 2 * NUM_LINES messages to be produced since the 
connector should continue from where it left off on being resumed
+        assertThrows(EmbeddedKafkaCluster.NotEnoughRecordsException.class, () 
-> connect.kafka().consume(2 * NUM_LINES + 1, TIMEOUT_MS, TOPIC));
+
+        int i = 0;
+        for (ConsumerRecord<byte[], byte[]> record : connect.kafka().consume(2 
* NUM_LINES, TIMEOUT_MS, TOPIC)) {
+            assertEquals(String.format(LINE_FORMAT, i++), new 
String(record.value()));
+        }
+    }
+
+    @Test
+    public void testAlterOffsets() throws Exception {
+        Map<String, String> connectorConfigs = 
baseConnectorConfigs(sourceFile.getAbsolutePath());
+        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+        
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME,
 1,
+            "Connector and task did not start in time");
+
+        // Wait for the initially written records to be sourced by the 
connector and produced to the configured Kafka topic
+        connect.kafka().consume(NUM_LINES, TIMEOUT_MS, TOPIC);
+
+        connect.stopConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, 
"Connector did not stop in time");
+
+        // Alter the offsets to make the connector re-process the last line in 
the file
+        Map<String, Object> partition = 
Collections.singletonMap(FILENAME_FIELD, sourceFile.getAbsolutePath());
+        Map<String, Object> offset = Collections.singletonMap(POSITION_FIELD, 
28L);
+        connect.alterConnectorOffsets(CONNECTOR_NAME, new 
ConnectorOffsets(Collections.singletonList(new ConnectorOffset(partition, 
offset))));
+
+        connect.resumeConnector(CONNECTOR_NAME);
+        
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME,
 1,
+            "Connector and task did not resume in time");
+
+        Iterator<ConsumerRecord<byte[], byte[]>> recordIterator = 
connect.kafka().consume(NUM_LINES + 1, TIMEOUT_MS, TOPIC).iterator();
+
+        int i = 0;
+        while (i < NUM_LINES) {
+            assertEquals(String.format(LINE_FORMAT, i++), new 
String(recordIterator.next().value()));
+        }
+
+        // Verify that the last line has been sourced again after the alter 
offsets request
+        assertEquals(String.format(LINE_FORMAT, NUM_LINES - 1), new 
String(recordIterator.next().value()));
+    }
+
+    @Test
+    public void testResetOffsets() throws Exception {
+        Map<String, String> connectorConfigs = 
baseConnectorConfigs(sourceFile.getAbsolutePath());
+        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+        
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME,
 1,
+            "Connector and task did not start in time");
+
+        // Wait for the initially written records to be sourced by the 
connector and produced to the configured Kafka topic
+        connect.kafka().consume(NUM_LINES, TIMEOUT_MS, TOPIC);
+
+        connect.stopConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, 
"Connector did not stop in time");
+
+        // Reset the offsets to make the connector re-read all the previously 
written lines
+        connect.resetConnectorOffsets(CONNECTOR_NAME);
+
+        connect.resumeConnector(CONNECTOR_NAME);
+        
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME,
 1,
+            "Connector and task did not resume in time");
+
+        // We expect 2 * NUM_LINES messages to be produced
+        assertThrows(EmbeddedKafkaCluster.NotEnoughRecordsException.class, () 
-> connect.kafka().consume(2 * NUM_LINES + 1, TIMEOUT_MS, TOPIC));
+
+        Iterator<ConsumerRecord<byte[], byte[]>> recordIterator = 
connect.kafka().consume(2 * NUM_LINES, TIMEOUT_MS, TOPIC).iterator();
+
+        int i = 0;
+        while (i < NUM_LINES) {
+            assertEquals(String.format(LINE_FORMAT, i++), new 
String(recordIterator.next().value()));
+        }
+
+        // Verify that the same lines have been sourced again after the reset 
offsets request
+        while (i < 2 * NUM_LINES) {
+            assertEquals(String.format(LINE_FORMAT, i - NUM_LINES), new 
String(recordIterator.next().value()));
+            i++;
+        }
+    }
+
+    /**
+     * Create a temporary file and append {@code numLines} to it
+     *
+     * @param numLines the number of lines to be appended to the created file
+     * @return the created file
+     */
+    private File createTempFile(int numLines) throws Exception {
+        File sourceFile = TestUtils.tempFile();
+
+        try (PrintStream printStream = new 
PrintStream(Files.newOutputStream(sourceFile.toPath(), 
StandardOpenOption.APPEND))) {

Review Comment:
   Ah, I think it's left over from when I was playing around with tests where 
files are appended to while the connector is running; I've removed it now.



##########
connect/file/src/test/java/org/apache/kafka/connect/file/integration/FileStreamSourceConnectorIntegrationTest.java:
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.file.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.connect.file.FileStreamSourceConnector;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.PrintStream;
+import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.kafka.connect.file.FileStreamSourceConnector.FILE_CONFIG;
+import static 
org.apache.kafka.connect.file.FileStreamSourceConnector.TOPIC_CONFIG;
+import static 
org.apache.kafka.connect.file.FileStreamSourceTask.FILENAME_FIELD;
+import static 
org.apache.kafka.connect.file.FileStreamSourceTask.POSITION_FIELD;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+@Tag("integration")
+public class FileStreamSourceConnectorIntegrationTest {
+
+    private static final String CONNECTOR_NAME = "test-connector";
+    private static final String TOPIC = "test-topic";
+    private static final String LINE_FORMAT = "Line %d";
+    private static final int NUM_LINES = 5;
+    private static final long TIMEOUT_MS = TimeUnit.SECONDS.toMillis(15);
+    private final EmbeddedConnectCluster connect = new 
EmbeddedConnectCluster.Builder().build();
+    private File sourceFile;
+
+    @BeforeEach
+    public void setup() throws Exception {
+        connect.start();
+        sourceFile = createTempFile(NUM_LINES);
+        connect.kafka().createTopic(TOPIC);
+    }
+
+    @AfterEach
+    public void tearDown() {
+        connect.stop();
+    }
+
+    @Test
+    public void testSimpleSource() throws Exception {
+        Map<String, String> connectorConfigs = 
baseConnectorConfigs(sourceFile.getAbsolutePath());
+        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+        
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME,
 1,
+            "Connector and task did not start in time");
+
+        int i = 0;
+        for (ConsumerRecord<byte[], byte[]> record : 
connect.kafka().consume(NUM_LINES, TIMEOUT_MS, TOPIC)) {
+            assertEquals(String.format(LINE_FORMAT, i++), new 
String(record.value()));
+        }
+    }
+
+    @Test
+    public void testStopResumeSavedOffset() throws Exception {
+        Map<String, String> connectorConfigs = 
baseConnectorConfigs(sourceFile.getAbsolutePath());
+        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+        
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME,
 1,
+            "Connector and task did not start in time");
+
+        // Wait for the initially written records to be sourced by the 
connector and produced to the configured Kafka topic
+        connect.kafka().consume(NUM_LINES, TIMEOUT_MS, TOPIC);
+
+        connect.stopConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, 
"Connector did not stop in time");
+
+        // Append NUM_LINES more lines to the file
+        try (PrintStream printStream = new 
PrintStream(Files.newOutputStream(sourceFile.toPath(), 
StandardOpenOption.APPEND))) {
+            for (int i = NUM_LINES; i < 2 * NUM_LINES; i++) {
+                printStream.println(String.format(LINE_FORMAT, i));
+            }
+        }
+
+        connect.resumeConnector(CONNECTOR_NAME);
+        
connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME,
 1,
+            "Connector and task did not resume in time");
+
+        // We expect only 2 * NUM_LINES messages to be produced since the 
connector should continue from where it left off on being resumed
+        assertThrows(EmbeddedKafkaCluster.NotEnoughRecordsException.class, () 
-> connect.kafka().consume(2 * NUM_LINES + 1, TIMEOUT_MS, TOPIC));
+
+        int i = 0;
+        for (ConsumerRecord<byte[], byte[]> record : connect.kafka().consume(2 
* NUM_LINES, TIMEOUT_MS, TOPIC)) {
+            assertEquals(String.format(LINE_FORMAT, i++), new 
String(record.value()));
+        }
+    }
+
+    @Test
+    public void testAlterOffsets() throws Exception {
+        Map<String, String> connectorConfigs = 
baseConnectorConfigs(sourceFile.getAbsolutePath());
+        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+        
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME,
 1,
+            "Connector and task did not start in time");
+
+        // Wait for the initially written records to be sourced by the 
connector and produced to the configured Kafka topic
+        connect.kafka().consume(NUM_LINES, TIMEOUT_MS, TOPIC);
+
+        connect.stopConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, 
"Connector did not stop in time");
+
+        // Alter the offsets to make the connector re-process the last line in 
the file
+        Map<String, Object> partition = 
Collections.singletonMap(FILENAME_FIELD, sourceFile.getAbsolutePath());
+        Map<String, Object> offset = Collections.singletonMap(POSITION_FIELD, 
28L);
+        connect.alterConnectorOffsets(CONNECTOR_NAME, new 
ConnectorOffsets(Collections.singletonList(new ConnectorOffset(partition, 
offset))));
+
+        connect.resumeConnector(CONNECTOR_NAME);
+        
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME,
 1,
+            "Connector and task did not resume in time");
+
+        Iterator<ConsumerRecord<byte[], byte[]>> recordIterator = 
connect.kafka().consume(NUM_LINES + 1, TIMEOUT_MS, TOPIC).iterator();
+
+        int i = 0;
+        while (i < NUM_LINES) {
+            assertEquals(String.format(LINE_FORMAT, i++), new 
String(recordIterator.next().value()));
+        }
+
+        // Verify that the last line has been sourced again after the alter 
offsets request
+        assertEquals(String.format(LINE_FORMAT, NUM_LINES - 1), new 
String(recordIterator.next().value()));
+    }
+
+    @Test
+    public void testResetOffsets() throws Exception {
+        Map<String, String> connectorConfigs = 
baseConnectorConfigs(sourceFile.getAbsolutePath());
+        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+        
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME,
 1,
+            "Connector and task did not start in time");
+
+        // Wait for the initially written records to be sourced by the 
connector and produced to the configured Kafka topic
+        connect.kafka().consume(NUM_LINES, TIMEOUT_MS, TOPIC);
+
+        connect.stopConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, 
"Connector did not stop in time");
+
+        // Reset the offsets to make the connector re-read all the previously 
written lines
+        connect.resetConnectorOffsets(CONNECTOR_NAME);
+
+        connect.resumeConnector(CONNECTOR_NAME);
+        
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME,
 1,
+            "Connector and task did not resume in time");
+
+        // We expect 2 * NUM_LINES messages to be produced

Review Comment:
   Good point, done 👍 



##########
connect/file/src/test/java/org/apache/kafka/connect/file/integration/FileStreamSinkConnectorIntegrationTest.java:
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.file.integration;
+
+import org.apache.kafka.connect.file.FileStreamSinkConnector;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
+import org.apache.kafka.connect.util.SinkUtils;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.InputStreamReader;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.kafka.connect.file.FileStreamSinkConnector.FILE_CONFIG;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
+import static org.apache.kafka.connect.sink.SinkConnector.TOPICS_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@Tag("integration")
+public class FileStreamSinkConnectorIntegrationTest {
+
+    private static final String CONNECTOR_NAME = "test-connector";
+    private static final String TOPIC = "test-topic";
+    private static final String MESSAGE_FORMAT = "Message %d";
+    private static final int NUM_MESSAGES = 5;
+    private static final String FILE_NAME = "test-file";
+    private final EmbeddedConnectCluster connect = new 
EmbeddedConnectCluster.Builder().build();
+
+    @BeforeEach
+    public void setup() {
+        connect.start();
+        connect.kafka().createTopic(TOPIC);
+        produceMessagesToTopic(TOPIC, NUM_MESSAGES);
+    }
+
+    @AfterEach
+    public void tearDown() {
+        connect.stop();
+    }
+
+    @Test
+    public void testSimpleSink() throws Exception {
+        File tempDir = TestUtils.tempDirectory();
+        Path tempFilePath = tempDir.toPath().resolve(FILE_NAME);
+        Map<String, String> connectorConfigs = baseConnectorConfigs(TOPIC, 
tempFilePath.toString());
+        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+        
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME,
 1,

Review Comment:
   Nope, copy pasting is bad 😞



##########
connect/file/src/test/java/org/apache/kafka/connect/file/integration/FileStreamSourceConnectorIntegrationTest.java:
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.file.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.connect.file.FileStreamSourceConnector;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.PrintStream;
+import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.kafka.connect.file.FileStreamSourceConnector.FILE_CONFIG;
+import static 
org.apache.kafka.connect.file.FileStreamSourceConnector.TOPIC_CONFIG;
+import static 
org.apache.kafka.connect.file.FileStreamSourceTask.FILENAME_FIELD;
+import static 
org.apache.kafka.connect.file.FileStreamSourceTask.POSITION_FIELD;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+@Tag("integration")
+public class FileStreamSourceConnectorIntegrationTest {
+
+    private static final String CONNECTOR_NAME = "test-connector";
+    private static final String TOPIC = "test-topic";
+    private static final String LINE_FORMAT = "Line %d";
+    private static final int NUM_LINES = 5;
+    private static final long TIMEOUT_MS = TimeUnit.SECONDS.toMillis(15);
+    private final EmbeddedConnectCluster connect = new 
EmbeddedConnectCluster.Builder().build();
+    private File sourceFile;
+
+    @BeforeEach
+    public void setup() throws Exception {
+        connect.start();
+        sourceFile = createTempFile(NUM_LINES);
+        connect.kafka().createTopic(TOPIC);
+    }
+
+    @AfterEach
+    public void tearDown() {
+        connect.stop();
+    }
+
+    @Test
+    public void testSimpleSource() throws Exception {
+        Map<String, String> connectorConfigs = 
baseConnectorConfigs(sourceFile.getAbsolutePath());
+        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+        
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME,
 1,
+            "Connector and task did not start in time");
+
+        int i = 0;
+        for (ConsumerRecord<byte[], byte[]> record : 
connect.kafka().consume(NUM_LINES, TIMEOUT_MS, TOPIC)) {
+            assertEquals(String.format(LINE_FORMAT, i++), new 
String(record.value()));
+        }
+    }
+
+    @Test
+    public void testStopResumeSavedOffset() throws Exception {
+        Map<String, String> connectorConfigs = 
baseConnectorConfigs(sourceFile.getAbsolutePath());
+        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+        
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME,
 1,
+            "Connector and task did not start in time");
+
+        // Wait for the initially written records to be sourced by the 
connector and produced to the configured Kafka topic
+        connect.kafka().consume(NUM_LINES, TIMEOUT_MS, TOPIC);
+
+        connect.stopConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, 
"Connector did not stop in time");
+
+        // Append NUM_LINES more lines to the file
+        try (PrintStream printStream = new 
PrintStream(Files.newOutputStream(sourceFile.toPath(), 
StandardOpenOption.APPEND))) {
+            for (int i = NUM_LINES; i < 2 * NUM_LINES; i++) {
+                printStream.println(String.format(LINE_FORMAT, i));
+            }
+        }
+
+        connect.resumeConnector(CONNECTOR_NAME);
+        
connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME,
 1,
+            "Connector and task did not resume in time");
+
+        // We expect only 2 * NUM_LINES messages to be produced since the 
connector should continue from where it left off on being resumed
+        assertThrows(EmbeddedKafkaCluster.NotEnoughRecordsException.class, () 
-> connect.kafka().consume(2 * NUM_LINES + 1, TIMEOUT_MS, TOPIC));
+
+        int i = 0;
+        for (ConsumerRecord<byte[], byte[]> record : connect.kafka().consume(2 
* NUM_LINES, TIMEOUT_MS, TOPIC)) {
+            assertEquals(String.format(LINE_FORMAT, i++), new 
String(record.value()));
+        }
+    }
+
+    @Test
+    public void testAlterOffsets() throws Exception {
+        Map<String, String> connectorConfigs = 
baseConnectorConfigs(sourceFile.getAbsolutePath());
+        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+        
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME,
 1,
+            "Connector and task did not start in time");
+
+        // Wait for the initially written records to be sourced by the 
connector and produced to the configured Kafka topic
+        connect.kafka().consume(NUM_LINES, TIMEOUT_MS, TOPIC);
+
+        connect.stopConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, 
"Connector did not stop in time");
+
+        // Alter the offsets to make the connector re-process the last line in 
the file
+        Map<String, Object> partition = 
Collections.singletonMap(FILENAME_FIELD, sourceFile.getAbsolutePath());
+        Map<String, Object> offset = Collections.singletonMap(POSITION_FIELD, 
28L);
+        connect.alterConnectorOffsets(CONNECTOR_NAME, new 
ConnectorOffsets(Collections.singletonList(new ConnectorOffset(partition, 
offset))));
+
+        connect.resumeConnector(CONNECTOR_NAME);
+        
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME,
 1,
+            "Connector and task did not resume in time");
+
+        Iterator<ConsumerRecord<byte[], byte[]>> recordIterator = 
connect.kafka().consume(NUM_LINES + 1, TIMEOUT_MS, TOPIC).iterator();
+
+        int i = 0;
+        while (i < NUM_LINES) {
+            assertEquals(String.format(LINE_FORMAT, i++), new 
String(recordIterator.next().value()));
+        }
+
+        // Verify that the last line has been sourced again after the alter 
offsets request
+        assertEquals(String.format(LINE_FORMAT, NUM_LINES - 1), new 
String(recordIterator.next().value()));
+    }
+
+    @Test
+    public void testResetOffsets() throws Exception {
+        Map<String, String> connectorConfigs = 
baseConnectorConfigs(sourceFile.getAbsolutePath());
+        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+        
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME,
 1,
+            "Connector and task did not start in time");
+
+        // Wait for the initially written records to be sourced by the 
connector and produced to the configured Kafka topic
+        connect.kafka().consume(NUM_LINES, TIMEOUT_MS, TOPIC);
+
+        connect.stopConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, 
"Connector did not stop in time");
+
+        // Reset the offsets to make the connector re-read all the previously 
written lines
+        connect.resetConnectorOffsets(CONNECTOR_NAME);
+
+        connect.resumeConnector(CONNECTOR_NAME);
+        
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME,
 1,
+            "Connector and task did not resume in time");
+
+        // We expect 2 * NUM_LINES messages to be produced

Review Comment:
   Good point, done 👍 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to