yashmayya commented on code in PR #14279: URL: https://github.com/apache/kafka/pull/14279#discussion_r1316932577
########## connect/file/src/test/java/org/apache/kafka/connect/file/integration/FileStreamSinkConnectorIntegrationTest.java: ########## @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.file.integration; + +import org.apache.kafka.connect.file.FileStreamSinkConnector; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets; +import org.apache.kafka.connect.util.SinkUtils; +import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + +import java.io.BufferedReader; +import java.io.File; +import java.io.InputStreamReader; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.kafka.connect.file.FileStreamSinkConnector.FILE_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG; +import static org.apache.kafka.connect.sink.SinkConnector.TOPICS_CONFIG; +import static org.junit.jupiter.api.Assertions.assertEquals; + +@Tag("integration") +public class FileStreamSinkConnectorIntegrationTest { + + private static final String CONNECTOR_NAME = "test-connector"; + private static final String TOPIC = "test-topic"; + private static final String MESSAGE_FORMAT = "Message %d"; + private static final int NUM_MESSAGES = 5; + private static final String FILE_NAME = "test-file"; + private final EmbeddedConnectCluster connect = new EmbeddedConnectCluster.Builder().build(); + + @BeforeEach + public void setup() { + connect.start(); + connect.kafka().createTopic(TOPIC); + produceMessagesToTopic(TOPIC, NUM_MESSAGES); + } + + @AfterEach + public void tearDown() { + connect.stop(); + } + + @Test + public void testSimpleSink() throws Exception { + File tempDir = TestUtils.tempDirectory(); + Path tempFilePath = tempDir.toPath().resolve(FILE_NAME); + Map<String, String> connectorConfigs = baseConnectorConfigs(TOPIC, tempFilePath.toString()); + connect.configureConnector(CONNECTOR_NAME, connectorConfigs); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1, + "Connector and task did not start in time"); + + verifyLinesInFile(tempFilePath, NUM_MESSAGES, true); + } + + @Test + public void testAlterOffsets() throws Exception { + File tempDir = TestUtils.tempDirectory(); + Path tempFilePath = tempDir.toPath().resolve(FILE_NAME); + Map<String, String> 
connectorConfigs = baseConnectorConfigs(TOPIC, tempFilePath.toString()); + connect.configureConnector(CONNECTOR_NAME, connectorConfigs); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1, + "Connector and task did not start in time"); + + verifyLinesInFile(tempFilePath, NUM_MESSAGES, true); + + connect.stopConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, "Connector did not stop in time"); + + // Alter the offsets to cause the last message in the topic to be re-processed + Map<String, Object> partition = new HashMap<>(); + partition.put(SinkUtils.KAFKA_TOPIC_KEY, TOPIC); + partition.put(SinkUtils.KAFKA_PARTITION_KEY, 0); + Map<String, Object> offset = Collections.singletonMap(SinkUtils.KAFKA_OFFSET_KEY, 4); + List<ConnectorOffset> offsetsToAlter = Collections.singletonList(new ConnectorOffset(partition, offset)); + + connect.alterConnectorOffsets(CONNECTOR_NAME, new ConnectorOffsets(offsetsToAlter)); + + connect.resumeConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, 1, + "Connector and task did not resume in time"); + + // The last message should be re-processed when the connector is resumed after the offsets are altered + verifyLinesInFile(tempFilePath, NUM_MESSAGES + 1, false); + } + + @Test + public void testResetOffsets() throws Exception { + File tempDir = TestUtils.tempDirectory(); + Path tempFilePath = tempDir.toPath().resolve(FILE_NAME); + Map<String, String> connectorConfigs = baseConnectorConfigs(TOPIC, tempFilePath.toString()); + connect.configureConnector(CONNECTOR_NAME, connectorConfigs); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1, + "Connector and task did not start in time"); + + verifyLinesInFile(tempFilePath, NUM_MESSAGES, true); + + connect.stopConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, "Connector did not stop in time"); + + // Reset the offsets to cause all the message in the topic to be re-processed + connect.resetConnectorOffsets(CONNECTOR_NAME); + + connect.resumeConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, 1, + "Connector and task did not resume in time"); + + // All the messages should be re-processed when the connector is resumed after the offsets are reset + verifyLinesInFile(tempFilePath, 2 * NUM_MESSAGES, false); + } + + @Test + public void testSinkMultipleTopicsWithMultipleTasks() throws Exception { + String topic2 = "test-topic-2"; + connect.kafka().createTopic(topic2); + produceMessagesToTopic(topic2, NUM_MESSAGES); + + File tempDir = TestUtils.tempDirectory(); + Path tempFilePath = tempDir.toPath().resolve(FILE_NAME); + Map<String, String> connectorConfigs = baseConnectorConfigs(TOPIC + "," + topic2, tempFilePath.toString()); + connectorConfigs.put(TASKS_MAX_CONFIG, "2"); + + connect.configureConnector(CONNECTOR_NAME, connectorConfigs); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 2, + "Connector and task did not start in time"); + + // Only verify the number of lines since the messages can be consumed in any order across the two topics + verifyLinesInFile(tempFilePath, 2 * NUM_MESSAGES, false); + } + + private void produceMessagesToTopic(String topic, int numMessages) { + for (int i = 0; i < numMessages; i++) { + connect.kafka().produce(topic, String.format(MESSAGE_FORMAT, i)); + } + } + + private Map<String, String> 
baseConnectorConfigs(String topics, String filePath) { + Map<String, String> connectorConfigs = new HashMap<>(); + connectorConfigs.put(CONNECTOR_CLASS_CONFIG, FileStreamSinkConnector.class.getName()); + connectorConfigs.put(TOPICS_CONFIG, topics); + connectorConfigs.put(FILE_CONFIG, filePath); + return connectorConfigs; + } + + /** + * Verify that the number of lines in the file at {@code filePath} is equal to {@code numLines}. + * If {@code verifyLinearity} is true, this method will also verify that the lines match {@link #MESSAGE_FORMAT} + * with a linearly increasing message number (beginning with 0). + * + * @param filePath the file path + * @param numLines the expected number of lines in the file + * @param verifyLinearity true if the line contents are to be verified + */ + private void verifyLinesInFile(Path filePath, int numLines, boolean verifyLinearity) throws Exception { + TestUtils.waitForCondition(() -> { + try (BufferedReader reader = new BufferedReader(new InputStreamReader(Files.newInputStream(filePath)))) { + for (int i = 0; i < numLines; i++) { Review Comment: Good point - I've added a check to verify the count of the total number of lines in the file after we've already read the expected number of lines from the file. This is similar to your suggestion for the source connector case and has the same benefits and (minor) pitfalls. ########## connect/file/src/test/java/org/apache/kafka/connect/file/integration/FileStreamSinkConnectorIntegrationTest.java: ########## @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
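The extra check described in the review response above could look roughly like the following sketch (not necessarily the exact code added in the PR); it reuses the test's existing imports and simply re-counts the file's lines once the expected number has been read:

```java
// Sketch of the additional check: after the expected number of lines has been read,
// confirm that the file does not contain any extra lines.
try (BufferedReader reader = new BufferedReader(new InputStreamReader(Files.newInputStream(filePath)))) {
    long totalLines = reader.lines().count();
    assertEquals(numLines, totalLines, "Expected the file to contain exactly " + numLines + " lines");
}
```

The "(minor) pitfalls" are presumably that a late write could still append lines after this count is taken, so the check is a best-effort guard rather than a strict guarantee.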
+ */ +package org.apache.kafka.connect.file.integration; + +import org.apache.kafka.connect.file.FileStreamSinkConnector; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets; +import org.apache.kafka.connect.util.SinkUtils; +import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + +import java.io.BufferedReader; +import java.io.File; +import java.io.InputStreamReader; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.kafka.connect.file.FileStreamSinkConnector.FILE_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG; +import static org.apache.kafka.connect.sink.SinkConnector.TOPICS_CONFIG; +import static org.junit.jupiter.api.Assertions.assertEquals; + +@Tag("integration") +public class FileStreamSinkConnectorIntegrationTest { + + private static final String CONNECTOR_NAME = "test-connector"; + private static final String TOPIC = "test-topic"; + private static final String MESSAGE_FORMAT = "Message %d"; + private static final int NUM_MESSAGES = 5; + private static final String FILE_NAME = "test-file"; + private final EmbeddedConnectCluster connect = new EmbeddedConnectCluster.Builder().build(); + + @BeforeEach + public void setup() { + connect.start(); + connect.kafka().createTopic(TOPIC); + produceMessagesToTopic(TOPIC, NUM_MESSAGES); + } + + @AfterEach + public void tearDown() { + connect.stop(); + } + + @Test + public void testSimpleSink() throws Exception { + File tempDir = TestUtils.tempDirectory(); + Path tempFilePath = tempDir.toPath().resolve(FILE_NAME); + Map<String, String> connectorConfigs = baseConnectorConfigs(TOPIC, tempFilePath.toString()); + connect.configureConnector(CONNECTOR_NAME, connectorConfigs); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1, + "Connector and task did not start in time"); + + verifyLinesInFile(tempFilePath, NUM_MESSAGES, true); + } + + @Test + public void testAlterOffsets() throws Exception { + File tempDir = TestUtils.tempDirectory(); + Path tempFilePath = tempDir.toPath().resolve(FILE_NAME); + Map<String, String> connectorConfigs = baseConnectorConfigs(TOPIC, tempFilePath.toString()); + connect.configureConnector(CONNECTOR_NAME, connectorConfigs); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1, + "Connector and task did not start in time"); + + verifyLinesInFile(tempFilePath, NUM_MESSAGES, true); + + connect.stopConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, "Connector did not stop in time"); + + // Alter the offsets to cause the last message in the topic to be re-processed + Map<String, Object> partition = new HashMap<>(); + partition.put(SinkUtils.KAFKA_TOPIC_KEY, TOPIC); + partition.put(SinkUtils.KAFKA_PARTITION_KEY, 0); + Map<String, Object> offset = Collections.singletonMap(SinkUtils.KAFKA_OFFSET_KEY, 4); + List<ConnectorOffset> offsetsToAlter = Collections.singletonList(new ConnectorOffset(partition, offset)); + + connect.alterConnectorOffsets(CONNECTOR_NAME, new 
ConnectorOffsets(offsetsToAlter)); + + connect.resumeConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, 1, + "Connector and task did not resume in time"); + + // The last message should be re-processed when the connector is resumed after the offsets are altered + verifyLinesInFile(tempFilePath, NUM_MESSAGES + 1, false); + } + + @Test + public void testResetOffsets() throws Exception { + File tempDir = TestUtils.tempDirectory(); + Path tempFilePath = tempDir.toPath().resolve(FILE_NAME); + Map<String, String> connectorConfigs = baseConnectorConfigs(TOPIC, tempFilePath.toString()); + connect.configureConnector(CONNECTOR_NAME, connectorConfigs); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1, + "Connector and task did not start in time"); + + verifyLinesInFile(tempFilePath, NUM_MESSAGES, true); + + connect.stopConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, "Connector did not stop in time"); + + // Reset the offsets to cause all the message in the topic to be re-processed + connect.resetConnectorOffsets(CONNECTOR_NAME); + + connect.resumeConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, 1, + "Connector and task did not resume in time"); + + // All the messages should be re-processed when the connector is resumed after the offsets are reset + verifyLinesInFile(tempFilePath, 2 * NUM_MESSAGES, false); + } + + @Test + public void testSinkMultipleTopicsWithMultipleTasks() throws Exception { + String topic2 = "test-topic-2"; + connect.kafka().createTopic(topic2); + produceMessagesToTopic(topic2, NUM_MESSAGES); + + File tempDir = TestUtils.tempDirectory(); + Path tempFilePath = tempDir.toPath().resolve(FILE_NAME); + Map<String, String> connectorConfigs = baseConnectorConfigs(TOPIC + "," + topic2, tempFilePath.toString()); + connectorConfigs.put(TASKS_MAX_CONFIG, "2"); + + connect.configureConnector(CONNECTOR_NAME, connectorConfigs); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 2, + "Connector and task did not start in time"); + + // Only verify the number of lines since the messages can be consumed in any order across the two topics + verifyLinesInFile(tempFilePath, 2 * NUM_MESSAGES, false); + } + + private void produceMessagesToTopic(String topic, int numMessages) { + for (int i = 0; i < numMessages; i++) { + connect.kafka().produce(topic, String.format(MESSAGE_FORMAT, i)); + } + } + + private Map<String, String> baseConnectorConfigs(String topics, String filePath) { + Map<String, String> connectorConfigs = new HashMap<>(); + connectorConfigs.put(CONNECTOR_CLASS_CONFIG, FileStreamSinkConnector.class.getName()); + connectorConfigs.put(TOPICS_CONFIG, topics); + connectorConfigs.put(FILE_CONFIG, filePath); + return connectorConfigs; + } + + /** + * Verify that the number of lines in the file at {@code filePath} is equal to {@code numLines}. + * If {@code verifyLinearity} is true, this method will also verify that the lines match {@link #MESSAGE_FORMAT} + * with a linearly increasing message number (beginning with 0). 
+ * + * @param filePath the file path + * @param numLines the expected number of lines in the file + * @param verifyLinearity true if the line contents are to be verified + */ + private void verifyLinesInFile(Path filePath, int numLines, boolean verifyLinearity) throws Exception { + TestUtils.waitForCondition(() -> { + try (BufferedReader reader = new BufferedReader(new InputStreamReader(Files.newInputStream(filePath)))) { + for (int i = 0; i < numLines; i++) { + String line = reader.readLine(); + if (line == null) { Review Comment: Makes sense, done. I also realized that the `verifyLinesInFile` implementation was suboptimal because we were unnecessarily creating a new stream and reading from the beginning in every `waitForCondition` iteration so I've fixed that too. ########## connect/file/src/test/java/org/apache/kafka/connect/file/integration/FileStreamSinkConnectorIntegrationTest.java: ########## @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.file.integration; + +import org.apache.kafka.connect.file.FileStreamSinkConnector; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets; +import org.apache.kafka.connect.util.SinkUtils; +import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + +import java.io.BufferedReader; +import java.io.File; +import java.io.InputStreamReader; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.kafka.connect.file.FileStreamSinkConnector.FILE_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG; +import static org.apache.kafka.connect.sink.SinkConnector.TOPICS_CONFIG; +import static org.junit.jupiter.api.Assertions.assertEquals; + +@Tag("integration") +public class FileStreamSinkConnectorIntegrationTest { + + private static final String CONNECTOR_NAME = "test-connector"; + private static final String TOPIC = "test-topic"; + private static final String MESSAGE_FORMAT = "Message %d"; + private static final int NUM_MESSAGES = 5; + private static final String FILE_NAME = "test-file"; + private final EmbeddedConnectCluster connect = new EmbeddedConnectCluster.Builder().build(); + + @BeforeEach + public void setup() { + connect.start(); + connect.kafka().createTopic(TOPIC); + produceMessagesToTopic(TOPIC, 
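The reworked `verifyLinesInFile` described in the response above could take roughly this shape (a sketch, assuming the sink file already exists when the helper is called; the code actually merged may differ): the reader is opened once, and each `waitForCondition` attempt only consumes the lines that have appeared since the previous attempt.

```java
// Sketch: keep a single reader open across waitForCondition iterations instead of
// re-opening the file and re-reading it from the beginning on every attempt.
private void verifyLinesInFile(Path filePath, int numLines, boolean verifyLinearity) throws Exception {
    AtomicInteger linesRead = new AtomicInteger(0); // requires java.util.concurrent.atomic.AtomicInteger
    try (BufferedReader reader = new BufferedReader(new InputStreamReader(Files.newInputStream(filePath)))) {
        TestUtils.waitForCondition(() -> {
            String line;
            while (linesRead.get() < numLines && (line = reader.readLine()) != null) {
                if (verifyLinearity) {
                    assertEquals(String.format(MESSAGE_FORMAT, linesRead.get()), line);
                }
                linesRead.incrementAndGet();
            }
            return linesRead.get() == numLines;
        }, "Expected to read " + numLines + " lines from " + filePath + " in time");
    }
}
```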
NUM_MESSAGES); + } + + @AfterEach + public void tearDown() { + connect.stop(); + } + + @Test + public void testSimpleSink() throws Exception { + File tempDir = TestUtils.tempDirectory(); + Path tempFilePath = tempDir.toPath().resolve(FILE_NAME); + Map<String, String> connectorConfigs = baseConnectorConfigs(TOPIC, tempFilePath.toString()); + connect.configureConnector(CONNECTOR_NAME, connectorConfigs); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1, Review Comment: Nope, copy pasting is bad 😞 ########## connect/file/src/test/java/org/apache/kafka/connect/file/integration/FileStreamSinkConnectorIntegrationTest.java: ########## @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.file.integration; + +import org.apache.kafka.connect.file.FileStreamSinkConnector; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets; +import org.apache.kafka.connect.util.SinkUtils; +import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + +import java.io.BufferedReader; +import java.io.File; +import java.io.InputStreamReader; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.kafka.connect.file.FileStreamSinkConnector.FILE_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG; +import static org.apache.kafka.connect.sink.SinkConnector.TOPICS_CONFIG; +import static org.junit.jupiter.api.Assertions.assertEquals; + +@Tag("integration") +public class FileStreamSinkConnectorIntegrationTest { + + private static final String CONNECTOR_NAME = "test-connector"; + private static final String TOPIC = "test-topic"; + private static final String MESSAGE_FORMAT = "Message %d"; + private static final int NUM_MESSAGES = 5; + private static final String FILE_NAME = "test-file"; + private final EmbeddedConnectCluster connect = new EmbeddedConnectCluster.Builder().build(); + + @BeforeEach + public void setup() { + connect.start(); + connect.kafka().createTopic(TOPIC); + produceMessagesToTopic(TOPIC, NUM_MESSAGES); + } + + @AfterEach + public void tearDown() { + connect.stop(); + } + + @Test + public void testSimpleSink() throws Exception { + File tempDir = TestUtils.tempDirectory(); + Path tempFilePath = tempDir.toPath().resolve(FILE_NAME); + 
Map<String, String> connectorConfigs = baseConnectorConfigs(TOPIC, tempFilePath.toString()); + connect.configureConnector(CONNECTOR_NAME, connectorConfigs); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1, + "Connector and task did not start in time"); + + verifyLinesInFile(tempFilePath, NUM_MESSAGES, true); + } + + @Test + public void testAlterOffsets() throws Exception { + File tempDir = TestUtils.tempDirectory(); + Path tempFilePath = tempDir.toPath().resolve(FILE_NAME); + Map<String, String> connectorConfigs = baseConnectorConfigs(TOPIC, tempFilePath.toString()); + connect.configureConnector(CONNECTOR_NAME, connectorConfigs); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1, + "Connector and task did not start in time"); + + verifyLinesInFile(tempFilePath, NUM_MESSAGES, true); + + connect.stopConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, "Connector did not stop in time"); + + // Alter the offsets to cause the last message in the topic to be re-processed + Map<String, Object> partition = new HashMap<>(); + partition.put(SinkUtils.KAFKA_TOPIC_KEY, TOPIC); + partition.put(SinkUtils.KAFKA_PARTITION_KEY, 0); + Map<String, Object> offset = Collections.singletonMap(SinkUtils.KAFKA_OFFSET_KEY, 4); + List<ConnectorOffset> offsetsToAlter = Collections.singletonList(new ConnectorOffset(partition, offset)); + + connect.alterConnectorOffsets(CONNECTOR_NAME, new ConnectorOffsets(offsetsToAlter)); Review Comment: Thanks, that's a great suggestion! I've added a couple of utility methods to alter the offset for a single partition for source and sink connectors. I can also add a couple more utility methods to allow altering offsets for multiple partitions and use them in `OffsetsApiIntegrationTest` and elsewhere but since it's an orthogonal concern to this PR, I think it'd make more sense to file a separate minor PR for that. ########## connect/file/src/test/java/org/apache/kafka/connect/file/integration/FileStreamSourceConnectorIntegrationTest.java: ########## @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
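The single-partition utility methods mentioned in the response above might look roughly like this for the sink side (illustrative name and signature only, not necessarily the API that was actually added to `EmbeddedConnectCluster`):

```java
// Hypothetical convenience method: alter the committed offset for one topic partition
// of a sink connector by wrapping the existing alterConnectorOffsets(...) call.
public void alterSinkConnectorOffset(String connectorName, TopicPartition topicPartition, long offset) {
    Map<String, Object> partition = new HashMap<>();
    partition.put(SinkUtils.KAFKA_TOPIC_KEY, topicPartition.topic());
    partition.put(SinkUtils.KAFKA_PARTITION_KEY, topicPartition.partition());
    Map<String, Object> offsetToCommit = Collections.singletonMap(SinkUtils.KAFKA_OFFSET_KEY, offset);
    alterConnectorOffsets(connectorName,
            new ConnectorOffsets(Collections.singletonList(new ConnectorOffset(partition, offsetToCommit))));
}
```

With such a helper, the call in the test above would reduce to something like `connect.alterSinkConnectorOffset(CONNECTOR_NAME, new TopicPartition(TOPIC, 0), 4);`.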
+ */ +package org.apache.kafka.connect.file.integration; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.connect.file.FileStreamSourceConnector; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets; +import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; +import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.io.PrintStream; +import java.nio.file.Files; +import java.nio.file.StandardOpenOption; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.apache.kafka.connect.file.FileStreamSourceConnector.FILE_CONFIG; +import static org.apache.kafka.connect.file.FileStreamSourceConnector.TOPIC_CONFIG; +import static org.apache.kafka.connect.file.FileStreamSourceTask.FILENAME_FIELD; +import static org.apache.kafka.connect.file.FileStreamSourceTask.POSITION_FIELD; +import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +@Tag("integration") +public class FileStreamSourceConnectorIntegrationTest { + + private static final String CONNECTOR_NAME = "test-connector"; + private static final String TOPIC = "test-topic"; + private static final String LINE_FORMAT = "Line %d"; + private static final int NUM_LINES = 5; + private static final long TIMEOUT_MS = TimeUnit.SECONDS.toMillis(15); + private final EmbeddedConnectCluster connect = new EmbeddedConnectCluster.Builder().build(); + private File sourceFile; + + @BeforeEach + public void setup() throws Exception { + connect.start(); + sourceFile = createTempFile(NUM_LINES); + connect.kafka().createTopic(TOPIC); + } + + @AfterEach + public void tearDown() { + connect.stop(); + } + + @Test + public void testSimpleSource() throws Exception { + Map<String, String> connectorConfigs = baseConnectorConfigs(sourceFile.getAbsolutePath()); + connect.configureConnector(CONNECTOR_NAME, connectorConfigs); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1, + "Connector and task did not start in time"); + + int i = 0; + for (ConsumerRecord<byte[], byte[]> record : connect.kafka().consume(NUM_LINES, TIMEOUT_MS, TOPIC)) { + assertEquals(String.format(LINE_FORMAT, i++), new String(record.value())); + } + } + + @Test + public void testStopResumeSavedOffset() throws Exception { + Map<String, String> connectorConfigs = baseConnectorConfigs(sourceFile.getAbsolutePath()); + connect.configureConnector(CONNECTOR_NAME, connectorConfigs); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1, + "Connector and task did not start in time"); + + // Wait for the initially written records to be sourced by the connector and produced to the configured Kafka topic + connect.kafka().consume(NUM_LINES, TIMEOUT_MS, TOPIC); + + connect.stopConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, "Connector did not stop in time"); + + // Append NUM_LINES more lines to the file + try (PrintStream printStream = new 
PrintStream(Files.newOutputStream(sourceFile.toPath(), StandardOpenOption.APPEND))) { + for (int i = NUM_LINES; i < 2 * NUM_LINES; i++) { + printStream.println(String.format(LINE_FORMAT, i)); + } + } + + connect.resumeConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, 1, + "Connector and task did not resume in time"); + + // We expect only 2 * NUM_LINES messages to be produced since the connector should continue from where it left off on being resumed + assertThrows(EmbeddedKafkaCluster.NotEnoughRecordsException.class, () -> connect.kafka().consume(2 * NUM_LINES + 1, TIMEOUT_MS, TOPIC)); + + int i = 0; + for (ConsumerRecord<byte[], byte[]> record : connect.kafka().consume(2 * NUM_LINES, TIMEOUT_MS, TOPIC)) { + assertEquals(String.format(LINE_FORMAT, i++), new String(record.value())); + } + } + + @Test + public void testAlterOffsets() throws Exception { + Map<String, String> connectorConfigs = baseConnectorConfigs(sourceFile.getAbsolutePath()); + connect.configureConnector(CONNECTOR_NAME, connectorConfigs); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1, + "Connector and task did not start in time"); + + // Wait for the initially written records to be sourced by the connector and produced to the configured Kafka topic + connect.kafka().consume(NUM_LINES, TIMEOUT_MS, TOPIC); + + connect.stopConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, "Connector did not stop in time"); + + // Alter the offsets to make the connector re-process the last line in the file + Map<String, Object> partition = Collections.singletonMap(FILENAME_FIELD, sourceFile.getAbsolutePath()); + Map<String, Object> offset = Collections.singletonMap(POSITION_FIELD, 28L); + connect.alterConnectorOffsets(CONNECTOR_NAME, new ConnectorOffsets(Collections.singletonList(new ConnectorOffset(partition, offset)))); + + connect.resumeConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1, + "Connector and task did not resume in time"); + + Iterator<ConsumerRecord<byte[], byte[]>> recordIterator = connect.kafka().consume(NUM_LINES + 1, TIMEOUT_MS, TOPIC).iterator(); + + int i = 0; + while (i < NUM_LINES) { + assertEquals(String.format(LINE_FORMAT, i++), new String(recordIterator.next().value())); + } + + // Verify that the last line has been sourced again after the alter offsets request + assertEquals(String.format(LINE_FORMAT, NUM_LINES - 1), new String(recordIterator.next().value())); + } + + @Test + public void testResetOffsets() throws Exception { + Map<String, String> connectorConfigs = baseConnectorConfigs(sourceFile.getAbsolutePath()); + connect.configureConnector(CONNECTOR_NAME, connectorConfigs); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1, + "Connector and task did not start in time"); + + // Wait for the initially written records to be sourced by the connector and produced to the configured Kafka topic + connect.kafka().consume(NUM_LINES, TIMEOUT_MS, TOPIC); + + connect.stopConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, "Connector did not stop in time"); + + // Reset the offsets to make the connector re-read all the previously written lines + connect.resetConnectorOffsets(CONNECTOR_NAME); + + connect.resumeConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1, + "Connector and task did not 
resume in time"); + + // We expect 2 * NUM_LINES messages to be produced + assertThrows(EmbeddedKafkaCluster.NotEnoughRecordsException.class, () -> connect.kafka().consume(2 * NUM_LINES + 1, TIMEOUT_MS, TOPIC)); + + Iterator<ConsumerRecord<byte[], byte[]>> recordIterator = connect.kafka().consume(2 * NUM_LINES, TIMEOUT_MS, TOPIC).iterator(); + + int i = 0; + while (i < NUM_LINES) { + assertEquals(String.format(LINE_FORMAT, i++), new String(recordIterator.next().value())); + } + + // Verify that the same lines have been sourced again after the reset offsets request + while (i < 2 * NUM_LINES) { + assertEquals(String.format(LINE_FORMAT, i - NUM_LINES), new String(recordIterator.next().value())); + i++; + } + } + + /** + * Create a temporary file and append {@code numLines} to it + * + * @param numLines the number of lines to be appended to the created file + * @return the created file + */ + private File createTempFile(int numLines) throws Exception { + File sourceFile = TestUtils.tempFile(); + + try (PrintStream printStream = new PrintStream(Files.newOutputStream(sourceFile.toPath(), StandardOpenOption.APPEND))) { Review Comment: Ah, I think it's left over from when I was playing around with tests where files are appended to while the connector is running; I've removed it now. ########## connect/file/src/test/java/org/apache/kafka/connect/file/integration/FileStreamSourceConnectorIntegrationTest.java: ########## @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.file.integration; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.connect.file.FileStreamSourceConnector; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets; +import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; +import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.io.PrintStream; +import java.nio.file.Files; +import java.nio.file.StandardOpenOption; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.apache.kafka.connect.file.FileStreamSourceConnector.FILE_CONFIG; +import static org.apache.kafka.connect.file.FileStreamSourceConnector.TOPIC_CONFIG; +import static org.apache.kafka.connect.file.FileStreamSourceTask.FILENAME_FIELD; +import static org.apache.kafka.connect.file.FileStreamSourceTask.POSITION_FIELD; +import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +@Tag("integration") +public class FileStreamSourceConnectorIntegrationTest { + + private static final String CONNECTOR_NAME = "test-connector"; + private static final String TOPIC = "test-topic"; + private static final String LINE_FORMAT = "Line %d"; + private static final int NUM_LINES = 5; + private static final long TIMEOUT_MS = TimeUnit.SECONDS.toMillis(15); + private final EmbeddedConnectCluster connect = new EmbeddedConnectCluster.Builder().build(); + private File sourceFile; + + @BeforeEach + public void setup() throws Exception { + connect.start(); + sourceFile = createTempFile(NUM_LINES); + connect.kafka().createTopic(TOPIC); + } + + @AfterEach + public void tearDown() { + connect.stop(); + } + + @Test + public void testSimpleSource() throws Exception { + Map<String, String> connectorConfigs = baseConnectorConfigs(sourceFile.getAbsolutePath()); + connect.configureConnector(CONNECTOR_NAME, connectorConfigs); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1, + "Connector and task did not start in time"); + + int i = 0; + for (ConsumerRecord<byte[], byte[]> record : connect.kafka().consume(NUM_LINES, TIMEOUT_MS, TOPIC)) { + assertEquals(String.format(LINE_FORMAT, i++), new String(record.value())); + } + } + + @Test + public void testStopResumeSavedOffset() throws Exception { + Map<String, String> connectorConfigs = baseConnectorConfigs(sourceFile.getAbsolutePath()); + connect.configureConnector(CONNECTOR_NAME, connectorConfigs); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1, + "Connector and task did not start in time"); + + // Wait for the initially written records to be sourced by the connector and produced to the configured Kafka topic + connect.kafka().consume(NUM_LINES, TIMEOUT_MS, TOPIC); + + connect.stopConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, "Connector did not stop in time"); + + // Append NUM_LINES more lines to the file + try (PrintStream printStream = new 
PrintStream(Files.newOutputStream(sourceFile.toPath(), StandardOpenOption.APPEND))) { + for (int i = NUM_LINES; i < 2 * NUM_LINES; i++) { + printStream.println(String.format(LINE_FORMAT, i)); + } + } + + connect.resumeConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, 1, + "Connector and task did not resume in time"); + + // We expect only 2 * NUM_LINES messages to be produced since the connector should continue from where it left off on being resumed + assertThrows(EmbeddedKafkaCluster.NotEnoughRecordsException.class, () -> connect.kafka().consume(2 * NUM_LINES + 1, TIMEOUT_MS, TOPIC)); + + int i = 0; + for (ConsumerRecord<byte[], byte[]> record : connect.kafka().consume(2 * NUM_LINES, TIMEOUT_MS, TOPIC)) { + assertEquals(String.format(LINE_FORMAT, i++), new String(record.value())); + } + } + + @Test + public void testAlterOffsets() throws Exception { + Map<String, String> connectorConfigs = baseConnectorConfigs(sourceFile.getAbsolutePath()); + connect.configureConnector(CONNECTOR_NAME, connectorConfigs); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1, + "Connector and task did not start in time"); + + // Wait for the initially written records to be sourced by the connector and produced to the configured Kafka topic + connect.kafka().consume(NUM_LINES, TIMEOUT_MS, TOPIC); + + connect.stopConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, "Connector did not stop in time"); + + // Alter the offsets to make the connector re-process the last line in the file + Map<String, Object> partition = Collections.singletonMap(FILENAME_FIELD, sourceFile.getAbsolutePath()); + Map<String, Object> offset = Collections.singletonMap(POSITION_FIELD, 28L); + connect.alterConnectorOffsets(CONNECTOR_NAME, new ConnectorOffsets(Collections.singletonList(new ConnectorOffset(partition, offset)))); + + connect.resumeConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1, + "Connector and task did not resume in time"); + + Iterator<ConsumerRecord<byte[], byte[]>> recordIterator = connect.kafka().consume(NUM_LINES + 1, TIMEOUT_MS, TOPIC).iterator(); + + int i = 0; + while (i < NUM_LINES) { + assertEquals(String.format(LINE_FORMAT, i++), new String(recordIterator.next().value())); + } + + // Verify that the last line has been sourced again after the alter offsets request + assertEquals(String.format(LINE_FORMAT, NUM_LINES - 1), new String(recordIterator.next().value())); + } + + @Test + public void testResetOffsets() throws Exception { + Map<String, String> connectorConfigs = baseConnectorConfigs(sourceFile.getAbsolutePath()); + connect.configureConnector(CONNECTOR_NAME, connectorConfigs); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1, + "Connector and task did not start in time"); + + // Wait for the initially written records to be sourced by the connector and produced to the configured Kafka topic + connect.kafka().consume(NUM_LINES, TIMEOUT_MS, TOPIC); + + connect.stopConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, "Connector did not stop in time"); + + // Reset the offsets to make the connector re-read all the previously written lines + connect.resetConnectorOffsets(CONNECTOR_NAME); + + connect.resumeConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1, + "Connector and task did not 
resume in time"); + + // We expect 2 * NUM_LINES messages to be produced Review Comment: Good point, done 👍 ########## connect/file/src/test/java/org/apache/kafka/connect/file/integration/FileStreamSinkConnectorIntegrationTest.java: ########## @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.file.integration; + +import org.apache.kafka.connect.file.FileStreamSinkConnector; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets; +import org.apache.kafka.connect.util.SinkUtils; +import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + +import java.io.BufferedReader; +import java.io.File; +import java.io.InputStreamReader; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.kafka.connect.file.FileStreamSinkConnector.FILE_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG; +import static org.apache.kafka.connect.sink.SinkConnector.TOPICS_CONFIG; +import static org.junit.jupiter.api.Assertions.assertEquals; + +@Tag("integration") +public class FileStreamSinkConnectorIntegrationTest { + + private static final String CONNECTOR_NAME = "test-connector"; + private static final String TOPIC = "test-topic"; + private static final String MESSAGE_FORMAT = "Message %d"; + private static final int NUM_MESSAGES = 5; + private static final String FILE_NAME = "test-file"; + private final EmbeddedConnectCluster connect = new EmbeddedConnectCluster.Builder().build(); + + @BeforeEach + public void setup() { + connect.start(); + connect.kafka().createTopic(TOPIC); + produceMessagesToTopic(TOPIC, NUM_MESSAGES); + } + + @AfterEach + public void tearDown() { + connect.stop(); + } + + @Test + public void testSimpleSink() throws Exception { + File tempDir = TestUtils.tempDirectory(); + Path tempFilePath = tempDir.toPath().resolve(FILE_NAME); + Map<String, String> connectorConfigs = baseConnectorConfigs(TOPIC, tempFilePath.toString()); + connect.configureConnector(CONNECTOR_NAME, connectorConfigs); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1, Review Comment: Nope, copy pasting is bad 😞 ########## connect/file/src/test/java/org/apache/kafka/connect/file/integration/FileStreamSourceConnectorIntegrationTest.java: ########## @@ 
-0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.file.integration; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.connect.file.FileStreamSourceConnector; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets; +import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; +import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.io.PrintStream; +import java.nio.file.Files; +import java.nio.file.StandardOpenOption; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.apache.kafka.connect.file.FileStreamSourceConnector.FILE_CONFIG; +import static org.apache.kafka.connect.file.FileStreamSourceConnector.TOPIC_CONFIG; +import static org.apache.kafka.connect.file.FileStreamSourceTask.FILENAME_FIELD; +import static org.apache.kafka.connect.file.FileStreamSourceTask.POSITION_FIELD; +import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +@Tag("integration") +public class FileStreamSourceConnectorIntegrationTest { + + private static final String CONNECTOR_NAME = "test-connector"; + private static final String TOPIC = "test-topic"; + private static final String LINE_FORMAT = "Line %d"; + private static final int NUM_LINES = 5; + private static final long TIMEOUT_MS = TimeUnit.SECONDS.toMillis(15); + private final EmbeddedConnectCluster connect = new EmbeddedConnectCluster.Builder().build(); + private File sourceFile; + + @BeforeEach + public void setup() throws Exception { + connect.start(); + sourceFile = createTempFile(NUM_LINES); + connect.kafka().createTopic(TOPIC); + } + + @AfterEach + public void tearDown() { + connect.stop(); + } + + @Test + public void testSimpleSource() throws Exception { + Map<String, String> connectorConfigs = baseConnectorConfigs(sourceFile.getAbsolutePath()); + connect.configureConnector(CONNECTOR_NAME, connectorConfigs); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1, + "Connector and task did not start in time"); + + int i = 0; + for (ConsumerRecord<byte[], byte[]> record : connect.kafka().consume(NUM_LINES, TIMEOUT_MS, TOPIC)) { + assertEquals(String.format(LINE_FORMAT, i++), new 
String(record.value())); + } + } + + @Test + public void testStopResumeSavedOffset() throws Exception { + Map<String, String> connectorConfigs = baseConnectorConfigs(sourceFile.getAbsolutePath()); + connect.configureConnector(CONNECTOR_NAME, connectorConfigs); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1, + "Connector and task did not start in time"); + + // Wait for the initially written records to be sourced by the connector and produced to the configured Kafka topic + connect.kafka().consume(NUM_LINES, TIMEOUT_MS, TOPIC); + + connect.stopConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, "Connector did not stop in time"); + + // Append NUM_LINES more lines to the file + try (PrintStream printStream = new PrintStream(Files.newOutputStream(sourceFile.toPath(), StandardOpenOption.APPEND))) { + for (int i = NUM_LINES; i < 2 * NUM_LINES; i++) { + printStream.println(String.format(LINE_FORMAT, i)); + } + } + + connect.resumeConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, 1, + "Connector and task did not resume in time"); + + // We expect only 2 * NUM_LINES messages to be produced since the connector should continue from where it left off on being resumed + assertThrows(EmbeddedKafkaCluster.NotEnoughRecordsException.class, () -> connect.kafka().consume(2 * NUM_LINES + 1, TIMEOUT_MS, TOPIC)); + + int i = 0; + for (ConsumerRecord<byte[], byte[]> record : connect.kafka().consume(2 * NUM_LINES, TIMEOUT_MS, TOPIC)) { + assertEquals(String.format(LINE_FORMAT, i++), new String(record.value())); + } + } + + @Test + public void testAlterOffsets() throws Exception { + Map<String, String> connectorConfigs = baseConnectorConfigs(sourceFile.getAbsolutePath()); + connect.configureConnector(CONNECTOR_NAME, connectorConfigs); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1, + "Connector and task did not start in time"); + + // Wait for the initially written records to be sourced by the connector and produced to the configured Kafka topic + connect.kafka().consume(NUM_LINES, TIMEOUT_MS, TOPIC); + + connect.stopConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, "Connector did not stop in time"); + + // Alter the offsets to make the connector re-process the last line in the file + Map<String, Object> partition = Collections.singletonMap(FILENAME_FIELD, sourceFile.getAbsolutePath()); + Map<String, Object> offset = Collections.singletonMap(POSITION_FIELD, 28L); + connect.alterConnectorOffsets(CONNECTOR_NAME, new ConnectorOffsets(Collections.singletonList(new ConnectorOffset(partition, offset)))); + + connect.resumeConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1, + "Connector and task did not resume in time"); + + Iterator<ConsumerRecord<byte[], byte[]>> recordIterator = connect.kafka().consume(NUM_LINES + 1, TIMEOUT_MS, TOPIC).iterator(); + + int i = 0; + while (i < NUM_LINES) { + assertEquals(String.format(LINE_FORMAT, i++), new String(recordIterator.next().value())); + } + + // Verify that the last line has been sourced again after the alter offsets request + assertEquals(String.format(LINE_FORMAT, NUM_LINES - 1), new String(recordIterator.next().value())); + } + + @Test + public void testResetOffsets() throws Exception { + Map<String, String> connectorConfigs = baseConnectorConfigs(sourceFile.getAbsolutePath()); + 
connect.configureConnector(CONNECTOR_NAME, connectorConfigs); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1, + "Connector and task did not start in time"); + + // Wait for the initially written records to be sourced by the connector and produced to the configured Kafka topic + connect.kafka().consume(NUM_LINES, TIMEOUT_MS, TOPIC); + + connect.stopConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, "Connector did not stop in time"); + + // Reset the offsets to make the connector re-read all the previously written lines + connect.resetConnectorOffsets(CONNECTOR_NAME); + + connect.resumeConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1, + "Connector and task did not resume in time"); + + // We expect 2 * NUM_LINES messages to be produced Review Comment: Good point, done 👍