C0urante commented on code in PR #14279: URL: https://github.com/apache/kafka/pull/14279#discussion_r1316170261
########## connect/file/src/test/java/org/apache/kafka/connect/file/integration/FileStreamSinkConnectorIntegrationTest.java: ##########
@@ -0,0 +1,194 @@
+    /**
+     * Verify that the number of lines in the file at {@code filePath} is equal to {@code numLines}.
+     * If {@code verifyLinearity} is true, this method will also verify that the lines match {@link #MESSAGE_FORMAT}
+     * with a linearly increasing message number (beginning with 0).
+     */
+    private void verifyLinesInFile(Path filePath, int numLines, boolean verifyLinearity) throws Exception {
+        TestUtils.waitForCondition(() -> {
+            try (BufferedReader reader = new BufferedReader(new InputStreamReader(Files.newInputStream(filePath)))) {
+                for (int i = 0; i < numLines; i++) {
+                    String line = reader.readLine();
+                    if (line == null) {
+                        return false;
+                    }
+                    if (verifyLinearity) {
+                        assertEquals(String.format(MESSAGE_FORMAT, i), line);
+                    }

Review Comment:
   Optional: if it's not too much work, we might at least verify that each line matches the message format (ignoring the line number) when `verifyLinearity` is `false`.
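   If that's worth doing, a minimal sketch of the relaxed branch could look like the following (the regex is an assumption derived from the `MESSAGE_FORMAT` constant, and `assertTrue` would need a static import from JUnit's `Assertions`):

```java
if (verifyLinearity) {
    assertEquals(String.format(MESSAGE_FORMAT, i), line);
} else {
    // Relaxed check: the line should still look like "Message <n>", we just don't
    // require the message number to line up with the line's position in the file.
    assertTrue(line.matches("Message \\d+"));
}
```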

########## connect/file/src/test/java/org/apache/kafka/connect/file/integration/FileStreamSinkConnectorIntegrationTest.java: ##########
@@ -0,0 +1,194 @@
+        // Alter the offsets to cause the last message in the topic to be re-processed
+        Map<String, Object> partition = new HashMap<>();
+        partition.put(SinkUtils.KAFKA_TOPIC_KEY, TOPIC);
+        partition.put(SinkUtils.KAFKA_PARTITION_KEY, 0);
+        Map<String, Object> offset = Collections.singletonMap(SinkUtils.KAFKA_OFFSET_KEY, 4);
+        List<ConnectorOffset> offsetsToAlter = Collections.singletonList(new ConnectorOffset(partition, offset));
+
+        connect.alterConnectorOffsets(CONNECTOR_NAME, new ConnectorOffsets(offsetsToAlter));

Review Comment:
   Optional: might be nice to have convenience wrapper methods to alter sink connector offsets based on a `Map<TopicPartition, Long>` and source connector offsets based on a `Map<Map<String, ?>, Map<String, ?>>` that do some of the lifting here for us and then delegate to the existing `EmbeddedConnectCluster::alterOffsets` method.
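   For the sink side, a rough sketch of such a wrapper (the method name and its placement on `EmbeddedConnectCluster` are assumptions, not something in this PR):

```java
// Hypothetical convenience method on EmbeddedConnectCluster: builds the ConnectorOffsets
// body from a Map<TopicPartition, Long> and delegates to the existing
// alterConnectorOffsets(String, ConnectorOffsets) call used by these tests.
public void alterSinkConnectorOffsets(String connectorName, Map<TopicPartition, Long> offsets) {
    List<ConnectorOffset> connectorOffsets = new ArrayList<>();
    for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) {
        Map<String, Object> partition = new HashMap<>();
        partition.put(SinkUtils.KAFKA_TOPIC_KEY, entry.getKey().topic());
        partition.put(SinkUtils.KAFKA_PARTITION_KEY, entry.getKey().partition());
        Map<String, Object> offset = Collections.singletonMap(SinkUtils.KAFKA_OFFSET_KEY, entry.getValue());
        connectorOffsets.add(new ConnectorOffset(partition, offset));
    }
    alterConnectorOffsets(connectorName, new ConnectorOffsets(connectorOffsets));
}
```

   The block above would then shrink to something like `connect.alterSinkConnectorOffsets(CONNECTOR_NAME, Collections.singletonMap(new TopicPartition(TOPIC, 0), (long) (NUM_MESSAGES - 1)))`.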

########## connect/file/src/test/java/org/apache/kafka/connect/file/integration/FileStreamSourceConnectorIntegrationTest.java: ##########
@@ -0,0 +1,213 @@
+    /**
+     * Create a temporary file and append {@code numLines} to it
+     *
+     * @param numLines the number of lines to be appended to the created file
+     * @return the created file
+     */
+    private File createTempFile(int numLines) throws Exception {
+        File sourceFile = TestUtils.tempFile();
+
+        try (PrintStream printStream = new PrintStream(Files.newOutputStream(sourceFile.toPath(), StandardOpenOption.APPEND))) {

Review Comment:
   Just OOC, why are we opening this in append mode? Wouldn't it be safer to wipe existing content?
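   If wiping any existing content is the intent, a variant along these lines would do it (note that `Files.newOutputStream` with no options already defaults to `CREATE` + `TRUNCATE_EXISTING`):

```java
// Hypothetical variant that truncates rather than appends when creating the fixture file.
try (PrintStream printStream = new PrintStream(Files.newOutputStream(sourceFile.toPath(),
        StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING))) {
    for (int i = 0; i < numLines; i++) {
        printStream.println(String.format(LINE_FORMAT, i));
    }
}
```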

########## connect/file/src/test/java/org/apache/kafka/connect/file/integration/FileStreamSourceConnectorIntegrationTest.java: ##########
@@ -0,0 +1,213 @@
+        Iterator<ConsumerRecord<byte[], byte[]>> recordIterator = connect.kafka().consume(NUM_LINES + 1, TIMEOUT_MS, TOPIC).iterator();
+
+        int i = 0;
+        while (i < NUM_LINES) {
+            assertEquals(String.format(LINE_FORMAT, i++), new String(recordIterator.next().value()));

Review Comment:
   Nit:
   ```suggestion
           for (int i = 0; i < NUM_LINES; i++) {
               assertEquals(String.format(LINE_FORMAT, i), new String(recordIterator.next().value()));
   ```

########## connect/file/src/test/java/org/apache/kafka/connect/file/integration/FileStreamSinkConnectorIntegrationTest.java: ##########
@@ -0,0 +1,194 @@
+    @Test
+    public void testSimpleSink() throws Exception {
+        File tempDir = TestUtils.tempDirectory();
+        Path tempFilePath = tempDir.toPath().resolve(FILE_NAME);
+        Map<String, String> connectorConfigs = baseConnectorConfigs(TOPIC, tempFilePath.toString());
+        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1,

Review Comment:
   Any reason to use "at least" instead of "exactly" (i.e., `assertConnectorAndExactlyNumTasksAreRunning`)?

########## connect/file/src/test/java/org/apache/kafka/connect/file/integration/FileStreamSourceConnectorIntegrationTest.java: ##########
@@ -0,0 +1,213 @@
+    @Test
+    public void testSimpleSource() throws Exception {
+        Map<String, String> connectorConfigs = baseConnectorConfigs(sourceFile.getAbsolutePath());
+        connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1,

Review Comment:
   Same thought RE `atLeast` vs. `exactly`.

########## connect/file/src/test/java/org/apache/kafka/connect/file/integration/FileStreamSinkConnectorIntegrationTest.java: ##########
@@ -0,0 +1,194 @@
+    private void verifyLinesInFile(Path filePath, int numLines, boolean verifyLinearity) throws Exception {
+        TestUtils.waitForCondition(() -> {
+            try (BufferedReader reader = new BufferedReader(new InputStreamReader(Files.newInputStream(filePath)))) {
+                for (int i = 0; i < numLines; i++) {

Review Comment:
   Doesn't this lead to false positives if there are more lines in the file than expected?
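   One way to close that gap, sketched under the assumption that any extra lines have already been flushed by the time the condition is evaluated: after consuming the expected lines, require that the reader is exhausted.

```java
for (int i = 0; i < numLines; i++) {
    String line = reader.readLine();
    if (line == null) {
        return false;
    }
    if (verifyLinearity) {
        assertEquals(String.format(MESSAGE_FORMAT, i), line);
    }
}
// Fail the wait condition if the file contains more lines than expected.
return reader.readLine() == null;
```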

########## connect/file/src/test/java/org/apache/kafka/connect/file/integration/FileStreamSinkConnectorIntegrationTest.java: ##########
@@ -0,0 +1,194 @@
+        // Alter the offsets to cause the last message in the topic to be re-processed
+        Map<String, Object> partition = new HashMap<>();
+        partition.put(SinkUtils.KAFKA_TOPIC_KEY, TOPIC);
+        partition.put(SinkUtils.KAFKA_PARTITION_KEY, 0);
+        Map<String, Object> offset = Collections.singletonMap(SinkUtils.KAFKA_OFFSET_KEY, 4);

Review Comment:
   Nit: It's a little unclear where `4` comes from:
   ```suggestion
           Map<String, Object> offset = Collections.singletonMap(SinkUtils.KAFKA_OFFSET_KEY, NUM_MESSAGES - 1);
   ```
+ */ +package org.apache.kafka.connect.file.integration; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.connect.file.FileStreamSourceConnector; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets; +import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; +import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.io.PrintStream; +import java.nio.file.Files; +import java.nio.file.StandardOpenOption; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.apache.kafka.connect.file.FileStreamSourceConnector.FILE_CONFIG; +import static org.apache.kafka.connect.file.FileStreamSourceConnector.TOPIC_CONFIG; +import static org.apache.kafka.connect.file.FileStreamSourceTask.FILENAME_FIELD; +import static org.apache.kafka.connect.file.FileStreamSourceTask.POSITION_FIELD; +import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +@Tag("integration") +public class FileStreamSourceConnectorIntegrationTest { + + private static final String CONNECTOR_NAME = "test-connector"; + private static final String TOPIC = "test-topic"; + private static final String LINE_FORMAT = "Line %d"; + private static final int NUM_LINES = 5; + private static final long TIMEOUT_MS = TimeUnit.SECONDS.toMillis(15); + private final EmbeddedConnectCluster connect = new EmbeddedConnectCluster.Builder().build(); + private File sourceFile; + + @BeforeEach + public void setup() throws Exception { + connect.start(); + sourceFile = createTempFile(NUM_LINES); + connect.kafka().createTopic(TOPIC); + } + + @AfterEach + public void tearDown() { + connect.stop(); + } + + @Test + public void testSimpleSource() throws Exception { + Map<String, String> connectorConfigs = baseConnectorConfigs(sourceFile.getAbsolutePath()); + connect.configureConnector(CONNECTOR_NAME, connectorConfigs); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1, + "Connector and task did not start in time"); + + int i = 0; + for (ConsumerRecord<byte[], byte[]> record : connect.kafka().consume(NUM_LINES, TIMEOUT_MS, TOPIC)) { + assertEquals(String.format(LINE_FORMAT, i++), new String(record.value())); + } + } + + @Test + public void testStopResumeSavedOffset() throws Exception { + Map<String, String> connectorConfigs = baseConnectorConfigs(sourceFile.getAbsolutePath()); + connect.configureConnector(CONNECTOR_NAME, connectorConfigs); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1, + "Connector and task did not start in time"); + + // Wait for the initially written records to be sourced by the connector and produced to the configured Kafka topic + connect.kafka().consume(NUM_LINES, TIMEOUT_MS, TOPIC); + + connect.stopConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, "Connector did not stop in time"); + + // Append NUM_LINES more lines to the file + try (PrintStream printStream = new 
PrintStream(Files.newOutputStream(sourceFile.toPath(), StandardOpenOption.APPEND))) { + for (int i = NUM_LINES; i < 2 * NUM_LINES; i++) { + printStream.println(String.format(LINE_FORMAT, i)); + } + } + + connect.resumeConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, 1, + "Connector and task did not resume in time"); + + // We expect only 2 * NUM_LINES messages to be produced since the connector should continue from where it left off on being resumed + assertThrows(EmbeddedKafkaCluster.NotEnoughRecordsException.class, () -> connect.kafka().consume(2 * NUM_LINES + 1, TIMEOUT_MS, TOPIC)); + + int i = 0; + for (ConsumerRecord<byte[], byte[]> record : connect.kafka().consume(2 * NUM_LINES, TIMEOUT_MS, TOPIC)) { + assertEquals(String.format(LINE_FORMAT, i++), new String(record.value())); + } + } + + @Test + public void testAlterOffsets() throws Exception { + Map<String, String> connectorConfigs = baseConnectorConfigs(sourceFile.getAbsolutePath()); + connect.configureConnector(CONNECTOR_NAME, connectorConfigs); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1, + "Connector and task did not start in time"); + + // Wait for the initially written records to be sourced by the connector and produced to the configured Kafka topic + connect.kafka().consume(NUM_LINES, TIMEOUT_MS, TOPIC); + + connect.stopConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, "Connector did not stop in time"); + + // Alter the offsets to make the connector re-process the last line in the file + Map<String, Object> partition = Collections.singletonMap(FILENAME_FIELD, sourceFile.getAbsolutePath()); + Map<String, Object> offset = Collections.singletonMap(POSITION_FIELD, 28L); + connect.alterConnectorOffsets(CONNECTOR_NAME, new ConnectorOffsets(Collections.singletonList(new ConnectorOffset(partition, offset)))); + + connect.resumeConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1, + "Connector and task did not resume in time"); + + Iterator<ConsumerRecord<byte[], byte[]>> recordIterator = connect.kafka().consume(NUM_LINES + 1, TIMEOUT_MS, TOPIC).iterator(); + + int i = 0; + while (i < NUM_LINES) { + assertEquals(String.format(LINE_FORMAT, i++), new String(recordIterator.next().value())); + } + + // Verify that the last line has been sourced again after the alter offsets request + assertEquals(String.format(LINE_FORMAT, NUM_LINES - 1), new String(recordIterator.next().value())); + } + + @Test + public void testResetOffsets() throws Exception { + Map<String, String> connectorConfigs = baseConnectorConfigs(sourceFile.getAbsolutePath()); + connect.configureConnector(CONNECTOR_NAME, connectorConfigs); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1, + "Connector and task did not start in time"); + + // Wait for the initially written records to be sourced by the connector and produced to the configured Kafka topic + connect.kafka().consume(NUM_LINES, TIMEOUT_MS, TOPIC); + + connect.stopConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, "Connector did not stop in time"); + + // Reset the offsets to make the connector re-read all the previously written lines + connect.resetConnectorOffsets(CONNECTOR_NAME); + + connect.resumeConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1, + "Connector and task did not 
resume in time"); + + // We expect 2 * NUM_LINES messages to be produced Review Comment: In green-path scenarios, this forces a wait of 15 seconds, which is a little suboptimal. What do you think about moving this step to the end of the test (after we've verified that the topic has at least `NUM_LINES * 2` records), and using [EmbeddedKafkaCluster::consumeAll](https://github.com/apache/kafka/blob/37a51e286d5aaa890439e074e9f781ec26aaef2e/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java#L536C44-L536C54) instead of introducing the new `NotEnoughRecordsException` class? This would still leave room for false negatives, but it'd be relatively small: we'd know that, by the time we do the read-to-end of the topic, it would have at least `NUM_LINES * 2` records, and even a single record beyond that would cause the test to fail. ########## connect/file/src/test/java/org/apache/kafka/connect/file/integration/FileStreamSinkConnectorIntegrationTest.java: ########## @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
########## connect/file/src/test/java/org/apache/kafka/connect/file/integration/FileStreamSinkConnectorIntegrationTest.java: ##########
@@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.kafka.connect.file.integration; + +import org.apache.kafka.connect.file.FileStreamSinkConnector; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets; +import org.apache.kafka.connect.util.SinkUtils; +import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + +import java.io.BufferedReader; +import java.io.File; +import java.io.InputStreamReader; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.kafka.connect.file.FileStreamSinkConnector.FILE_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG; +import static org.apache.kafka.connect.sink.SinkConnector.TOPICS_CONFIG; +import static org.junit.jupiter.api.Assertions.assertEquals; + +@Tag("integration") +public class FileStreamSinkConnectorIntegrationTest { + + private static final String CONNECTOR_NAME = "test-connector"; + private static final String TOPIC = "test-topic"; + private static final String MESSAGE_FORMAT = "Message %d"; + private static final int NUM_MESSAGES = 5; + private static final String FILE_NAME = "test-file"; + private final EmbeddedConnectCluster connect = new EmbeddedConnectCluster.Builder().build(); + + @BeforeEach + public void setup() { + connect.start(); + connect.kafka().createTopic(TOPIC); + produceMessagesToTopic(TOPIC, NUM_MESSAGES); + } + + @AfterEach + public void tearDown() { + connect.stop(); + } + + @Test + public void testSimpleSink() throws Exception { + File tempDir = TestUtils.tempDirectory(); + Path tempFilePath = tempDir.toPath().resolve(FILE_NAME); + Map<String, String> connectorConfigs = baseConnectorConfigs(TOPIC, tempFilePath.toString()); + connect.configureConnector(CONNECTOR_NAME, connectorConfigs); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1, + "Connector and task did not start in time"); + + verifyLinesInFile(tempFilePath, NUM_MESSAGES, true); + } + + @Test + public void testAlterOffsets() throws Exception { + File tempDir = TestUtils.tempDirectory(); + Path tempFilePath = tempDir.toPath().resolve(FILE_NAME); + Map<String, String> connectorConfigs = baseConnectorConfigs(TOPIC, tempFilePath.toString()); + connect.configureConnector(CONNECTOR_NAME, connectorConfigs); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1, + "Connector and task did not start in time"); + + verifyLinesInFile(tempFilePath, NUM_MESSAGES, true); + + connect.stopConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, "Connector did not stop in time"); + + // Alter the offsets to cause the last message in the topic to be re-processed + Map<String, Object> partition = new HashMap<>(); + partition.put(SinkUtils.KAFKA_TOPIC_KEY, TOPIC); + partition.put(SinkUtils.KAFKA_PARTITION_KEY, 0); + Map<String, Object> offset = Collections.singletonMap(SinkUtils.KAFKA_OFFSET_KEY, 4); + List<ConnectorOffset> offsetsToAlter = Collections.singletonList(new ConnectorOffset(partition, offset)); + + connect.alterConnectorOffsets(CONNECTOR_NAME, new 
ConnectorOffsets(offsetsToAlter)); + + connect.resumeConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, 1, + "Connector and task did not resume in time"); + + // The last message should be re-processed when the connector is resumed after the offsets are altered + verifyLinesInFile(tempFilePath, NUM_MESSAGES + 1, false); + } + + @Test + public void testResetOffsets() throws Exception { + File tempDir = TestUtils.tempDirectory(); + Path tempFilePath = tempDir.toPath().resolve(FILE_NAME); + Map<String, String> connectorConfigs = baseConnectorConfigs(TOPIC, tempFilePath.toString()); + connect.configureConnector(CONNECTOR_NAME, connectorConfigs); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1, + "Connector and task did not start in time"); + + verifyLinesInFile(tempFilePath, NUM_MESSAGES, true); + + connect.stopConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, "Connector did not stop in time"); + + // Reset the offsets to cause all the messages in the topic to be re-processed + connect.resetConnectorOffsets(CONNECTOR_NAME); + + connect.resumeConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, 1, + "Connector and task did not resume in time"); + + // All the messages should be re-processed when the connector is resumed after the offsets are reset + verifyLinesInFile(tempFilePath, 2 * NUM_MESSAGES, false); + } + + @Test + public void testSinkMultipleTopicsWithMultipleTasks() throws Exception { + String topic2 = "test-topic-2"; + connect.kafka().createTopic(topic2); + produceMessagesToTopic(topic2, NUM_MESSAGES); + + File tempDir = TestUtils.tempDirectory(); + Path tempFilePath = tempDir.toPath().resolve(FILE_NAME); + Map<String, String> connectorConfigs = baseConnectorConfigs(TOPIC + "," + topic2, tempFilePath.toString()); + connectorConfigs.put(TASKS_MAX_CONFIG, "2"); + + connect.configureConnector(CONNECTOR_NAME, connectorConfigs); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 2, + "Connector and task did not start in time"); + + // Only verify the number of lines since the messages can be consumed in any order across the two topics + verifyLinesInFile(tempFilePath, 2 * NUM_MESSAGES, false); + } + + private void produceMessagesToTopic(String topic, int numMessages) { + for (int i = 0; i < numMessages; i++) { + connect.kafka().produce(topic, String.format(MESSAGE_FORMAT, i)); + } + } + + private Map<String, String> baseConnectorConfigs(String topics, String filePath) { + Map<String, String> connectorConfigs = new HashMap<>(); + connectorConfigs.put(CONNECTOR_CLASS_CONFIG, FileStreamSinkConnector.class.getName()); + connectorConfigs.put(TOPICS_CONFIG, topics); + connectorConfigs.put(FILE_CONFIG, filePath); + return connectorConfigs; + } + + /** + * Verify that the number of lines in the file at {@code filePath} is equal to {@code numLines}. + * If {@code verifyLinearity} is true, this method will also verify that the lines match {@link #MESSAGE_FORMAT} + * with a linearly increasing message number (beginning with 0). 
+ * + * @param filePath the file path + * @param numLines the expected number of lines in the file + * @param verifyLinearity true if the line contents are to be verified + */ + private void verifyLinesInFile(Path filePath, int numLines, boolean verifyLinearity) throws Exception { + TestUtils.waitForCondition(() -> { + try (BufferedReader reader = new BufferedReader(new InputStreamReader(Files.newInputStream(filePath)))) { + for (int i = 0; i < numLines; i++) { + String line = reader.readLine(); + if (line == null) {

Review Comment:
We could potentially use [BufferedReader::lines](https://docs.oracle.com/javase/8/docs/api/java/io/BufferedReader.html#lines--) to simplify this. Something along the lines of the sketch below, for example.
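(Rough sketch only: it assumes extra imports of `java.io.IOException` and `java.util.stream.Collectors`, and the remaining checks would need to mirror the existing implementation.)

```java
private void verifyLinesInFile(Path filePath, int numLines, boolean verifyLinearity) throws Exception {
    TestUtils.waitForCondition(() -> {
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(Files.newInputStream(filePath)))) {
            // Read everything currently in the file in one pass
            List<String> lines = reader.lines().collect(Collectors.toList());
            if (lines.size() < numLines) {
                return false;
            }
            if (verifyLinearity) {
                // Expect "Message 0", "Message 1", ... in order
                for (int i = 0; i < numLines; i++) {
                    if (!String.format(MESSAGE_FORMAT, i).equals(lines.get(i))) {
                        return false;
                    }
                }
            }
            return true;
        } catch (IOException e) {
            // The sink task may not have created the file yet; retry until the timeout expires
            return false;
        }
    }, "Expected lines were not present in the file in time");
}
```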