[GitHub] [kafka] C0urante commented on a diff in pull request #14279: KAFKA-15179: Add integration tests for the file sink and source connectors

2023-09-07 Thread via GitHub


C0urante commented on code in PR #14279:
URL: https://github.com/apache/kafka/pull/14279#discussion_r1318853979


##
connect/file/src/test/java/org/apache/kafka/connect/file/integration/FileStreamSinkConnectorIntegrationTest.java:
##
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.file.integration;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.file.FileStreamSinkConnector;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.InputStreamReader;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static 
org.apache.kafka.connect.file.FileStreamSinkConnector.FILE_CONFIG;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
+import static org.apache.kafka.connect.sink.SinkConnector.TOPICS_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@Tag("integration")
+public class FileStreamSinkConnectorIntegrationTest {
+
+private static final String CONNECTOR_NAME = "test-connector";
+private static final String TOPIC = "test-topic";
+private static final String MESSAGE_PREFIX = "Message ";
+private static final int NUM_MESSAGES = 5;
+private static final String FILE_NAME = "test-file";
+private final EmbeddedConnectCluster connect = new 
EmbeddedConnectCluster.Builder().build();
+
+@BeforeEach
+public void setup() {
+connect.start();
+connect.kafka().createTopic(TOPIC);
+produceMessagesToTopic(TOPIC, NUM_MESSAGES);
+}
+
+@AfterEach
+public void tearDown() {
+connect.stop();
+}
+
+@Test
+public void testSimpleSink() throws Exception {
+File tempDir = TestUtils.tempDirectory();
+Path tempFilePath = tempDir.toPath().resolve(FILE_NAME);
+Map connectorConfigs = baseConnectorConfigs(TOPIC, 
tempFilePath.toString());
+connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+
connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME,
 1,
+"Connector and task did not start in time");
+
+verifyLinesInFile(tempFilePath, NUM_MESSAGES, true);
+}
+
+@Test
+public void testAlterOffsets() throws Exception {
+File tempDir = TestUtils.tempDirectory();
+Path tempFilePath = tempDir.toPath().resolve(FILE_NAME);
+Map connectorConfigs = baseConnectorConfigs(TOPIC, 
tempFilePath.toString());
+connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+
connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME,
 1,
+"Connector and task did not start in time");
+
+verifyLinesInFile(tempFilePath, NUM_MESSAGES, true);
+
+connect.stopConnector(CONNECTOR_NAME);
+connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, 
"Connector did not stop in time");
+
+// Alter the offsets to cause the last message in the topic to be re-processed
+connect.alterSinkConnectorOffset(CONNECTOR_NAME, new 
TopicPartition(TOPIC, 0), (long) (NUM_MESSAGES - 1));
+
+connect.resumeConnector(CONNECTOR_NAME);
+
connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME,
 1,
+"Connector and task did not resume in time");
+
+// The last message should be re-processed when the connector is resumed after the offsets are altered
+verifyLinesInFile(tempFilePath, NUM_MESSAGES + 1, false);
+}
+
+@Test
+public void testResetOffsets() throws Exception {
+File tempDir = TestUtils.tempDirectory();
+Path tempFilePath = 

[GitHub] [kafka] C0urante commented on a diff in pull request #14279: KAFKA-15179: Add integration tests for the file sink and source connectors

2023-09-07 Thread via GitHub


C0urante commented on code in PR #14279:
URL: https://github.com/apache/kafka/pull/14279#discussion_r1318849972


##
connect/file/src/test/java/org/apache/kafka/connect/file/integration/FileStreamSinkConnectorIntegrationTest.java:
##
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.file.integration;
+
+import org.apache.kafka.connect.file.FileStreamSinkConnector;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
+import org.apache.kafka.connect.util.SinkUtils;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.InputStreamReader;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.kafka.connect.file.FileStreamSinkConnector.FILE_CONFIG;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
+import static org.apache.kafka.connect.sink.SinkConnector.TOPICS_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@Tag("integration")
+public class FileStreamSinkConnectorIntegrationTest {
+
+private static final String CONNECTOR_NAME = "test-connector";
+private static final String TOPIC = "test-topic";
+private static final String MESSAGE_FORMAT = "Message %d";
+private static final int NUM_MESSAGES = 5;
+private static final String FILE_NAME = "test-file";
+private final EmbeddedConnectCluster connect = new 
EmbeddedConnectCluster.Builder().build();
+
+@BeforeEach
+public void setup() {
+connect.start();
+connect.kafka().createTopic(TOPIC);
+produceMessagesToTopic(TOPIC, NUM_MESSAGES);
+}
+
+@AfterEach
+public void tearDown() {
+connect.stop();
+}
+
+@Test
+public void testSimpleSink() throws Exception {
+File tempDir = TestUtils.tempDirectory();
+Path tempFilePath = tempDir.toPath().resolve(FILE_NAME);
+Map connectorConfigs = baseConnectorConfigs(TOPIC, 
tempFilePath.toString());
+connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME,
 1,
+"Connector and task did not start in time");
+
+verifyLinesInFile(tempFilePath, NUM_MESSAGES, true);
+}
+
+@Test
+public void testAlterOffsets() throws Exception {
+File tempDir = TestUtils.tempDirectory();
+Path tempFilePath = tempDir.toPath().resolve(FILE_NAME);
+Map connectorConfigs = baseConnectorConfigs(TOPIC, 
tempFilePath.toString());
+connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME,
 1,
+"Connector and task did not start in time");
+
+verifyLinesInFile(tempFilePath, NUM_MESSAGES, true);
+
+connect.stopConnector(CONNECTOR_NAME);
+connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, 
"Connector did not stop in time");
+
+// Alter the offsets to cause the last message in the topic to be re-processed
+Map partition = new HashMap<>();
+partition.put(SinkUtils.KAFKA_TOPIC_KEY, TOPIC);
+partition.put(SinkUtils.KAFKA_PARTITION_KEY, 0);
+Map offset = 
Collections.singletonMap(SinkUtils.KAFKA_OFFSET_KEY, 4);
+List offsetsToAlter = Collections.singletonList(new 
ConnectorOffset(partition, offset));
+
+connect.alterConnectorOffsets(CONNECTOR_NAME, new 
ConnectorOffsets(offsetsToAlter));

Review Comment:
   Looks good, thanks 



##

[GitHub] [kafka] C0urante commented on a diff in pull request #14279: KAFKA-15179: Add integration tests for the file sink and source connectors

2023-09-07 Thread via GitHub


C0urante commented on code in PR #14279:
URL: https://github.com/apache/kafka/pull/14279#discussion_r1318849515


##
connect/file/src/test/java/org/apache/kafka/connect/file/integration/FileStreamSinkConnectorIntegrationTest.java:
##
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.file.integration;
+
+import org.apache.kafka.connect.file.FileStreamSinkConnector;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
+import org.apache.kafka.connect.util.SinkUtils;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.InputStreamReader;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.kafka.connect.file.FileStreamSinkConnector.FILE_CONFIG;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
+import static org.apache.kafka.connect.sink.SinkConnector.TOPICS_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@Tag("integration")
+public class FileStreamSinkConnectorIntegrationTest {
+
+private static final String CONNECTOR_NAME = "test-connector";
+private static final String TOPIC = "test-topic";
+private static final String MESSAGE_FORMAT = "Message %d";
+private static final int NUM_MESSAGES = 5;
+private static final String FILE_NAME = "test-file";
+private final EmbeddedConnectCluster connect = new 
EmbeddedConnectCluster.Builder().build();
+
+@BeforeEach
+public void setup() {
+connect.start();
+connect.kafka().createTopic(TOPIC);
+produceMessagesToTopic(TOPIC, NUM_MESSAGES);
+}
+
+@AfterEach
+public void tearDown() {
+connect.stop();
+}
+
+@Test
+public void testSimpleSink() throws Exception {
+File tempDir = TestUtils.tempDirectory();
+Path tempFilePath = tempDir.toPath().resolve(FILE_NAME);
+Map connectorConfigs = baseConnectorConfigs(TOPIC, 
tempFilePath.toString());
+connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME,
 1,
+"Connector and task did not start in time");
+
+verifyLinesInFile(tempFilePath, NUM_MESSAGES, true);
+}
+
+@Test
+public void testAlterOffsets() throws Exception {
+File tempDir = TestUtils.tempDirectory();
+Path tempFilePath = tempDir.toPath().resolve(FILE_NAME);
+Map connectorConfigs = baseConnectorConfigs(TOPIC, 
tempFilePath.toString());
+connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME,
 1,
+"Connector and task did not start in time");
+
+verifyLinesInFile(tempFilePath, NUM_MESSAGES, true);
+
+connect.stopConnector(CONNECTOR_NAME);
+connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, 
"Connector did not stop in time");
+
+// Alter the offsets to cause the last message in the topic to be re-processed
+Map partition = new HashMap<>();
+partition.put(SinkUtils.KAFKA_TOPIC_KEY, TOPIC);
+partition.put(SinkUtils.KAFKA_PARTITION_KEY, 0);
+Map offset = 
Collections.singletonMap(SinkUtils.KAFKA_OFFSET_KEY, 4);
+List offsetsToAlter = Collections.singletonList(new 
ConnectorOffset(partition, offset));
+
+connect.alterConnectorOffsets(CONNECTOR_NAME, new 
ConnectorOffsets(offsetsToAlter));
+
+connect.resumeConnector(CONNECTOR_NAME);
+
connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME,
 1,
+  

[GitHub] [kafka] C0urante commented on a diff in pull request #14279: KAFKA-15179: Add integration tests for the file sink and source connectors

2023-09-05 Thread via GitHub


C0urante commented on code in PR #14279:
URL: https://github.com/apache/kafka/pull/14279#discussion_r1316170261


##
connect/file/src/test/java/org/apache/kafka/connect/file/integration/FileStreamSinkConnectorIntegrationTest.java:
##
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.file.integration;
+
+import org.apache.kafka.connect.file.FileStreamSinkConnector;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
+import org.apache.kafka.connect.util.SinkUtils;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.InputStreamReader;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.kafka.connect.file.FileStreamSinkConnector.FILE_CONFIG;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
+import static org.apache.kafka.connect.sink.SinkConnector.TOPICS_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@Tag("integration")
+public class FileStreamSinkConnectorIntegrationTest {
+
+private static final String CONNECTOR_NAME = "test-connector";
+private static final String TOPIC = "test-topic";
+private static final String MESSAGE_FORMAT = "Message %d";
+private static final int NUM_MESSAGES = 5;
+private static final String FILE_NAME = "test-file";
+private final EmbeddedConnectCluster connect = new 
EmbeddedConnectCluster.Builder().build();
+
+@BeforeEach
+public void setup() {
+connect.start();
+connect.kafka().createTopic(TOPIC);
+produceMessagesToTopic(TOPIC, NUM_MESSAGES);
+}
+
+@AfterEach
+public void tearDown() {
+connect.stop();
+}
+
+@Test
+public void testSimpleSink() throws Exception {
+File tempDir = TestUtils.tempDirectory();
+Path tempFilePath = tempDir.toPath().resolve(FILE_NAME);
+Map connectorConfigs = baseConnectorConfigs(TOPIC, 
tempFilePath.toString());
+connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME,
 1,
+"Connector and task did not start in time");
+
+verifyLinesInFile(tempFilePath, NUM_MESSAGES, true);
+}
+
+@Test
+public void testAlterOffsets() throws Exception {
+File tempDir = TestUtils.tempDirectory();
+Path tempFilePath = tempDir.toPath().resolve(FILE_NAME);
+Map connectorConfigs = baseConnectorConfigs(TOPIC, 
tempFilePath.toString());
+connect.configureConnector(CONNECTOR_NAME, connectorConfigs);
+
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME,
 1,
+"Connector and task did not start in time");
+
+verifyLinesInFile(tempFilePath, NUM_MESSAGES, true);
+
+connect.stopConnector(CONNECTOR_NAME);
+connect.assertions().assertConnectorIsStopped(CONNECTOR_NAME, 
"Connector did not stop in time");
+
+// Alter the offsets to cause the last message in the topic to be re-processed
+Map partition = new HashMap<>();
+partition.put(SinkUtils.KAFKA_TOPIC_KEY, TOPIC);
+partition.put(SinkUtils.KAFKA_PARTITION_KEY, 0);
+Map offset = 
Collections.singletonMap(SinkUtils.KAFKA_OFFSET_KEY, 4);
+List offsetsToAlter = Collections.singletonList(new 
ConnectorOffset(partition, offset));
+
+connect.alterConnectorOffsets(CONNECTOR_NAME, new 
ConnectorOffsets(offsetsToAlter));
+
+connect.resumeConnector(CONNECTOR_NAME);
+
connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME,
 1,
+