[GitHub] [kafka] C0urante commented on a diff in pull request #14279: KAFKA-15179: Add integration tests for the file sink and source connectors
C0urante commented on code in PR #14279: URL: https://github.com/apache/kafka/pull/14279#discussion_r1318853979 ## connect/file/src/test/java/org/apache/kafka/connect/file/integration/FileStreamSinkConnectorIntegrationTest.java: ## @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.file.integration; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.file.FileStreamSinkConnector; +import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + +import java.io.BufferedReader; +import java.io.File; +import java.io.InputStreamReader; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.kafka.connect.file.FileStreamSinkConnector.FILE_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG; +import static org.apache.kafka.connect.sink.SinkConnector.TOPICS_CONFIG; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@Tag("integration") +public class FileStreamSinkConnectorIntegrationTest { + +private static final String CONNECTOR_NAME = "test-connector"; +private static final String TOPIC = "test-topic"; +private static final String MESSAGE_PREFIX = "Message "; +private static final int NUM_MESSAGES = 5; +private static final String FILE_NAME = "test-file"; +private final EmbeddedConnectCluster connect = new EmbeddedConnectCluster.Builder().build(); + +@BeforeEach +public void setup() { +connect.start(); +connect.kafka().createTopic(TOPIC); +produceMessagesToTopic(TOPIC, NUM_MESSAGES); +} + +@AfterEach +public void tearDown() { +connect.stop(); +} + +@Test +public void testSimpleSink() throws Exception { +File tempDir = TestUtils.tempDirectory(); +Path tempFilePath = tempDir.toPath().resolve(FILE_NAME); +Map connectorConfigs = baseConnectorConfigs(TOPIC, tempFilePath.toString()); +connect.configureConnector(CONNECTOR_NAME, connectorConfigs); + connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, 1, +"Connector and task did not start in time"); + +verifyLinesInFile(tempFilePath, NUM_MESSAGES, true); +} + +@Test +public void testAlterOffsets() throws Exception { +File tempDir = TestUtils.tempDirectory(); +Path tempFilePath = tempDir.toPath().resolve(FILE_NAME); +Map connectorConfigs = baseConnectorConfigs(TOPIC, tempFilePath.toString()); +connect.configureConnector(CONNECTOR_NAME, connectorConfigs); + connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, 1, +"Connector and task did not start in time"); + +verifyLinesInFile(tempFilePath, NUM_MESSAGES, true); + +connect.stopConnector(CONNECTOR_NAME); +connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, "Connector did not stop in time"); + +// Alter the offsets to cause the last message in the topic to be re-processed +connect.alterSinkConnectorOffset(CONNECTOR_NAME, new TopicPartition(TOPIC, 0), (long) (NUM_MESSAGES - 1)); + +connect.resumeConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, 1, +"Connector and task did not resume in time"); + +// The last message should be re-processed when the connector is resumed after the offsets are altered +verifyLinesInFile(tempFilePath, NUM_MESSAGES + 1, false); +} + +@Test +public void testResetOffsets() throws Exception { +File tempDir = TestUtils.tempDirectory(); +Path tempFilePath =
[GitHub] [kafka] C0urante commented on a diff in pull request #14279: KAFKA-15179: Add integration tests for the file sink and source connectors
C0urante commented on code in PR #14279: URL: https://github.com/apache/kafka/pull/14279#discussion_r1318849972 ## connect/file/src/test/java/org/apache/kafka/connect/file/integration/FileStreamSinkConnectorIntegrationTest.java: ## @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.file.integration; + +import org.apache.kafka.connect.file.FileStreamSinkConnector; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets; +import org.apache.kafka.connect.util.SinkUtils; +import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + +import java.io.BufferedReader; +import java.io.File; +import java.io.InputStreamReader; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.kafka.connect.file.FileStreamSinkConnector.FILE_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG; +import static org.apache.kafka.connect.sink.SinkConnector.TOPICS_CONFIG; +import static org.junit.jupiter.api.Assertions.assertEquals; + +@Tag("integration") +public class FileStreamSinkConnectorIntegrationTest { + +private static final String CONNECTOR_NAME = "test-connector"; +private static final String TOPIC = "test-topic"; +private static final String MESSAGE_FORMAT = "Message %d"; +private static final int NUM_MESSAGES = 5; +private static final String FILE_NAME = "test-file"; +private final EmbeddedConnectCluster connect = new EmbeddedConnectCluster.Builder().build(); + +@BeforeEach +public void setup() { +connect.start(); +connect.kafka().createTopic(TOPIC); +produceMessagesToTopic(TOPIC, NUM_MESSAGES); +} + +@AfterEach +public void tearDown() { +connect.stop(); +} + +@Test +public void testSimpleSink() throws Exception { +File tempDir = TestUtils.tempDirectory(); +Path tempFilePath = tempDir.toPath().resolve(FILE_NAME); +Map connectorConfigs = baseConnectorConfigs(TOPIC, tempFilePath.toString()); +connect.configureConnector(CONNECTOR_NAME, connectorConfigs); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1, +"Connector and task did not start in time"); + +verifyLinesInFile(tempFilePath, NUM_MESSAGES, true); +} + +@Test +public void testAlterOffsets() throws Exception { +File tempDir = TestUtils.tempDirectory(); +Path tempFilePath = tempDir.toPath().resolve(FILE_NAME); +Map connectorConfigs = baseConnectorConfigs(TOPIC, tempFilePath.toString()); +connect.configureConnector(CONNECTOR_NAME, connectorConfigs); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1, +"Connector and task did not start in time"); + +verifyLinesInFile(tempFilePath, NUM_MESSAGES, true); + +connect.stopConnector(CONNECTOR_NAME); +connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, "Connector did not stop in time"); + +// Alter the offsets to cause the last message in the topic to be re-processed +Map partition = new HashMap<>(); +partition.put(SinkUtils.KAFKA_TOPIC_KEY, TOPIC); +partition.put(SinkUtils.KAFKA_PARTITION_KEY, 0); +Map offset = Collections.singletonMap(SinkUtils.KAFKA_OFFSET_KEY, 4); +List offsetsToAlter = Collections.singletonList(new ConnectorOffset(partition, offset)); + +connect.alterConnectorOffsets(CONNECTOR_NAME, new ConnectorOffsets(offsetsToAlter)); Review Comment: Looks good, thanks ##
[GitHub] [kafka] C0urante commented on a diff in pull request #14279: KAFKA-15179: Add integration tests for the file sink and source connectors
C0urante commented on code in PR #14279: URL: https://github.com/apache/kafka/pull/14279#discussion_r1318849515 ## connect/file/src/test/java/org/apache/kafka/connect/file/integration/FileStreamSinkConnectorIntegrationTest.java: ## @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.file.integration; + +import org.apache.kafka.connect.file.FileStreamSinkConnector; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets; +import org.apache.kafka.connect.util.SinkUtils; +import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + +import java.io.BufferedReader; +import java.io.File; +import java.io.InputStreamReader; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.kafka.connect.file.FileStreamSinkConnector.FILE_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG; +import static org.apache.kafka.connect.sink.SinkConnector.TOPICS_CONFIG; +import static org.junit.jupiter.api.Assertions.assertEquals; + +@Tag("integration") +public class FileStreamSinkConnectorIntegrationTest { + +private static final String CONNECTOR_NAME = "test-connector"; +private static final String TOPIC = "test-topic"; +private static final String MESSAGE_FORMAT = "Message %d"; +private static final int NUM_MESSAGES = 5; +private static final String FILE_NAME = "test-file"; +private final EmbeddedConnectCluster connect = new EmbeddedConnectCluster.Builder().build(); + +@BeforeEach +public void setup() { +connect.start(); +connect.kafka().createTopic(TOPIC); +produceMessagesToTopic(TOPIC, NUM_MESSAGES); +} + +@AfterEach +public void tearDown() { +connect.stop(); +} + +@Test +public void testSimpleSink() throws Exception { +File tempDir = TestUtils.tempDirectory(); +Path tempFilePath = tempDir.toPath().resolve(FILE_NAME); +Map connectorConfigs = baseConnectorConfigs(TOPIC, tempFilePath.toString()); +connect.configureConnector(CONNECTOR_NAME, connectorConfigs); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1, +"Connector and task did not start in time"); + +verifyLinesInFile(tempFilePath, NUM_MESSAGES, true); +} + +@Test +public void testAlterOffsets() throws Exception { +File tempDir = TestUtils.tempDirectory(); +Path tempFilePath = tempDir.toPath().resolve(FILE_NAME); +Map connectorConfigs = baseConnectorConfigs(TOPIC, tempFilePath.toString()); +connect.configureConnector(CONNECTOR_NAME, connectorConfigs); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1, +"Connector and task did not start in time"); + +verifyLinesInFile(tempFilePath, NUM_MESSAGES, true); + +connect.stopConnector(CONNECTOR_NAME); +connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, "Connector did not stop in time"); + +// Alter the offsets to cause the last message in the topic to be re-processed +Map partition = new HashMap<>(); +partition.put(SinkUtils.KAFKA_TOPIC_KEY, TOPIC); +partition.put(SinkUtils.KAFKA_PARTITION_KEY, 0); +Map offset = Collections.singletonMap(SinkUtils.KAFKA_OFFSET_KEY, 4); +List offsetsToAlter = Collections.singletonList(new ConnectorOffset(partition, offset)); + +connect.alterConnectorOffsets(CONNECTOR_NAME, new ConnectorOffsets(offsetsToAlter)); + +connect.resumeConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, 1, +
[GitHub] [kafka] C0urante commented on a diff in pull request #14279: KAFKA-15179: Add integration tests for the file sink and source connectors
C0urante commented on code in PR #14279: URL: https://github.com/apache/kafka/pull/14279#discussion_r1316170261 ## connect/file/src/test/java/org/apache/kafka/connect/file/integration/FileStreamSinkConnectorIntegrationTest.java: ## @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.file.integration; + +import org.apache.kafka.connect.file.FileStreamSinkConnector; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets; +import org.apache.kafka.connect.util.SinkUtils; +import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + +import java.io.BufferedReader; +import java.io.File; +import java.io.InputStreamReader; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.kafka.connect.file.FileStreamSinkConnector.FILE_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG; +import static org.apache.kafka.connect.sink.SinkConnector.TOPICS_CONFIG; +import static org.junit.jupiter.api.Assertions.assertEquals; + +@Tag("integration") +public class FileStreamSinkConnectorIntegrationTest { + +private static final String CONNECTOR_NAME = "test-connector"; +private static final String TOPIC = "test-topic"; +private static final String MESSAGE_FORMAT = "Message %d"; +private static final int NUM_MESSAGES = 5; +private static final String FILE_NAME = "test-file"; +private final EmbeddedConnectCluster connect = new EmbeddedConnectCluster.Builder().build(); + +@BeforeEach +public void setup() { +connect.start(); +connect.kafka().createTopic(TOPIC); +produceMessagesToTopic(TOPIC, NUM_MESSAGES); +} + +@AfterEach +public void tearDown() { +connect.stop(); +} + +@Test +public void testSimpleSink() throws Exception { +File tempDir = TestUtils.tempDirectory(); +Path tempFilePath = tempDir.toPath().resolve(FILE_NAME); +Map connectorConfigs = baseConnectorConfigs(TOPIC, tempFilePath.toString()); +connect.configureConnector(CONNECTOR_NAME, connectorConfigs); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1, +"Connector and task did not start in time"); + +verifyLinesInFile(tempFilePath, NUM_MESSAGES, true); +} + +@Test +public void testAlterOffsets() throws Exception { +File tempDir = TestUtils.tempDirectory(); +Path tempFilePath = tempDir.toPath().resolve(FILE_NAME); +Map connectorConfigs = baseConnectorConfigs(TOPIC, tempFilePath.toString()); +connect.configureConnector(CONNECTOR_NAME, connectorConfigs); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, 1, +"Connector and task did not start in time"); + +verifyLinesInFile(tempFilePath, NUM_MESSAGES, true); + +connect.stopConnector(CONNECTOR_NAME); +connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, "Connector did not stop in time"); + +// Alter the offsets to cause the last message in the topic to be re-processed +Map partition = new HashMap<>(); +partition.put(SinkUtils.KAFKA_TOPIC_KEY, TOPIC); +partition.put(SinkUtils.KAFKA_PARTITION_KEY, 0); +Map offset = Collections.singletonMap(SinkUtils.KAFKA_OFFSET_KEY, 4); +List offsetsToAlter = Collections.singletonList(new ConnectorOffset(partition, offset)); + +connect.alterConnectorOffsets(CONNECTOR_NAME, new ConnectorOffsets(offsetsToAlter)); + +connect.resumeConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, 1, +