C0urante commented on code in PR #14279:
URL: https://github.com/apache/kafka/pull/14279#discussion_r1316170261


##########
connect/file/src/test/java/org/apache/kafka/connect/file/integration/FileStreamSinkConnectorIntegrationTest.java:
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.file.integration;
+
+import org.apache.kafka.connect.file.FileStreamSinkConnector;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
+import org.apache.kafka.connect.util.SinkUtils;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.InputStreamReader;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.kafka.connect.file.FileStreamSinkConnector.FILE_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
+import static org.apache.kafka.connect.sink.SinkConnector.TOPICS_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@Tag("integration")
+public class FileStreamSinkConnectorIntegrationTest {
+
+    private static final String CONNECTOR_NAME = "test-connector";
+    private static final String TOPIC = "test-topic";
+    private static final String MESSAGE_FORMAT = "Message %d";
+    private static final int NUM_MESSAGES = 5;
+    private static final String FILE_NAME = "test-file";
+    private final EmbeddedConnectCluster connect = new EmbeddedConnectCluster.Builder().build();
+
+    @BeforeEach
+    public void setup() {
+        connect.start();
+        connect.kafka().createTopic(TOPIC);
+        produceMessagesToTopic(TOPIC, NUM_MESSAGES);
+    }
+
+    @AfterEach
+    public void tearDown() {
+        connect.stop();
+    }
+
+    @Test
+    public void testSimpleSink() throws Exception {
+        File tempDir = TestUtils.tempDirectory();
+        Path tempFilePath = tempDir.toPath().resolve(FILE_NAME);
+        Map<String, String> connectorConfigs = baseConnectorConfigs(TOPIC, tempFilePath.toString());
+        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1,
+            "Connector and task did not start in time");
+
+        verifyLinesInFile(tempFilePath, NUM_MESSAGES, true);
+    }
+
+    @Test
+    public void testAlterOffsets() throws Exception {
+        File tempDir = TestUtils.tempDirectory();
+        Path tempFilePath = tempDir.toPath().resolve(FILE_NAME);
+        Map<String, String> connectorConfigs = baseConnectorConfigs(TOPIC, tempFilePath.toString());
+        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1,
+            "Connector and task did not start in time");
+
+        verifyLinesInFile(tempFilePath, NUM_MESSAGES, true);
+
+        connect.stopConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, "Connector did not stop in time");
+
+        // Alter the offsets to cause the last message in the topic to be re-processed
+        Map<String, Object> partition = new HashMap<>();
+        partition.put(SinkUtils.KAFKA_TOPIC_KEY, TOPIC);
+        partition.put(SinkUtils.KAFKA_PARTITION_KEY, 0);
+        Map<String, Object> offset = Collections.singletonMap(SinkUtils.KAFKA_OFFSET_KEY, 4);
+        List<ConnectorOffset> offsetsToAlter = Collections.singletonList(new ConnectorOffset(partition, offset));
+
+        connect.alterConnectorOffsets(CONNECTOR_NAME, new ConnectorOffsets(offsetsToAlter));
+
+        connect.resumeConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, 1,
+            "Connector and task did not resume in time");
+
+        // The last message should be re-processed when the connector is resumed after the offsets are altered
+        verifyLinesInFile(tempFilePath, NUM_MESSAGES + 1, false);
+    }
+
+    @Test
+    public void testResetOffsets() throws Exception {
+        File tempDir = TestUtils.tempDirectory();
+        Path tempFilePath = tempDir.toPath().resolve(FILE_NAME);
+        Map<String, String> connectorConfigs = baseConnectorConfigs(TOPIC, tempFilePath.toString());
+        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1,
+            "Connector and task did not start in time");
+
+        verifyLinesInFile(tempFilePath, NUM_MESSAGES, true);
+
+        connect.stopConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, "Connector did not stop in time");
+
+        // Reset the offsets to cause all the messages in the topic to be re-processed
+        connect.resetConnectorOffsets(CONNECTOR_NAME);
+
+        connect.resumeConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, 1,
+            "Connector and task did not resume in time");
+
+        // All the messages should be re-processed when the connector is resumed after the offsets are reset
+        verifyLinesInFile(tempFilePath, 2 * NUM_MESSAGES, false);
+    }
+
+    @Test
+    public void testSinkMultipleTopicsWithMultipleTasks() throws Exception {
+        String topic2 = "test-topic-2";
+        connect.kafka().createTopic(topic2);
+        produceMessagesToTopic(topic2, NUM_MESSAGES);
+
+        File tempDir = TestUtils.tempDirectory();
+        Path tempFilePath = tempDir.toPath().resolve(FILE_NAME);
+        Map<String, String> connectorConfigs = baseConnectorConfigs(TOPIC + "," + topic2, tempFilePath.toString());
+        connectorConfigs.put(TASKS_MAX_CONFIG, "2");
+
+        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 2,
+            "Connector and task did not start in time");
+
+        // Only verify the number of lines since the messages can be consumed in any order across the two topics
+        verifyLinesInFile(tempFilePath, 2 * NUM_MESSAGES, false);
+    }
+
+    private void produceMessagesToTopic(String topic, int numMessages) {
+        for (int i = 0; i < numMessages; i++) {
+            connect.kafka().produce(topic, String.format(MESSAGE_FORMAT, i));
+        }
+    }
+
+    private Map<String, String> baseConnectorConfigs(String topics, String filePath) {
+        Map<String, String> connectorConfigs = new HashMap<>();
+        connectorConfigs.put(CONNECTOR_CLASS_CONFIG, FileStreamSinkConnector.class.getName());
+        connectorConfigs.put(TOPICS_CONFIG, topics);
+        connectorConfigs.put(FILE_CONFIG, filePath);
+        return connectorConfigs;
+    }
+
+    /**
+     * Verify that the number of lines in the file at {@code filePath} is equal to {@code numLines}.
+     * If {@code verifyLinearity} is true, this method will also verify that the lines match {@link #MESSAGE_FORMAT}
+     * with a linearly increasing message number (beginning with 0).
+     *
+     * @param filePath the file path
+     * @param numLines the expected number of lines in the file
+     * @param verifyLinearity true if the line contents are to be verified
+     */
+    private void verifyLinesInFile(Path filePath, int numLines, boolean verifyLinearity) throws Exception {
+        TestUtils.waitForCondition(() -> {
+            try (BufferedReader reader = new BufferedReader(new InputStreamReader(Files.newInputStream(filePath)))) {
+                for (int i = 0; i < numLines; i++) {
+                    String line = reader.readLine();
+                    if (line == null) {
+                        return false;
+                    }
+                    if (verifyLinearity) {
+                        assertEquals(String.format(MESSAGE_FORMAT, i), line);
+                    }

Review Comment:
   Optional: if it's not too much work, we might at least verify that each line 
matches the message format (ignoring the line number) if `verifyLinearity` is 
`false`.
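   For example, something along these lines might work (a rough, untested sketch; the `MESSAGE_PATTERN` field is a placeholder and would need `java.util.regex.Pattern` plus a static import for `assertTrue`):
   ```java
   // Hypothetical pattern derived from MESSAGE_FORMAT, ignoring the message number
   private static final Pattern MESSAGE_PATTERN = Pattern.compile("Message \\d+");

   // Inside the loop in verifyLinesInFile:
   if (verifyLinearity) {
       assertEquals(String.format(MESSAGE_FORMAT, i), line);
   } else {
       assertTrue(MESSAGE_PATTERN.matcher(line).matches(), "Unexpected line content: " + line);
   }
   ```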



##########
connect/file/src/test/java/org/apache/kafka/connect/file/integration/FileStreamSinkConnectorIntegrationTest.java:
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.file.integration;
+
+import org.apache.kafka.connect.file.FileStreamSinkConnector;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
+import org.apache.kafka.connect.util.SinkUtils;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.InputStreamReader;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.kafka.connect.file.FileStreamSinkConnector.FILE_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
+import static org.apache.kafka.connect.sink.SinkConnector.TOPICS_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@Tag("integration")
+public class FileStreamSinkConnectorIntegrationTest {
+
+    private static final String CONNECTOR_NAME = "test-connector";
+    private static final String TOPIC = "test-topic";
+    private static final String MESSAGE_FORMAT = "Message %d";
+    private static final int NUM_MESSAGES = 5;
+    private static final String FILE_NAME = "test-file";
+    private final EmbeddedConnectCluster connect = new EmbeddedConnectCluster.Builder().build();
+
+    @BeforeEach
+    public void setup() {
+        connect.start();
+        connect.kafka().createTopic(TOPIC);
+        produceMessagesToTopic(TOPIC, NUM_MESSAGES);
+    }
+
+    @AfterEach
+    public void tearDown() {
+        connect.stop();
+    }
+
+    @Test
+    public void testSimpleSink() throws Exception {
+        File tempDir = TestUtils.tempDirectory();
+        Path tempFilePath = tempDir.toPath().resolve(FILE_NAME);
+        Map<String, String> connectorConfigs = baseConnectorConfigs(TOPIC, tempFilePath.toString());
+        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1,
+            "Connector and task did not start in time");
+
+        verifyLinesInFile(tempFilePath, NUM_MESSAGES, true);
+    }
+
+    @Test
+    public void testAlterOffsets() throws Exception {
+        File tempDir = TestUtils.tempDirectory();
+        Path tempFilePath = tempDir.toPath().resolve(FILE_NAME);
+        Map<String, String> connectorConfigs = baseConnectorConfigs(TOPIC, tempFilePath.toString());
+        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1,
+            "Connector and task did not start in time");
+
+        verifyLinesInFile(tempFilePath, NUM_MESSAGES, true);
+
+        connect.stopConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, "Connector did not stop in time");
+
+        // Alter the offsets to cause the last message in the topic to be re-processed
+        Map<String, Object> partition = new HashMap<>();
+        partition.put(SinkUtils.KAFKA_TOPIC_KEY, TOPIC);
+        partition.put(SinkUtils.KAFKA_PARTITION_KEY, 0);
+        Map<String, Object> offset = Collections.singletonMap(SinkUtils.KAFKA_OFFSET_KEY, 4);
+        List<ConnectorOffset> offsetsToAlter = Collections.singletonList(new ConnectorOffset(partition, offset));
+
+        connect.alterConnectorOffsets(CONNECTOR_NAME, new ConnectorOffsets(offsetsToAlter));

Review Comment:
   Optional: might be nice to have convenience wrapper methods to alter sink 
connector offsets based on a `Map<TopicPartition, Long>` and source connector 
offsets based on a `Map<Map<String, ?>, Map<String, ?>>` that do some of the 
lifting here for us and then delegate to the existing 
`EmbeddedConnectCluster::alterOffsets` method.
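   For the sink side, something roughly like this could live on `EmbeddedConnectCluster` (an untested sketch; the method name and use of `org.apache.kafka.common.TopicPartition` are just placeholders):
   ```java
   // Hypothetical convenience wrapper that builds the request body from a simple map and
   // delegates to the existing alterConnectorOffsets(String, ConnectorOffsets) method
   public void alterSinkConnectorOffsets(String connectorName, Map<TopicPartition, Long> offsets) {
       List<ConnectorOffset> connectorOffsets = new ArrayList<>();
       for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) {
           Map<String, Object> partition = new HashMap<>();
           partition.put(SinkUtils.KAFKA_TOPIC_KEY, entry.getKey().topic());
           partition.put(SinkUtils.KAFKA_PARTITION_KEY, entry.getKey().partition());
           Map<String, Object> offset = Collections.singletonMap(SinkUtils.KAFKA_OFFSET_KEY, entry.getValue());
           connectorOffsets.add(new ConnectorOffset(partition, offset));
       }
       alterConnectorOffsets(connectorName, new ConnectorOffsets(connectorOffsets));
   }
   ```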



##########
connect/file/src/test/java/org/apache/kafka/connect/file/integration/FileStreamSourceConnectorIntegrationTest.java:
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.file.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.connect.file.FileStreamSourceConnector;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.PrintStream;
+import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.kafka.connect.file.FileStreamSourceConnector.FILE_CONFIG;
+import static org.apache.kafka.connect.file.FileStreamSourceConnector.TOPIC_CONFIG;
+import static org.apache.kafka.connect.file.FileStreamSourceTask.FILENAME_FIELD;
+import static org.apache.kafka.connect.file.FileStreamSourceTask.POSITION_FIELD;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+@Tag("integration")
+public class FileStreamSourceConnectorIntegrationTest {
+
+    private static final String CONNECTOR_NAME = "test-connector";
+    private static final String TOPIC = "test-topic";
+    private static final String LINE_FORMAT = "Line %d";
+    private static final int NUM_LINES = 5;
+    private static final long TIMEOUT_MS = TimeUnit.SECONDS.toMillis(15);
+    private final EmbeddedConnectCluster connect = new EmbeddedConnectCluster.Builder().build();
+    private File sourceFile;
+
+    @BeforeEach
+    public void setup() throws Exception {
+        connect.start();
+        sourceFile = createTempFile(NUM_LINES);
+        connect.kafka().createTopic(TOPIC);
+    }
+
+    @AfterEach
+    public void tearDown() {
+        connect.stop();
+    }
+
+    @Test
+    public void testSimpleSource() throws Exception {
+        Map<String, String> connectorConfigs = baseConnectorConfigs(sourceFile.getAbsolutePath());
+        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1,
+            "Connector and task did not start in time");
+
+        int i = 0;
+        for (ConsumerRecord<byte[], byte[]> record : connect.kafka().consume(NUM_LINES, TIMEOUT_MS, TOPIC)) {
+            assertEquals(String.format(LINE_FORMAT, i++), new String(record.value()));
+        }
+    }
+
+    @Test
+    public void testStopResumeSavedOffset() throws Exception {
+        Map<String, String> connectorConfigs = baseConnectorConfigs(sourceFile.getAbsolutePath());
+        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1,
+            "Connector and task did not start in time");
+
+        // Wait for the initially written records to be sourced by the connector and produced to the configured Kafka topic
+        connect.kafka().consume(NUM_LINES, TIMEOUT_MS, TOPIC);
+
+        connect.stopConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, "Connector did not stop in time");
+
+        // Append NUM_LINES more lines to the file
+        try (PrintStream printStream = new PrintStream(Files.newOutputStream(sourceFile.toPath(), StandardOpenOption.APPEND))) {
+            for (int i = NUM_LINES; i < 2 * NUM_LINES; i++) {
+                printStream.println(String.format(LINE_FORMAT, i));
+            }
+        }
+
+        connect.resumeConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, 1,
+            "Connector and task did not resume in time");
+
+        // We expect only 2 * NUM_LINES messages to be produced since the connector should continue from where it left off on being resumed
+        assertThrows(EmbeddedKafkaCluster.NotEnoughRecordsException.class, () -> connect.kafka().consume(2 * NUM_LINES + 1, TIMEOUT_MS, TOPIC));
+
+        int i = 0;
+        for (ConsumerRecord<byte[], byte[]> record : connect.kafka().consume(2 * NUM_LINES, TIMEOUT_MS, TOPIC)) {
+            assertEquals(String.format(LINE_FORMAT, i++), new String(record.value()));
+        }
+    }
+
+    @Test
+    public void testAlterOffsets() throws Exception {
+        Map<String, String> connectorConfigs = baseConnectorConfigs(sourceFile.getAbsolutePath());
+        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1,
+            "Connector and task did not start in time");
+
+        // Wait for the initially written records to be sourced by the connector and produced to the configured Kafka topic
+        connect.kafka().consume(NUM_LINES, TIMEOUT_MS, TOPIC);
+
+        connect.stopConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, "Connector did not stop in time");
+
+        // Alter the offsets to make the connector re-process the last line in the file
+        Map<String, Object> partition = Collections.singletonMap(FILENAME_FIELD, sourceFile.getAbsolutePath());
+        Map<String, Object> offset = Collections.singletonMap(POSITION_FIELD, 28L);
+        connect.alterConnectorOffsets(CONNECTOR_NAME, new ConnectorOffsets(Collections.singletonList(new ConnectorOffset(partition, offset))));
+
+        connect.resumeConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1,
+            "Connector and task did not resume in time");
+
+        Iterator<ConsumerRecord<byte[], byte[]>> recordIterator = connect.kafka().consume(NUM_LINES + 1, TIMEOUT_MS, TOPIC).iterator();
+
+        int i = 0;
+        while (i < NUM_LINES) {
+            assertEquals(String.format(LINE_FORMAT, i++), new String(recordIterator.next().value()));
+        }
+
+        // Verify that the last line has been sourced again after the alter offsets request
+        assertEquals(String.format(LINE_FORMAT, NUM_LINES - 1), new String(recordIterator.next().value()));
+    }
+
+    @Test
+    public void testResetOffsets() throws Exception {
+        Map<String, String> connectorConfigs = baseConnectorConfigs(sourceFile.getAbsolutePath());
+        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1,
+            "Connector and task did not start in time");
+
+        // Wait for the initially written records to be sourced by the connector and produced to the configured Kafka topic
+        connect.kafka().consume(NUM_LINES, TIMEOUT_MS, TOPIC);
+
+        connect.stopConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, "Connector did not stop in time");
+
+        // Reset the offsets to make the connector re-read all the previously written lines
+        connect.resetConnectorOffsets(CONNECTOR_NAME);
+
+        connect.resumeConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1,
+            "Connector and task did not resume in time");
+
+        // We expect 2 * NUM_LINES messages to be produced
+        assertThrows(EmbeddedKafkaCluster.NotEnoughRecordsException.class, () -> connect.kafka().consume(2 * NUM_LINES + 1, TIMEOUT_MS, TOPIC));
+
+        Iterator<ConsumerRecord<byte[], byte[]>> recordIterator = connect.kafka().consume(2 * NUM_LINES, TIMEOUT_MS, TOPIC).iterator();
+
+        int i = 0;
+        while (i < NUM_LINES) {
+            assertEquals(String.format(LINE_FORMAT, i++), new String(recordIterator.next().value()));
+        }
+
+        // Verify that the same lines have been sourced again after the reset offsets request
+        while (i < 2 * NUM_LINES) {
+            assertEquals(String.format(LINE_FORMAT, i - NUM_LINES), new String(recordIterator.next().value()));
+            i++;
+        }
+    }
+
+    /**
+     * Create a temporary file and append {@code numLines} to it
+     *
+     * @param numLines the number of lines to be appended to the created file
+     * @return the created file
+     */
+    private File createTempFile(int numLines) throws Exception {
+        File sourceFile = TestUtils.tempFile();
+
+        try (PrintStream printStream = new PrintStream(Files.newOutputStream(sourceFile.toPath(), StandardOpenOption.APPEND))) {

Review Comment:
   Just OOC, why are we opening this in append mode? Wouldn't it be safer to wipe existing content?
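   For example (an untested sketch), truncating on open would guarantee a clean starting state:
   ```java
   // Wipe any existing content instead of appending, so stale data can't leak into the test
   try (PrintStream printStream = new PrintStream(Files.newOutputStream(sourceFile.toPath(),
           StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING))) {
       for (int i = 0; i < numLines; i++) {
           printStream.println(String.format(LINE_FORMAT, i));
       }
   }
   ```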



##########
connect/file/src/test/java/org/apache/kafka/connect/file/integration/FileStreamSourceConnectorIntegrationTest.java:
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.file.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.connect.file.FileStreamSourceConnector;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.PrintStream;
+import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.kafka.connect.file.FileStreamSourceConnector.FILE_CONFIG;
+import static org.apache.kafka.connect.file.FileStreamSourceConnector.TOPIC_CONFIG;
+import static org.apache.kafka.connect.file.FileStreamSourceTask.FILENAME_FIELD;
+import static org.apache.kafka.connect.file.FileStreamSourceTask.POSITION_FIELD;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+@Tag("integration")
+public class FileStreamSourceConnectorIntegrationTest {
+
+    private static final String CONNECTOR_NAME = "test-connector";
+    private static final String TOPIC = "test-topic";
+    private static final String LINE_FORMAT = "Line %d";
+    private static final int NUM_LINES = 5;
+    private static final long TIMEOUT_MS = TimeUnit.SECONDS.toMillis(15);
+    private final EmbeddedConnectCluster connect = new EmbeddedConnectCluster.Builder().build();
+    private File sourceFile;
+
+    @BeforeEach
+    public void setup() throws Exception {
+        connect.start();
+        sourceFile = createTempFile(NUM_LINES);
+        connect.kafka().createTopic(TOPIC);
+    }
+
+    @AfterEach
+    public void tearDown() {
+        connect.stop();
+    }
+
+    @Test
+    public void testSimpleSource() throws Exception {
+        Map<String, String> connectorConfigs = baseConnectorConfigs(sourceFile.getAbsolutePath());
+        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1,
+            "Connector and task did not start in time");
+
+        int i = 0;
+        for (ConsumerRecord<byte[], byte[]> record : connect.kafka().consume(NUM_LINES, TIMEOUT_MS, TOPIC)) {
+            assertEquals(String.format(LINE_FORMAT, i++), new String(record.value()));
+        }
+    }
+
+    @Test
+    public void testStopResumeSavedOffset() throws Exception {
+        Map<String, String> connectorConfigs = baseConnectorConfigs(sourceFile.getAbsolutePath());
+        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1,
+            "Connector and task did not start in time");
+
+        // Wait for the initially written records to be sourced by the connector and produced to the configured Kafka topic
+        connect.kafka().consume(NUM_LINES, TIMEOUT_MS, TOPIC);
+
+        connect.stopConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, "Connector did not stop in time");
+
+        // Append NUM_LINES more lines to the file
+        try (PrintStream printStream = new PrintStream(Files.newOutputStream(sourceFile.toPath(), StandardOpenOption.APPEND))) {
+            for (int i = NUM_LINES; i < 2 * NUM_LINES; i++) {
+                printStream.println(String.format(LINE_FORMAT, i));
+            }
+        }
+
+        connect.resumeConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, 1,
+            "Connector and task did not resume in time");
+
+        // We expect only 2 * NUM_LINES messages to be produced since the connector should continue from where it left off on being resumed
+        assertThrows(EmbeddedKafkaCluster.NotEnoughRecordsException.class, () -> connect.kafka().consume(2 * NUM_LINES + 1, TIMEOUT_MS, TOPIC));
+
+        int i = 0;
+        for (ConsumerRecord<byte[], byte[]> record : connect.kafka().consume(2 * NUM_LINES, TIMEOUT_MS, TOPIC)) {
+            assertEquals(String.format(LINE_FORMAT, i++), new String(record.value()));
+        }
+    }
+
+    @Test
+    public void testAlterOffsets() throws Exception {
+        Map<String, String> connectorConfigs = baseConnectorConfigs(sourceFile.getAbsolutePath());
+        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1,
+            "Connector and task did not start in time");
+
+        // Wait for the initially written records to be sourced by the connector and produced to the configured Kafka topic
+        connect.kafka().consume(NUM_LINES, TIMEOUT_MS, TOPIC);
+
+        connect.stopConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, "Connector did not stop in time");
+
+        // Alter the offsets to make the connector re-process the last line in the file
+        Map<String, Object> partition = Collections.singletonMap(FILENAME_FIELD, sourceFile.getAbsolutePath());
+        Map<String, Object> offset = Collections.singletonMap(POSITION_FIELD, 28L);
+        connect.alterConnectorOffsets(CONNECTOR_NAME, new ConnectorOffsets(Collections.singletonList(new ConnectorOffset(partition, offset))));
+
+        connect.resumeConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1,
+            "Connector and task did not resume in time");
+
+        Iterator<ConsumerRecord<byte[], byte[]>> recordIterator = connect.kafka().consume(NUM_LINES + 1, TIMEOUT_MS, TOPIC).iterator();
+
+        int i = 0;
+        while (i < NUM_LINES) {
+            assertEquals(String.format(LINE_FORMAT, i++), new String(recordIterator.next().value()));

Review Comment:
   Nit:
   ```suggestion
           for (int i = 0; i < NUM_LINES; i++) {
               assertEquals(String.format(LINE_FORMAT, i), new String(recordIterator.next().value()));
   ```



##########
connect/file/src/test/java/org/apache/kafka/connect/file/integration/FileStreamSinkConnectorIntegrationTest.java:
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.file.integration;
+
+import org.apache.kafka.connect.file.FileStreamSinkConnector;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
+import org.apache.kafka.connect.util.SinkUtils;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.InputStreamReader;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.kafka.connect.file.FileStreamSinkConnector.FILE_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
+import static org.apache.kafka.connect.sink.SinkConnector.TOPICS_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@Tag("integration")
+public class FileStreamSinkConnectorIntegrationTest {
+
+    private static final String CONNECTOR_NAME = "test-connector";
+    private static final String TOPIC = "test-topic";
+    private static final String MESSAGE_FORMAT = "Message %d";
+    private static final int NUM_MESSAGES = 5;
+    private static final String FILE_NAME = "test-file";
+    private final EmbeddedConnectCluster connect = new EmbeddedConnectCluster.Builder().build();
+
+    @BeforeEach
+    public void setup() {
+        connect.start();
+        connect.kafka().createTopic(TOPIC);
+        produceMessagesToTopic(TOPIC, NUM_MESSAGES);
+    }
+
+    @AfterEach
+    public void tearDown() {
+        connect.stop();
+    }
+
+    @Test
+    public void testSimpleSink() throws Exception {
+        File tempDir = TestUtils.tempDirectory();
+        Path tempFilePath = tempDir.toPath().resolve(FILE_NAME);
+        Map<String, String> connectorConfigs = baseConnectorConfigs(TOPIC, tempFilePath.toString());
+        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1,

Review Comment:
   Any reason to use at least instead of exact (i.e., 
`assertConnectorAndExactlyNumTasksAreRunning`)?



##########
connect/file/src/test/java/org/apache/kafka/connect/file/integration/FileStreamSourceConnectorIntegrationTest.java:
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.file.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.connect.file.FileStreamSourceConnector;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.PrintStream;
+import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.kafka.connect.file.FileStreamSourceConnector.FILE_CONFIG;
+import static org.apache.kafka.connect.file.FileStreamSourceConnector.TOPIC_CONFIG;
+import static org.apache.kafka.connect.file.FileStreamSourceTask.FILENAME_FIELD;
+import static org.apache.kafka.connect.file.FileStreamSourceTask.POSITION_FIELD;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+@Tag("integration")
+public class FileStreamSourceConnectorIntegrationTest {
+
+    private static final String CONNECTOR_NAME = "test-connector";
+    private static final String TOPIC = "test-topic";
+    private static final String LINE_FORMAT = "Line %d";
+    private static final int NUM_LINES = 5;
+    private static final long TIMEOUT_MS = TimeUnit.SECONDS.toMillis(15);
+    private final EmbeddedConnectCluster connect = new EmbeddedConnectCluster.Builder().build();
+    private File sourceFile;
+
+    @BeforeEach
+    public void setup() throws Exception {
+        connect.start();
+        sourceFile = createTempFile(NUM_LINES);
+        connect.kafka().createTopic(TOPIC);
+    }
+
+    @AfterEach
+    public void tearDown() {
+        connect.stop();
+    }
+
+    @Test
+    public void testSimpleSource() throws Exception {
+        Map<String, String> connectorConfigs = baseConnectorConfigs(sourceFile.getAbsolutePath());
+        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1,

Review Comment:
   Same thought RE `atLeast` vs. `exactly`



##########
connect/file/src/test/java/org/apache/kafka/connect/file/integration/FileStreamSinkConnectorIntegrationTest.java:
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.file.integration;
+
+import org.apache.kafka.connect.file.FileStreamSinkConnector;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
+import org.apache.kafka.connect.util.SinkUtils;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.InputStreamReader;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.kafka.connect.file.FileStreamSinkConnector.FILE_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
+import static org.apache.kafka.connect.sink.SinkConnector.TOPICS_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@Tag("integration")
+public class FileStreamSinkConnectorIntegrationTest {
+
+    private static final String CONNECTOR_NAME = "test-connector";
+    private static final String TOPIC = "test-topic";
+    private static final String MESSAGE_FORMAT = "Message %d";
+    private static final int NUM_MESSAGES = 5;
+    private static final String FILE_NAME = "test-file";
+    private final EmbeddedConnectCluster connect = new EmbeddedConnectCluster.Builder().build();
+
+    @BeforeEach
+    public void setup() {
+        connect.start();
+        connect.kafka().createTopic(TOPIC);
+        produceMessagesToTopic(TOPIC, NUM_MESSAGES);
+    }
+
+    @AfterEach
+    public void tearDown() {
+        connect.stop();
+    }
+
+    @Test
+    public void testSimpleSink() throws Exception {
+        File tempDir = TestUtils.tempDirectory();
+        Path tempFilePath = tempDir.toPath().resolve(FILE_NAME);
+        Map<String, String> connectorConfigs = baseConnectorConfigs(TOPIC, tempFilePath.toString());
+        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1,
+            "Connector and task did not start in time");
+
+        verifyLinesInFile(tempFilePath, NUM_MESSAGES, true);
+    }
+
+    @Test
+    public void testAlterOffsets() throws Exception {
+        File tempDir = TestUtils.tempDirectory();
+        Path tempFilePath = tempDir.toPath().resolve(FILE_NAME);
+        Map<String, String> connectorConfigs = baseConnectorConfigs(TOPIC, tempFilePath.toString());
+        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1,
+            "Connector and task did not start in time");
+
+        verifyLinesInFile(tempFilePath, NUM_MESSAGES, true);
+
+        connect.stopConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, "Connector did not stop in time");
+
+        // Alter the offsets to cause the last message in the topic to be re-processed
+        Map<String, Object> partition = new HashMap<>();
+        partition.put(SinkUtils.KAFKA_TOPIC_KEY, TOPIC);
+        partition.put(SinkUtils.KAFKA_PARTITION_KEY, 0);
+        Map<String, Object> offset = Collections.singletonMap(SinkUtils.KAFKA_OFFSET_KEY, 4);
+        List<ConnectorOffset> offsetsToAlter = Collections.singletonList(new ConnectorOffset(partition, offset));
+
+        connect.alterConnectorOffsets(CONNECTOR_NAME, new ConnectorOffsets(offsetsToAlter));
+
+        connect.resumeConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, 1,
+            "Connector and task did not resume in time");
+
+        // The last message should be re-processed when the connector is resumed after the offsets are altered
+        verifyLinesInFile(tempFilePath, NUM_MESSAGES + 1, false);
+    }
+
+    @Test
+    public void testResetOffsets() throws Exception {
+        File tempDir = TestUtils.tempDirectory();
+        Path tempFilePath = tempDir.toPath().resolve(FILE_NAME);
+        Map<String, String> connectorConfigs = baseConnectorConfigs(TOPIC, tempFilePath.toString());
+        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1,
+            "Connector and task did not start in time");
+
+        verifyLinesInFile(tempFilePath, NUM_MESSAGES, true);
+
+        connect.stopConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, "Connector did not stop in time");
+
+        // Reset the offsets to cause all the messages in the topic to be re-processed
+        connect.resetConnectorOffsets(CONNECTOR_NAME);
+
+        connect.resumeConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, 1,
+            "Connector and task did not resume in time");
+
+        // All the messages should be re-processed when the connector is resumed after the offsets are reset
+        verifyLinesInFile(tempFilePath, 2 * NUM_MESSAGES, false);
+    }
+
+    @Test
+    public void testSinkMultipleTopicsWithMultipleTasks() throws Exception {
+        String topic2 = "test-topic-2";
+        connect.kafka().createTopic(topic2);
+        produceMessagesToTopic(topic2, NUM_MESSAGES);
+
+        File tempDir = TestUtils.tempDirectory();
+        Path tempFilePath = tempDir.toPath().resolve(FILE_NAME);
+        Map<String, String> connectorConfigs = baseConnectorConfigs(TOPIC + "," + topic2, tempFilePath.toString());
+        connectorConfigs.put(TASKS_MAX_CONFIG, "2");
+
+        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 2,
+            "Connector and task did not start in time");
+
+        // Only verify the number of lines since the messages can be consumed in any order across the two topics
+        verifyLinesInFile(tempFilePath, 2 * NUM_MESSAGES, false);
+    }
+
+    private void produceMessagesToTopic(String topic, int numMessages) {
+        for (int i = 0; i < numMessages; i++) {
+            connect.kafka().produce(topic, String.format(MESSAGE_FORMAT, i));
+        }
+    }
+
+    private Map<String, String> baseConnectorConfigs(String topics, String filePath) {
+        Map<String, String> connectorConfigs = new HashMap<>();
+        connectorConfigs.put(CONNECTOR_CLASS_CONFIG, FileStreamSinkConnector.class.getName());
+        connectorConfigs.put(TOPICS_CONFIG, topics);
+        connectorConfigs.put(FILE_CONFIG, filePath);
+        return connectorConfigs;
+    }
+
+    /**
+     * Verify that the number of lines in the file at {@code filePath} is equal to {@code numLines}.
+     * If {@code verifyLinearity} is true, this method will also verify that the lines match {@link #MESSAGE_FORMAT}
+     * with a linearly increasing message number (beginning with 0).
+     *
+     * @param filePath the file path
+     * @param numLines the expected number of lines in the file
+     * @param verifyLinearity true if the line contents are to be verified
+     */
+    private void verifyLinesInFile(Path filePath, int numLines, boolean verifyLinearity) throws Exception {
+        TestUtils.waitForCondition(() -> {
+            try (BufferedReader reader = new BufferedReader(new InputStreamReader(Files.newInputStream(filePath)))) {
+                for (int i = 0; i < numLines; i++) {

Review Comment:
   Doesn't this lead to false positives if there are more lines in the file 
than expected?
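   One way to guard against that (a rough, untested sketch) would be to check that the reader is exhausted once the expected number of lines has been read:
   ```java
   // Inside the waitForCondition lambda in verifyLinesInFile:
   for (int i = 0; i < numLines; i++) {
       String line = reader.readLine();
       if (line == null) {
           return false;
       }
       if (verifyLinearity) {
           assertEquals(String.format(MESSAGE_FORMAT, i), line);
       }
   }
   // Fail the condition if the file contains more lines than expected
   return reader.readLine() == null;
   ```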



##########
connect/file/src/test/java/org/apache/kafka/connect/file/integration/FileStreamSinkConnectorIntegrationTest.java:
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.file.integration;
+
+import org.apache.kafka.connect.file.FileStreamSinkConnector;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
+import org.apache.kafka.connect.util.SinkUtils;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.InputStreamReader;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.kafka.connect.file.FileStreamSinkConnector.FILE_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
+import static org.apache.kafka.connect.sink.SinkConnector.TOPICS_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@Tag("integration")
+public class FileStreamSinkConnectorIntegrationTest {
+
+    private static final String CONNECTOR_NAME = "test-connector";
+    private static final String TOPIC = "test-topic";
+    private static final String MESSAGE_FORMAT = "Message %d";
+    private static final int NUM_MESSAGES = 5;
+    private static final String FILE_NAME = "test-file";
+    private final EmbeddedConnectCluster connect = new EmbeddedConnectCluster.Builder().build();
+
+    @BeforeEach
+    public void setup() {
+        connect.start();
+        connect.kafka().createTopic(TOPIC);
+        produceMessagesToTopic(TOPIC, NUM_MESSAGES);
+    }
+
+    @AfterEach
+    public void tearDown() {
+        connect.stop();
+    }
+
+    @Test
+    public void testSimpleSink() throws Exception {
+        File tempDir = TestUtils.tempDirectory();
+        Path tempFilePath = tempDir.toPath().resolve(FILE_NAME);
+        Map<String, String> connectorConfigs = baseConnectorConfigs(TOPIC, tempFilePath.toString());
+        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1,
+            "Connector and task did not start in time");
+
+        verifyLinesInFile(tempFilePath, NUM_MESSAGES, true);
+    }
+
+    @Test
+    public void testAlterOffsets() throws Exception {
+        File tempDir = TestUtils.tempDirectory();
+        Path tempFilePath = tempDir.toPath().resolve(FILE_NAME);
+        Map<String, String> connectorConfigs = baseConnectorConfigs(TOPIC, tempFilePath.toString());
+        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1,
+            "Connector and task did not start in time");
+
+        verifyLinesInFile(tempFilePath, NUM_MESSAGES, true);
+
+        connect.stopConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, "Connector did not stop in time");
+
+        // Alter the offsets to cause the last message in the topic to be re-processed
+        Map<String, Object> partition = new HashMap<>();
+        partition.put(SinkUtils.KAFKA_TOPIC_KEY, TOPIC);
+        partition.put(SinkUtils.KAFKA_PARTITION_KEY, 0);
+        Map<String, Object> offset = Collections.singletonMap(SinkUtils.KAFKA_OFFSET_KEY, 4);

Review Comment:
   Nit: It's a little unclear where `4` comes from:
   ```suggestion
           Map<String, Object> offset = Collections.singletonMap(SinkUtils.KAFKA_OFFSET_KEY, NUM_MESSAGES - 1);
   ```



##########
connect/file/src/test/java/org/apache/kafka/connect/file/integration/FileStreamSourceConnectorIntegrationTest.java:
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.file.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.connect.file.FileStreamSourceConnector;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.PrintStream;
+import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.kafka.connect.file.FileStreamSourceConnector.FILE_CONFIG;
+import static org.apache.kafka.connect.file.FileStreamSourceConnector.TOPIC_CONFIG;
+import static org.apache.kafka.connect.file.FileStreamSourceTask.FILENAME_FIELD;
+import static org.apache.kafka.connect.file.FileStreamSourceTask.POSITION_FIELD;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+@Tag("integration")
+public class FileStreamSourceConnectorIntegrationTest {
+
+    private static final String CONNECTOR_NAME = "test-connector";
+    private static final String TOPIC = "test-topic";
+    private static final String LINE_FORMAT = "Line %d";
+    private static final int NUM_LINES = 5;
+    private static final long TIMEOUT_MS = TimeUnit.SECONDS.toMillis(15);
+    private final EmbeddedConnectCluster connect = new EmbeddedConnectCluster.Builder().build();
+    private File sourceFile;
+
+    @BeforeEach
+    public void setup() throws Exception {
+        connect.start();
+        sourceFile = createTempFile(NUM_LINES);
+        connect.kafka().createTopic(TOPIC);
+    }
+
+    @AfterEach
+    public void tearDown() {
+        connect.stop();
+    }
+
+    @Test
+    public void testSimpleSource() throws Exception {
+        Map<String, String> connectorConfigs = baseConnectorConfigs(sourceFile.getAbsolutePath());
+        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1,
+            "Connector and task did not start in time");
+
+        int i = 0;
+        for (ConsumerRecord<byte[], byte[]> record : connect.kafka().consume(NUM_LINES, TIMEOUT_MS, TOPIC)) {
+            assertEquals(String.format(LINE_FORMAT, i++), new String(record.value()));
+        }
+    }
+
+    @Test
+    public void testStopResumeSavedOffset() throws Exception {
+        Map<String, String> connectorConfigs = baseConnectorConfigs(sourceFile.getAbsolutePath());
+        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1,
+            "Connector and task did not start in time");
+
+        // Wait for the initially written records to be sourced by the connector and produced to the configured Kafka topic
+        connect.kafka().consume(NUM_LINES, TIMEOUT_MS, TOPIC);
+
+        connect.stopConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, "Connector did not stop in time");
+
+        // Append NUM_LINES more lines to the file
+        try (PrintStream printStream = new PrintStream(Files.newOutputStream(sourceFile.toPath(), StandardOpenOption.APPEND))) {
+            for (int i = NUM_LINES; i < 2 * NUM_LINES; i++) {
+                printStream.println(String.format(LINE_FORMAT, i));
+            }
+        }
+
+        connect.resumeConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, 1,
+            "Connector and task did not resume in time");
+
+        // We expect only 2 * NUM_LINES messages to be produced since the connector should continue from where it left off on being resumed
+        assertThrows(EmbeddedKafkaCluster.NotEnoughRecordsException.class, () -> connect.kafka().consume(2 * NUM_LINES + 1, TIMEOUT_MS, TOPIC));
+
+        int i = 0;
+        for (ConsumerRecord<byte[], byte[]> record : connect.kafka().consume(2 * NUM_LINES, TIMEOUT_MS, TOPIC)) {
+            assertEquals(String.format(LINE_FORMAT, i++), new String(record.value()));
+        }
+    }
+
+    @Test
+    public void testAlterOffsets() throws Exception {
+        Map<String, String> connectorConfigs = baseConnectorConfigs(sourceFile.getAbsolutePath());
+        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1,
+            "Connector and task did not start in time");
+
+        // Wait for the initially written records to be sourced by the connector and produced to the configured Kafka topic
+        connect.kafka().consume(NUM_LINES, TIMEOUT_MS, TOPIC);
+
+        connect.stopConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, "Connector did not stop in time");
+
+        // Alter the offsets to make the connector re-process the last line in the file
+        Map<String, Object> partition = Collections.singletonMap(FILENAME_FIELD, sourceFile.getAbsolutePath());
+        Map<String, Object> offset = Collections.singletonMap(POSITION_FIELD, 28L);
+        connect.alterConnectorOffsets(CONNECTOR_NAME, new ConnectorOffsets(Collections.singletonList(new ConnectorOffset(partition, offset))));
+
+        connect.resumeConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1,
+            "Connector and task did not resume in time");
+
+        Iterator<ConsumerRecord<byte[], byte[]>> recordIterator = connect.kafka().consume(NUM_LINES + 1, TIMEOUT_MS, TOPIC).iterator();
+
+        int i = 0;
+        while (i < NUM_LINES) {
+            assertEquals(String.format(LINE_FORMAT, i++), new String(recordIterator.next().value()));
+        }
+
+        // Verify that the last line has been sourced again after the alter offsets request
+        assertEquals(String.format(LINE_FORMAT, NUM_LINES - 1), new String(recordIterator.next().value()));
+    }
+
+    @Test
+    public void testResetOffsets() throws Exception {
+        Map<String, String> connectorConfigs = baseConnectorConfigs(sourceFile.getAbsolutePath());
+        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1,
+            "Connector and task did not start in time");
+
+        // Wait for the initially written records to be sourced by the connector and produced to the configured Kafka topic
+        connect.kafka().consume(NUM_LINES, TIMEOUT_MS, TOPIC);
+
+        connect.stopConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, "Connector did not stop in time");
+
+        // Reset the offsets to make the connector re-read all the previously written lines
+        connect.resetConnectorOffsets(CONNECTOR_NAME);
+
+        connect.resumeConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1,
+            "Connector and task did not resume in time");
+
+        // We expect 2 * NUM_LINES messages to be produced

Review Comment:
   In green-path scenarios, this forces a wait of 15 seconds, which is a little suboptimal.
   
   What do you think about moving this step to the end of the test (after we've verified that the topic has at least `NUM_LINES * 2` records), and using [EmbeddedKafkaCluster::consumeAll](https://github.com/apache/kafka/blob/37a51e286d5aaa890439e074e9f781ec26aaef2e/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java#L536C44-L536C54) instead of introducing the new `NotEnoughRecordsException` class?
   
   This would still leave room for false negatives, but it'd be relatively small: we'd know that, by the time we do the read-to-end of the topic, it would have at least `NUM_LINES * 2` records, and even a single record beyond that would cause the test to fail.
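   
   For illustration only, a rough sketch of that shape (assuming the existing `consumeAll(long, String...)` overload and a `ConsumerRecords` import; untested):
   
   ```java
   // After the ordered verification of the first 2 * NUM_LINES records...
   
   // Read to the current end of the topic and assert that nothing beyond the
   // expected records was produced, instead of waiting out TIMEOUT_MS for
   // records that should never arrive.
   ConsumerRecords<byte[], byte[]> allRecords = connect.kafka().consumeAll(TIMEOUT_MS, TOPIC);
   assertEquals(2 * NUM_LINES, allRecords.count());
   ```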



##########
connect/file/src/test/java/org/apache/kafka/connect/file/integration/FileStreamSinkConnectorIntegrationTest.java:
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.file.integration;
+
+import org.apache.kafka.connect.file.FileStreamSinkConnector;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
+import org.apache.kafka.connect.util.SinkUtils;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.InputStreamReader;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.kafka.connect.file.FileStreamSinkConnector.FILE_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
+import static org.apache.kafka.connect.sink.SinkConnector.TOPICS_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@Tag("integration")
+public class FileStreamSinkConnectorIntegrationTest {
+
+    private static final String CONNECTOR_NAME = "test-connector";
+    private static final String TOPIC = "test-topic";
+    private static final String MESSAGE_FORMAT = "Message %d";
+    private static final int NUM_MESSAGES = 5;
+    private static final String FILE_NAME = "test-file";
+    private final EmbeddedConnectCluster connect = new EmbeddedConnectCluster.Builder().build();
+
+    @BeforeEach
+    public void setup() {
+        connect.start();
+        connect.kafka().createTopic(TOPIC);
+        produceMessagesToTopic(TOPIC, NUM_MESSAGES);
+    }
+
+    @AfterEach
+    public void tearDown() {
+        connect.stop();
+    }
+
+    @Test
+    public void testSimpleSink() throws Exception {
+        File tempDir = TestUtils.tempDirectory();
+        Path tempFilePath = tempDir.toPath().resolve(FILE_NAME);
+        Map<String, String> connectorConfigs = baseConnectorConfigs(TOPIC, tempFilePath.toString());
+        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1,
+            "Connector and task did not start in time");
+
+        verifyLinesInFile(tempFilePath, NUM_MESSAGES, true);
+    }
+
+    @Test
+    public void testAlterOffsets() throws Exception {
+        File tempDir = TestUtils.tempDirectory();
+        Path tempFilePath = tempDir.toPath().resolve(FILE_NAME);
+        Map<String, String> connectorConfigs = baseConnectorConfigs(TOPIC, tempFilePath.toString());
+        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1,
+            "Connector and task did not start in time");
+
+        verifyLinesInFile(tempFilePath, NUM_MESSAGES, true);
+
+        connect.stopConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, "Connector did not stop in time");
+
+        // Alter the offsets to cause the last message in the topic to be re-processed
+        Map<String, Object> partition = new HashMap<>();
+        partition.put(SinkUtils.KAFKA_TOPIC_KEY, TOPIC);
+        partition.put(SinkUtils.KAFKA_PARTITION_KEY, 0);
+        Map<String, Object> offset = Collections.singletonMap(SinkUtils.KAFKA_OFFSET_KEY, 4);
+        List<ConnectorOffset> offsetsToAlter = Collections.singletonList(new ConnectorOffset(partition, offset));
+
+        connect.alterConnectorOffsets(CONNECTOR_NAME, new ConnectorOffsets(offsetsToAlter));
+
+        connect.resumeConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, 1,
+            "Connector and task did not resume in time");
+
+        // The last message should be re-processed when the connector is resumed after the offsets are altered
+        verifyLinesInFile(tempFilePath, NUM_MESSAGES + 1, false);
+    }
+
+    @Test
+    public void testResetOffsets() throws Exception {
+        File tempDir = TestUtils.tempDirectory();
+        Path tempFilePath = tempDir.toPath().resolve(FILE_NAME);
+        Map<String, String> connectorConfigs = baseConnectorConfigs(TOPIC, tempFilePath.toString());
+        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1,
+            "Connector and task did not start in time");
+
+        verifyLinesInFile(tempFilePath, NUM_MESSAGES, true);
+
+        connect.stopConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, "Connector did not stop in time");
+
+        // Reset the offsets to cause all the messages in the topic to be re-processed
+        connect.resetConnectorOffsets(CONNECTOR_NAME);
+
+        connect.resumeConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, 1,
+            "Connector and task did not resume in time");
+
+        // All the messages should be re-processed when the connector is resumed after the offsets are reset
+        verifyLinesInFile(tempFilePath, 2 * NUM_MESSAGES, false);
+    }
+
+    @Test
+    public void testSinkMultipleTopicsWithMultipleTasks() throws Exception {
+        String topic2 = "test-topic-2";
+        connect.kafka().createTopic(topic2);
+        produceMessagesToTopic(topic2, NUM_MESSAGES);
+
+        File tempDir = TestUtils.tempDirectory();
+        Path tempFilePath = tempDir.toPath().resolve(FILE_NAME);
+        Map<String, String> connectorConfigs = baseConnectorConfigs(TOPIC + "," + topic2, tempFilePath.toString());
+        connectorConfigs.put(TASKS_MAX_CONFIG, "2");
+
+        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 2,
+            "Connector and task did not start in time");
+
+        // Only verify the number of lines since the messages can be consumed in any order across the two topics
+        verifyLinesInFile(tempFilePath, 2 * NUM_MESSAGES, false);
+    }
+
+    private void produceMessagesToTopic(String topic, int numMessages) {
+        for (int i = 0; i < numMessages; i++) {
+            connect.kafka().produce(topic, String.format(MESSAGE_FORMAT, i));
+        }
+    }
+
+    private Map<String, String> baseConnectorConfigs(String topics, String filePath) {
+        Map<String, String> connectorConfigs = new HashMap<>();
+        connectorConfigs.put(CONNECTOR_CLASS_CONFIG, FileStreamSinkConnector.class.getName());
+        connectorConfigs.put(TOPICS_CONFIG, topics);
+        connectorConfigs.put(FILE_CONFIG, filePath);
+        return connectorConfigs;
+    }
+
+    /**
+     * Verify that the number of lines in the file at {@code filePath} is equal to {@code numLines}.
+     * If {@code verifyLinearity} is true, this method will also verify that the lines match {@link #MESSAGE_FORMAT}
+     * with a linearly increasing message number (beginning with 0).
+     *
+     * @param filePath the file path
+     * @param numLines the expected number of lines in the file
+     * @param verifyLinearity true if the line contents are to be verified
+     */
+    private void verifyLinesInFile(Path filePath, int numLines, boolean verifyLinearity) throws Exception {
+        TestUtils.waitForCondition(() -> {
+            try (BufferedReader reader = new BufferedReader(new InputStreamReader(Files.newInputStream(filePath)))) {
+                for (int i = 0; i < numLines; i++) {
+                    String line = reader.readLine();
+                    if (line == null) {

Review Comment:
   We could potentially use [BufferedReader::lines](https://docs.oracle.com/javase/8/docs/api/java/io/BufferedReader.html#lines--) to simplify this.
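   
   For example, the body of the `waitForCondition` lambda could collapse to something like this (an untested sketch; assumes `java.util.List` and `java.util.stream.Collectors` are imported, and that the lambda returns `false` until the file matches):
   
   ```java
   try (BufferedReader reader = new BufferedReader(new InputStreamReader(Files.newInputStream(filePath)))) {
       // Snapshot every line currently in the file
       List<String> lines = reader.lines().collect(Collectors.toList());
       if (lines.size() != numLines) {
           return false;
       }
       if (!verifyLinearity) {
           return true;
       }
       // Line i should be exactly String.format(MESSAGE_FORMAT, i)
       for (int i = 0; i < numLines; i++) {
           if (!String.format(MESSAGE_FORMAT, i).equals(lines.get(i))) {
               return false;
           }
       }
       return true;
   }
   ```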



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
