This is an automated email from the ASF dual-hosted git repository.
zhouyao2023 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 101e14731c [Improve] Enhanced stability of Kudu E2E (#6258)
101e14731c is described below
commit 101e14731cfad6a46fc58c80f6604a5ef16b2231
Author: Carl-Zhou-CN <[email protected]>
AuthorDate: Fri Jan 19 22:07:37 2024 +0800
[Improve] Enhanced stability of Kudu E2E (#6258)
* [Improve] Enhanced stability of Kudu E2E
* [Improve] Enhanced stability of Kudu E2E
* [Improve] Enhanced stability of Kudu E2E
---
.../e2e/connector/kudu/KuduCDCSinkIT.java | 311 ------------------
.../seatunnel/e2e/connector/kudu/KuduIT.java | 220 ++++++++++++-
.../connector/kudu/KuduWIthMultipleTableIT.java | 363 ---------------------
.../resources/fake_to_kudu_with_multipletable.conf | 2 +-
.../kudu_to_assert_with_multipletable.conf | 2 +-
.../src/test/resources/kudu_to_console.conf | 3 +-
.../resources/write-cdc-changelog-to-kudu.conf | 4 +-
7 files changed, 222 insertions(+), 683 deletions(-)
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduCDCSinkIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduCDCSinkIT.java
deleted file mode 100644
index 733e0d39c9..0000000000
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduCDCSinkIT.java
+++ /dev/null
@@ -1,311 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.e2e.connector.kudu;
-
-import org.apache.seatunnel.e2e.common.TestResource;
-import org.apache.seatunnel.e2e.common.TestSuiteBase;
-import org.apache.seatunnel.e2e.common.container.EngineType;
-import org.apache.seatunnel.e2e.common.container.TestContainer;
-import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
-
-import org.apache.kudu.ColumnSchema;
-import org.apache.kudu.ColumnTypeAttributes;
-import org.apache.kudu.Schema;
-import org.apache.kudu.Type;
-import org.apache.kudu.client.AsyncKuduClient;
-import org.apache.kudu.client.CreateTableOptions;
-import org.apache.kudu.client.KuduClient;
-import org.apache.kudu.client.KuduException;
-import org.apache.kudu.client.KuduScanner;
-import org.apache.kudu.client.KuduTable;
-import org.apache.kudu.client.RowResult;
-import org.apache.kudu.client.RowResultIterator;
-
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.TestTemplate;
-import org.testcontainers.containers.Container;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.ToxiproxyContainer;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.lifecycle.Startables;
-import org.testcontainers.shaded.org.awaitility.Awaitility;
-import org.testcontainers.utility.DockerLoggerFactory;
-
-import com.google.common.collect.ImmutableList;
-import lombok.extern.slf4j.Slf4j;
-
-import java.io.IOException;
-import java.net.Inet4Address;
-import java.net.InterfaceAddress;
-import java.net.NetworkInterface;
-import java.net.SocketException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Enumeration;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import static java.lang.String.format;
-import static org.awaitility.Awaitility.await;
-
-@Slf4j
-@DisabledOnContainer(
- value = {},
- type = {EngineType.SPARK},
- disabledReason = "Currently SPARK do not support cdc")
-public class KuduCDCSinkIT extends TestSuiteBase implements TestResource {
-
- private static final String IMAGE = "apache/kudu:1.15.0";
- private static final Integer KUDU_MASTER_PORT = 7051;
- private static final Integer KUDU_TSERVER_PORT = 7053;
- private GenericContainer<?> master;
- private GenericContainer<?> tServers;
- private KuduClient kuduClient;
-
- private static final String TOXIPROXY_IMAGE =
"ghcr.io/shopify/toxiproxy:2.4.0";
- private static final String TOXIPROXY_NETWORK_ALIAS = "toxiproxy";
- private ToxiproxyContainer toxiProxy;
- private String KUDU_SINK_TABLE = "kudu_sink_table";
-
- @BeforeAll
- @Override
- public void startUp() throws Exception {
-
- String hostIP = getHostIPAddress();
-
- this.master =
- new GenericContainer<>(IMAGE)
- .withExposedPorts(KUDU_MASTER_PORT)
- .withCommand("master")
- .withEnv("MASTER_ARGS", "--default_num_replicas=1")
- .withNetwork(NETWORK)
- .withNetworkAliases("kudu-master-cdc")
- .withLogConsumer(
- new
Slf4jLogConsumer(DockerLoggerFactory.getLogger(IMAGE)));
-
- toxiProxy =
- new ToxiproxyContainer(TOXIPROXY_IMAGE)
- .withNetwork(NETWORK)
- .withNetworkAliases(TOXIPROXY_NETWORK_ALIAS);
- toxiProxy.start();
-
- String instanceName = "kudu-tserver-cdc";
-
- ToxiproxyContainer.ContainerProxy proxy =
- toxiProxy.getProxy(instanceName, KUDU_TSERVER_PORT);
-
- this.tServers =
- new GenericContainer<>(IMAGE)
- .withExposedPorts(KUDU_TSERVER_PORT)
- .withCommand("tserver")
- .withEnv("KUDU_MASTERS", "kudu-master-cdc:" +
KUDU_MASTER_PORT)
- .withNetwork(NETWORK)
- .withNetworkAliases(instanceName)
- .dependsOn(master)
- .withEnv(
- "TSERVER_ARGS",
- format(
- "--fs_wal_dir=/var/lib/kudu/tserver
--logtostderr --use_hybrid_clock=false --rpc_bind_addresses=%s:%s
--rpc_advertised_addresses=%s:%s",
- instanceName,
- KUDU_TSERVER_PORT,
- hostIP,
- proxy.getProxyPort()))
- .withLogConsumer(
- new
Slf4jLogConsumer(DockerLoggerFactory.getLogger(IMAGE)));
-
- Startables.deepStart(Stream.of(master)).join();
- Startables.deepStart(Stream.of(tServers)).join();
-
- Awaitility.given()
- .ignoreExceptions()
- .atLeast(100, TimeUnit.MILLISECONDS)
- .pollInterval(500, TimeUnit.MILLISECONDS)
- .atMost(180, TimeUnit.SECONDS)
- .untilAsserted(this::getKuduClient);
- }
-
- private void initializeKuduTable() throws KuduException {
-
- List<ColumnSchema> columns = new ArrayList();
-
- columns.add(new ColumnSchema.ColumnSchemaBuilder("id",
Type.INT32).key(true).build());
- columns.add(
- new ColumnSchema.ColumnSchemaBuilder("val_bool",
Type.BOOL).nullable(true).build());
- columns.add(
- new ColumnSchema.ColumnSchemaBuilder("val_int8",
Type.INT8).nullable(true).build());
- columns.add(
- new ColumnSchema.ColumnSchemaBuilder("val_int16", Type.INT16)
- .nullable(true)
- .build());
- columns.add(
- new ColumnSchema.ColumnSchemaBuilder("val_int32", Type.INT32)
- .nullable(true)
- .build());
- columns.add(
- new ColumnSchema.ColumnSchemaBuilder("val_int64", Type.INT64)
- .nullable(true)
- .build());
- columns.add(
- new ColumnSchema.ColumnSchemaBuilder("val_float", Type.FLOAT)
- .nullable(true)
- .build());
- columns.add(
- new ColumnSchema.ColumnSchemaBuilder("val_double", Type.DOUBLE)
- .nullable(true)
- .build());
- columns.add(
- new ColumnSchema.ColumnSchemaBuilder("val_decimal",
Type.DECIMAL)
- .nullable(true)
- .typeAttributes(
- new
ColumnTypeAttributes.ColumnTypeAttributesBuilder()
- .precision(20)
- .scale(5)
- .build())
- .build());
- columns.add(
- new ColumnSchema.ColumnSchemaBuilder("val_string", Type.STRING)
- .nullable(true)
- .build());
- // spark
- columns.add(
- new ColumnSchema.ColumnSchemaBuilder("val_unixtime_micros",
Type.UNIXTIME_MICROS)
- .nullable(true)
- .build());
-
- Schema schema = new Schema(columns);
-
- ImmutableList<String> hashKeys = ImmutableList.of("id");
- CreateTableOptions tableOptions = new CreateTableOptions();
-
- tableOptions.addHashPartitions(hashKeys, 2);
- tableOptions.setNumReplicas(1);
-
- kuduClient.createTable(KUDU_SINK_TABLE, schema, tableOptions);
- }
-
- private void getKuduClient() {
- kuduClient =
- new AsyncKuduClient.AsyncKuduClientBuilder(
- Arrays.asList(
- "127.0.0.1" + ":" +
master.getMappedPort(KUDU_MASTER_PORT)))
- .defaultAdminOperationTimeoutMs(120000)
- .defaultOperationTimeoutMs(120000)
- .build()
- .syncClient();
- }
-
- @TestTemplate
- public void testKudu(TestContainer container) throws IOException,
InterruptedException {
- this.initializeKuduTable();
- Container.ExecResult execResult =
container.executeJob("/write-cdc-changelog-to-kudu.conf");
- Assertions.assertEquals(0, execResult.getExitCode());
-
- await().atMost(60000, TimeUnit.MILLISECONDS)
- .untilAsserted(
- () -> {
- Assertions.assertIterableEquals(
- Stream.<List<Object>>of(
- Arrays.asList(
- "3",
- "true",
- "1",
- "2",
- "3",
- "4",
- "4.3",
- "5.3",
- "6.30000",
- "NEW",
- "2020-02-02
02:02:02.0"),
- Arrays.asList(
- "1",
- "true",
- "2",
- "2",
- "3",
- "4",
- "4.3",
- "5.3",
- "6.30000",
- "NEW",
- "2020-02-02
02:02:02.0"))
- .collect(Collectors.toList()),
- readData(KUDU_SINK_TABLE));
- });
-
- kuduClient.deleteTable(KUDU_SINK_TABLE);
- }
-
- public List<List<Object>> readData(String tableName) throws KuduException {
- List<List<Object>> result = new ArrayList<>();
- KuduTable kuduTable = kuduClient.openTable(tableName);
- KuduScanner scanner = kuduClient.newScannerBuilder(kuduTable).build();
- while (scanner.hasMoreRows()) {
- RowResultIterator rowResults = scanner.nextRows();
- List<Object> row = new ArrayList<>();
- while (rowResults.hasNext()) {
- RowResult rowResult = rowResults.next();
- for (int i = 0; i < rowResult.getSchema().getColumns().size();
i++) {
- row.add(rowResult.getObject(i).toString());
- }
- }
- result.add(row);
- }
- return result;
- }
-
- @Override
- @AfterAll
- public void tearDown() throws Exception {
- if (kuduClient != null) {
- kuduClient.close();
- }
-
- if (master != null) {
- master.close();
- }
-
- if (tServers != null) {
- tServers.close();
- }
- }
-
- private static String getHostIPAddress() {
- try {
- Enumeration<NetworkInterface> networkInterfaceEnumeration =
- NetworkInterface.getNetworkInterfaces();
- while (networkInterfaceEnumeration.hasMoreElements()) {
- for (InterfaceAddress interfaceAddress :
-
networkInterfaceEnumeration.nextElement().getInterfaceAddresses()) {
- if (interfaceAddress.getAddress().isSiteLocalAddress()
- && interfaceAddress.getAddress() instanceof
Inet4Address) {
- return interfaceAddress.getAddress().getHostAddress();
- }
- }
- }
- } catch (SocketException e) {
- throw new RuntimeException(e);
- }
- throw new IllegalStateException(
- "Could not find site local ipv4 address, failed to launch
kudu");
- }
-}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduIT.java
index b0c8cba1fe..015ab0d3e3 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduIT.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.e2e.connector.kudu;
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
@@ -27,6 +28,7 @@ import org.apache.kudu.ColumnTypeAttributes;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.AsyncKuduClient;
+import org.apache.kudu.client.Bytes;
import org.apache.kudu.client.CreateTableOptions;
import org.apache.kudu.client.Insert;
import org.apache.kudu.client.KuduClient;
@@ -65,6 +67,7 @@ import java.util.Arrays;
import java.util.Enumeration;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import java.util.stream.Stream;
import static java.lang.String.format;
@@ -169,6 +172,27 @@ public class KuduIT extends TestSuiteBase implements
TestResource {
}
}
+ private void batchInsertData(String tableName) throws KuduException {
+ KuduTable table = kuduClient.openTable(tableName);
+ KuduSession kuduSession = kuduClient.newSession();
+ for (int i = 0; i < 100; i++) {
+ Insert insert = table.newInsert();
+ PartialRow row = insert.getRow();
+ row.addObject("id", i);
+ row.addObject("val_bool", true);
+ row.addObject("val_int8", (byte) 1);
+ row.addObject("val_int16", (short) 300);
+ row.addObject("val_int32", 30000);
+ row.addObject("val_int64", 30000000L);
+ row.addObject("val_float", 1.0f);
+ row.addObject("val_double", 2.0d);
+ row.addObject("val_decimal", new BigDecimal("1.1212"));
+ row.addObject("val_string", "test");
+ row.addObject("val_unixtime_micros", new
java.sql.Timestamp(1693477266998L));
+ OperationResponse response = kuduSession.apply(insert);
+ }
+ }
+
private void initializeKuduTable() throws KuduException {
List<ColumnSchema> columns = new ArrayList();
@@ -232,6 +256,64 @@ public class KuduIT extends TestSuiteBase implements
TestResource {
kuduClient.createTable(KUDU_SINK_TABLE, schema, tableOptions);
}
+ private void initializeKuduTable(String tableName) throws KuduException {
+
+ List<ColumnSchema> columns = new ArrayList();
+
+ columns.add(new ColumnSchema.ColumnSchemaBuilder("id",
Type.INT32).key(true).build());
+ columns.add(
+ new ColumnSchema.ColumnSchemaBuilder("val_bool",
Type.BOOL).nullable(true).build());
+ columns.add(
+ new ColumnSchema.ColumnSchemaBuilder("val_int8",
Type.INT8).nullable(true).build());
+ columns.add(
+ new ColumnSchema.ColumnSchemaBuilder("val_int16", Type.INT16)
+ .nullable(true)
+ .build());
+ columns.add(
+ new ColumnSchema.ColumnSchemaBuilder("val_int32", Type.INT32)
+ .nullable(true)
+ .build());
+ columns.add(
+ new ColumnSchema.ColumnSchemaBuilder("val_int64", Type.INT64)
+ .nullable(true)
+ .build());
+ columns.add(
+ new ColumnSchema.ColumnSchemaBuilder("val_float", Type.FLOAT)
+ .nullable(true)
+ .build());
+ columns.add(
+ new ColumnSchema.ColumnSchemaBuilder("val_double", Type.DOUBLE)
+ .nullable(true)
+ .build());
+ columns.add(
+ new ColumnSchema.ColumnSchemaBuilder("val_decimal",
Type.DECIMAL)
+ .nullable(true)
+ .typeAttributes(
+ new
ColumnTypeAttributes.ColumnTypeAttributesBuilder()
+ .precision(20)
+ .scale(5)
+ .build())
+ .build());
+ columns.add(
+ new ColumnSchema.ColumnSchemaBuilder("val_string", Type.STRING)
+ .nullable(true)
+ .build());
+ // spark
+ columns.add(
+ new ColumnSchema.ColumnSchemaBuilder("val_unixtime_micros",
Type.UNIXTIME_MICROS)
+ .nullable(true)
+ .build());
+
+ Schema schema = new Schema(columns);
+
+ ImmutableList<String> hashKeys = ImmutableList.of("id");
+ CreateTableOptions tableOptions = new CreateTableOptions();
+
+ tableOptions.addHashPartitions(hashKeys, 2);
+ tableOptions.setNumReplicas(1);
+ kuduClient.createTable(tableName, schema, tableOptions);
+ }
+
private void getKuduClient() {
kuduClient =
new AsyncKuduClient.AsyncKuduClientBuilder(
@@ -260,15 +342,147 @@ public class KuduIT extends TestSuiteBase implements
TestResource {
kuduClient.deleteTable(KUDU_SINK_TABLE);
}
- public List<String> readData(String tableName) throws KuduException {
- List<String> result = new ArrayList<>();
+ @DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK},
+ disabledReason = "Currently SPARK do not support cdc")
+ @TestTemplate
+ public void testCdcKudu(TestContainer container) throws IOException,
InterruptedException {
+ this.initializeKuduTable("kudu_cdc_sink_table");
+ Container.ExecResult execResult =
container.executeJob("/write-cdc-changelog-to-kudu.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+
+ await().atMost(60000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ Assertions.assertIterableEquals(
+ Stream.<List<Object>>of(
+ Arrays.asList(
+ "3",
+ "true",
+ "1",
+ "2",
+ "3",
+ "4",
+ "4.3",
+ "5.3",
+ "6.30000",
+ "NEW",
+ "2020-02-02
02:02:02.0"),
+ Arrays.asList(
+ "1",
+ "true",
+ "2",
+ "2",
+ "3",
+ "4",
+ "4.3",
+ "5.3",
+ "6.30000",
+ "NEW",
+ "2020-02-02
02:02:02.0"))
+ .collect(Collectors.toList()),
+ readData("kudu_cdc_sink_table"));
+ });
+
+ kuduClient.deleteTable("kudu_cdc_sink_table");
+ }
+
+ @DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK, EngineType.FLINK},
+ disabledReason = "Currently SPARK/FLINK do not support multiple
table read")
+ @TestTemplate
+ public void testKuduMultipleRead(TestContainer container)
+ throws IOException, InterruptedException {
+ initializeKuduTable("kudu_source_table_1");
+ initializeKuduTable("kudu_source_table_2");
+ batchInsertData("kudu_source_table_1");
+ batchInsertData("kudu_source_table_2");
+ Container.ExecResult execResult =
+
container.executeJob("/kudu_to_assert_with_multipletable.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ kuduClient.deleteTable("kudu_source_table_1");
+ kuduClient.deleteTable("kudu_source_table_2");
+ }
+
+ @DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK, EngineType.FLINK},
+ disabledReason = "Currently SPARK/FLINK do not support multiple
table read")
+ @TestTemplate
+ public void testKuduMultipleWrite(TestContainer container)
+ throws IOException, InterruptedException {
+ initializeKuduTable("kudu_sink_1");
+ initializeKuduTable("kudu_sink_2");
+ Container.ExecResult execResult =
+ container.executeJob("/fake_to_kudu_with_multipletable.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+
+ await().atMost(60000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () ->
+ Assertions.assertAll(
+ () -> {
+ Assertions.assertIterableEquals(
+ Stream.<List<Object>>of(
+
Arrays.asList(
+
"1",
+
"true",
+
"1",
+
"2",
+
"3",
+
"4",
+
"4.3",
+
"5.3",
+
"6.30000",
+
"NEW",
+
"2020-02-02 02:02:02.0"))
+
.collect(Collectors.toList()),
+ readData("kudu_sink_1"));
+ },
+ () -> {
+ Assertions.assertIterableEquals(
+ Stream.<List<Object>>of(
+
Arrays.asList(
+
"1",
+
"true",
+
"1",
+
"2",
+
"3",
+
"4",
+
"4.3",
+
"5.3",
+
"6.30000",
+
"NEW",
+
"2020-02-02 02:02:02.0"))
+
.collect(Collectors.toList()),
+ readData("kudu_sink_2"));
+ }));
+
+ kuduClient.deleteTable("kudu_sink_1");
+ kuduClient.deleteTable("kudu_sink_2");
+ }
+
+ public List<List<Object>> readData(String tableName) throws KuduException {
+ List<List<Object>> result = new ArrayList<>();
KuduTable kuduTable = kuduClient.openTable(tableName);
KuduScanner scanner = kuduClient.newScannerBuilder(kuduTable).build();
while (scanner.hasMoreRows()) {
RowResultIterator rowResults = scanner.nextRows();
+ List<Object> row = new ArrayList<>();
while (rowResults.hasNext()) {
RowResult rowResult = rowResults.next();
- result.add(rowResult.rowToString());
+ for (int i = 0; i < rowResult.getSchema().getColumns().size();
i++) {
+ if (rowResult.getSchema().getColumnByIndex(i).getType() ==
Type.BINARY) {
+ row.add(Bytes.pretty(rowResult.getBinaryCopy(i)));
+ break;
+ }
+ row.add(rowResult.getObject(i).toString());
+ }
+ }
+ if (!row.isEmpty()) {
+ result.add(row);
}
}
return result;
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduWIthMultipleTableIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduWIthMultipleTableIT.java
deleted file mode 100644
index cc05653d5c..0000000000
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduWIthMultipleTableIT.java
+++ /dev/null
@@ -1,363 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.e2e.connector.kudu;
-
-import org.apache.seatunnel.e2e.common.TestResource;
-import org.apache.seatunnel.e2e.common.TestSuiteBase;
-import org.apache.seatunnel.e2e.common.container.EngineType;
-import org.apache.seatunnel.e2e.common.container.TestContainer;
-import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
-
-import org.apache.kudu.ColumnSchema;
-import org.apache.kudu.ColumnTypeAttributes;
-import org.apache.kudu.Schema;
-import org.apache.kudu.Type;
-import org.apache.kudu.client.AsyncKuduClient;
-import org.apache.kudu.client.CreateTableOptions;
-import org.apache.kudu.client.Insert;
-import org.apache.kudu.client.KuduClient;
-import org.apache.kudu.client.KuduException;
-import org.apache.kudu.client.KuduScanner;
-import org.apache.kudu.client.KuduSession;
-import org.apache.kudu.client.KuduTable;
-import org.apache.kudu.client.OperationResponse;
-import org.apache.kudu.client.PartialRow;
-import org.apache.kudu.client.RowResult;
-import org.apache.kudu.client.RowResultIterator;
-
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.TestTemplate;
-import org.testcontainers.containers.Container;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.ToxiproxyContainer;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.lifecycle.Startables;
-import org.testcontainers.shaded.org.awaitility.Awaitility;
-import org.testcontainers.utility.DockerLoggerFactory;
-
-import com.google.common.collect.ImmutableList;
-import lombok.extern.slf4j.Slf4j;
-
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.net.Inet4Address;
-import java.net.InterfaceAddress;
-import java.net.NetworkInterface;
-import java.net.SocketException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Enumeration;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import static java.lang.String.format;
-import static org.awaitility.Awaitility.await;
-
-@Slf4j
-@DisabledOnContainer(
- value = {},
- type = {EngineType.SPARK, EngineType.FLINK},
- disabledReason = "Currently SPARK/FLINK do not support multiple table
read")
-public class KuduWIthMultipleTableIT extends TestSuiteBase implements
TestResource {
-
- private static final String IMAGE = "apache/kudu:1.15.0";
- private static final Integer KUDU_MASTER_PORT = 7051;
- private static final Integer KUDU_TSERVER_PORT = 7054;
- private GenericContainer<?> master;
- private GenericContainer<?> tServers;
- private KuduClient kuduClient;
-
- private static final String TOXIPROXY_IMAGE =
"ghcr.io/shopify/toxiproxy:2.4.0";
- private static final String TOXIPROXY_NETWORK_ALIAS = "toxiproxy";
- private ToxiproxyContainer toxiProxy;
-
- @BeforeAll
- @Override
- public void startUp() throws Exception {
-
- String hostIP = getHostIPAddress();
-
- this.master =
- new GenericContainer<>(IMAGE)
- .withExposedPorts(KUDU_MASTER_PORT)
- .withCommand("master")
- .withEnv("MASTER_ARGS", "--default_num_replicas=1")
- .withNetwork(NETWORK)
- .withNetworkAliases("kudu-master-multiple")
- .withLogConsumer(
- new
Slf4jLogConsumer(DockerLoggerFactory.getLogger(IMAGE)));
-
- toxiProxy =
- new ToxiproxyContainer(TOXIPROXY_IMAGE)
- .withNetwork(NETWORK)
- .withNetworkAliases(TOXIPROXY_NETWORK_ALIAS);
- toxiProxy.start();
-
- String instanceName = "kudu-tserver-multiple";
-
- ToxiproxyContainer.ContainerProxy proxy =
- toxiProxy.getProxy(instanceName, KUDU_TSERVER_PORT);
-
- this.tServers =
- new GenericContainer<>(IMAGE)
- .withExposedPorts(KUDU_TSERVER_PORT)
- .withCommand("tserver")
- .withEnv("KUDU_MASTERS", "kudu-master-multiple:" +
KUDU_MASTER_PORT)
- .withNetwork(NETWORK)
- .withNetworkAliases(instanceName)
- .dependsOn(master)
- .withEnv(
- "TSERVER_ARGS",
- format(
- "--fs_wal_dir=/var/lib/kudu/tserver
--logtostderr --use_hybrid_clock=false --rpc_bind_addresses=%s:%s
--rpc_advertised_addresses=%s:%s",
- instanceName,
- KUDU_TSERVER_PORT,
- hostIP,
- proxy.getProxyPort()))
- .withLogConsumer(
- new
Slf4jLogConsumer(DockerLoggerFactory.getLogger(IMAGE)));
-
- Startables.deepStart(Stream.of(master)).join();
- Startables.deepStart(Stream.of(tServers)).join();
-
- Awaitility.given()
- .ignoreExceptions()
- .atLeast(100, TimeUnit.MILLISECONDS)
- .pollInterval(500, TimeUnit.MILLISECONDS)
- .atMost(180, TimeUnit.SECONDS)
- .untilAsserted(this::getKuduClient);
- }
-
- private void batchInsertData(String tableName) throws KuduException {
- KuduTable table = kuduClient.openTable(tableName);
- KuduSession kuduSession = kuduClient.newSession();
- for (int i = 0; i < 100; i++) {
- Insert insert = table.newInsert();
- PartialRow row = insert.getRow();
- row.addObject("id", i);
- row.addObject("val_bool", true);
- row.addObject("val_int8", (byte) 1);
- row.addObject("val_int16", (short) 300);
- row.addObject("val_int32", 30000);
- row.addObject("val_int64", 30000000L);
- row.addObject("val_float", 1.0f);
- row.addObject("val_double", 2.0d);
- row.addObject("val_decimal", new BigDecimal("1.1212"));
- row.addObject("val_string", "test");
- row.addObject("val_unixtime_micros", new
java.sql.Timestamp(1693477266998L));
- OperationResponse response = kuduSession.apply(insert);
- }
- }
-
- private void initializeKuduTable(String tableName) throws KuduException {
-
- List<ColumnSchema> columns = new ArrayList();
-
- columns.add(new ColumnSchema.ColumnSchemaBuilder("id",
Type.INT32).key(true).build());
- columns.add(
- new ColumnSchema.ColumnSchemaBuilder("val_bool",
Type.BOOL).nullable(true).build());
- columns.add(
- new ColumnSchema.ColumnSchemaBuilder("val_int8",
Type.INT8).nullable(true).build());
- columns.add(
- new ColumnSchema.ColumnSchemaBuilder("val_int16", Type.INT16)
- .nullable(true)
- .build());
- columns.add(
- new ColumnSchema.ColumnSchemaBuilder("val_int32", Type.INT32)
- .nullable(true)
- .build());
- columns.add(
- new ColumnSchema.ColumnSchemaBuilder("val_int64", Type.INT64)
- .nullable(true)
- .build());
- columns.add(
- new ColumnSchema.ColumnSchemaBuilder("val_float", Type.FLOAT)
- .nullable(true)
- .build());
- columns.add(
- new ColumnSchema.ColumnSchemaBuilder("val_double", Type.DOUBLE)
- .nullable(true)
- .build());
- columns.add(
- new ColumnSchema.ColumnSchemaBuilder("val_decimal",
Type.DECIMAL)
- .nullable(true)
- .typeAttributes(
- new
ColumnTypeAttributes.ColumnTypeAttributesBuilder()
- .precision(20)
- .scale(5)
- .build())
- .build());
- columns.add(
- new ColumnSchema.ColumnSchemaBuilder("val_string", Type.STRING)
- .nullable(true)
- .build());
- // spark
- columns.add(
- new ColumnSchema.ColumnSchemaBuilder("val_unixtime_micros",
Type.UNIXTIME_MICROS)
- .nullable(true)
- .build());
-
- Schema schema = new Schema(columns);
-
- ImmutableList<String> hashKeys = ImmutableList.of("id");
- CreateTableOptions tableOptions = new CreateTableOptions();
-
- tableOptions.addHashPartitions(hashKeys, 2);
- tableOptions.setNumReplicas(1);
- kuduClient.createTable(tableName, schema, tableOptions);
- }
-
- private void getKuduClient() {
- kuduClient =
- new AsyncKuduClient.AsyncKuduClientBuilder(
- Arrays.asList(
- "127.0.0.1" + ":" +
master.getMappedPort(KUDU_MASTER_PORT)))
- .defaultAdminOperationTimeoutMs(120000)
- .defaultOperationTimeoutMs(120000)
- .build()
- .syncClient();
- }
-
- @TestTemplate
- public void testKuduMultipleRead(TestContainer container)
- throws IOException, InterruptedException {
- initializeKuduTable("kudu_source_table_1");
- initializeKuduTable("kudu_source_table_2");
- batchInsertData("kudu_source_table_1");
- batchInsertData("kudu_source_table_2");
- Container.ExecResult execResult =
-
container.executeJob("/kudu_to_assert_with_multipletable.conf");
- Assertions.assertEquals(0, execResult.getExitCode());
- kuduClient.deleteTable("kudu_source_table_1");
- kuduClient.deleteTable("kudu_source_table_2");
- }
-
- @TestTemplate
- public void testKuduMultipleWrite(TestContainer container)
- throws IOException, InterruptedException {
- initializeKuduTable("kudu_sink_1");
- initializeKuduTable("kudu_sink_2");
- Container.ExecResult execResult =
- container.executeJob("/fake_to_kudu_with_multipletable.conf");
- Assertions.assertEquals(0, execResult.getExitCode());
-
- await().atMost(60000, TimeUnit.MILLISECONDS)
- .untilAsserted(
- () ->
- Assertions.assertAll(
- () -> {
- Assertions.assertIterableEquals(
- Stream.<List<Object>>of(
-
Arrays.asList(
-
"1",
-
"true",
-
"1",
-
"2",
-
"3",
-
"4",
-
"4.3",
-
"5.3",
-
"6.30000",
-
"NEW",
-
"2020-02-02 02:02:02.0"))
-
.collect(Collectors.toList()),
- readData("kudu_sink_1"));
- },
- () -> {
- Assertions.assertIterableEquals(
- Stream.<List<Object>>of(
-
Arrays.asList(
-
"1",
-
"true",
-
"1",
-
"2",
-
"3",
-
"4",
-
"4.3",
-
"5.3",
-
"6.30000",
-
"NEW",
-
"2020-02-02 02:02:02.0"))
-
.collect(Collectors.toList()),
- readData("kudu_sink_2"));
- }));
-
- kuduClient.deleteTable("kudu_sink_1");
- kuduClient.deleteTable("kudu_sink_2");
- }
-
- @Override
- @AfterAll
- public void tearDown() throws Exception {
- if (kuduClient != null) {
- kuduClient.close();
- }
-
- if (master != null) {
- master.close();
- }
-
- if (tServers != null) {
- tServers.close();
- }
- }
-
- public List<List<Object>> readData(String tableName) throws KuduException {
- List<List<Object>> result = new ArrayList<>();
- KuduTable kuduTable = kuduClient.openTable(tableName);
- KuduScanner scanner = kuduClient.newScannerBuilder(kuduTable).build();
- while (scanner.hasMoreRows()) {
- RowResultIterator rowResults = scanner.nextRows();
- List<Object> row = new ArrayList<>();
- while (rowResults.hasNext()) {
- RowResult rowResult = rowResults.next();
- for (int i = 0; i < rowResult.getSchema().getColumns().size();
i++) {
- row.add(rowResult.getObject(i).toString());
- }
- }
- if (!row.isEmpty()) {
- result.add(row);
- }
- }
- return result;
- }
-
- private static String getHostIPAddress() {
- try {
- Enumeration<NetworkInterface> networkInterfaceEnumeration =
- NetworkInterface.getNetworkInterfaces();
- while (networkInterfaceEnumeration.hasMoreElements()) {
- for (InterfaceAddress interfaceAddress :
-
networkInterfaceEnumeration.nextElement().getInterfaceAddresses()) {
- if (interfaceAddress.getAddress().isSiteLocalAddress()
- && interfaceAddress.getAddress() instanceof
Inet4Address) {
- return interfaceAddress.getAddress().getHostAddress();
- }
- }
- }
- } catch (SocketException e) {
- throw new RuntimeException(e);
- }
- throw new IllegalStateException(
- "Could not find site local ipv4 address, failed to launch
kudu");
- }
-}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/fake_to_kudu_with_multipletable.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/fake_to_kudu_with_multipletable.conf
index da239d00b2..f9b3a2ffe3 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/fake_to_kudu_with_multipletable.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/fake_to_kudu_with_multipletable.conf
@@ -82,6 +82,6 @@ source {
sink {
kudu{
- kudu_masters = "kudu-master-multiple:7051"
+ kudu_masters = "kudu-master:7051"
}
}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/kudu_to_assert_with_multipletable.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/kudu_to_assert_with_multipletable.conf
index b378810601..2a0007d0a2 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/kudu_to_assert_with_multipletable.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/kudu_to_assert_with_multipletable.conf
@@ -27,7 +27,7 @@ env {
source {
# This is a example source plugin **only for test and demonstrate the
feature source plugin**
kudu{
- kudu_masters = "kudu-master-multiple:7051"
+ kudu_masters = "kudu-master:7051"
table_list = [
{
table_name = "kudu_source_table_1"
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/kudu_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/kudu_to_console.conf
index 33697c11af..a7513760ae 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/kudu_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/kudu_to_console.conf
@@ -21,8 +21,7 @@
env {
# You can set engine configuration here
parallelism = 1
- job.mode = "STREAMING"
- checkpoint.interval = 5000
+ job.mode = "BATCH"
}
source {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/write-cdc-changelog-to-kudu.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/write-cdc-changelog-to-kudu.conf
index e6b799136c..c3aea6c857 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/write-cdc-changelog-to-kudu.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/write-cdc-changelog-to-kudu.conf
@@ -68,7 +68,7 @@ source {
sink {
kudu{
- kudu_masters = "kudu-master-cdc:7051"
- table_name = "kudu_sink_table"
+ kudu_masters = "kudu-master:7051"
+ table_name = "kudu_cdc_sink_table"
}
}
\ No newline at end of file