This is an automated email from the ASF dual-hosted git repository.

zhouyao2023 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 101e14731c [Improve] Enhanced stability of Kudu E2E (#6258)
101e14731c is described below

commit 101e14731cfad6a46fc58c80f6604a5ef16b2231
Author: Carl-Zhou-CN <[email protected]>
AuthorDate: Fri Jan 19 22:07:37 2024 +0800

    [Improve] Enhanced stability of Kudu E2E (#6258)
    
    * [Improve] Enhanced stability of Kudu E2E
    
    * [Improve] Enhanced stability of Kudu E2E
    
    * [Improve] Enhanced stability of Kudu E2E
---
 .../e2e/connector/kudu/KuduCDCSinkIT.java          | 311 ------------------
 .../seatunnel/e2e/connector/kudu/KuduIT.java       | 220 ++++++++++++-
 .../connector/kudu/KuduWIthMultipleTableIT.java    | 363 ---------------------
 .../resources/fake_to_kudu_with_multipletable.conf |   2 +-
 .../kudu_to_assert_with_multipletable.conf         |   2 +-
 .../src/test/resources/kudu_to_console.conf        |   3 +-
 .../resources/write-cdc-changelog-to-kudu.conf     |   4 +-
 7 files changed, 222 insertions(+), 683 deletions(-)

diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduCDCSinkIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduCDCSinkIT.java
deleted file mode 100644
index 733e0d39c9..0000000000
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduCDCSinkIT.java
+++ /dev/null
@@ -1,311 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.e2e.connector.kudu;
-
-import org.apache.seatunnel.e2e.common.TestResource;
-import org.apache.seatunnel.e2e.common.TestSuiteBase;
-import org.apache.seatunnel.e2e.common.container.EngineType;
-import org.apache.seatunnel.e2e.common.container.TestContainer;
-import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
-
-import org.apache.kudu.ColumnSchema;
-import org.apache.kudu.ColumnTypeAttributes;
-import org.apache.kudu.Schema;
-import org.apache.kudu.Type;
-import org.apache.kudu.client.AsyncKuduClient;
-import org.apache.kudu.client.CreateTableOptions;
-import org.apache.kudu.client.KuduClient;
-import org.apache.kudu.client.KuduException;
-import org.apache.kudu.client.KuduScanner;
-import org.apache.kudu.client.KuduTable;
-import org.apache.kudu.client.RowResult;
-import org.apache.kudu.client.RowResultIterator;
-
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.TestTemplate;
-import org.testcontainers.containers.Container;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.ToxiproxyContainer;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.lifecycle.Startables;
-import org.testcontainers.shaded.org.awaitility.Awaitility;
-import org.testcontainers.utility.DockerLoggerFactory;
-
-import com.google.common.collect.ImmutableList;
-import lombok.extern.slf4j.Slf4j;
-
-import java.io.IOException;
-import java.net.Inet4Address;
-import java.net.InterfaceAddress;
-import java.net.NetworkInterface;
-import java.net.SocketException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Enumeration;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import static java.lang.String.format;
-import static org.awaitility.Awaitility.await;
-
-@Slf4j
-@DisabledOnContainer(
-        value = {},
-        type = {EngineType.SPARK},
-        disabledReason = "Currently SPARK do not support cdc")
-public class KuduCDCSinkIT extends TestSuiteBase implements TestResource {
-
-    private static final String IMAGE = "apache/kudu:1.15.0";
-    private static final Integer KUDU_MASTER_PORT = 7051;
-    private static final Integer KUDU_TSERVER_PORT = 7053;
-    private GenericContainer<?> master;
-    private GenericContainer<?> tServers;
-    private KuduClient kuduClient;
-
-    private static final String TOXIPROXY_IMAGE = "ghcr.io/shopify/toxiproxy:2.4.0";
-    private static final String TOXIPROXY_NETWORK_ALIAS = "toxiproxy";
-    private ToxiproxyContainer toxiProxy;
-    private String KUDU_SINK_TABLE = "kudu_sink_table";
-
-    @BeforeAll
-    @Override
-    public void startUp() throws Exception {
-
-        String hostIP = getHostIPAddress();
-
-        this.master =
-                new GenericContainer<>(IMAGE)
-                        .withExposedPorts(KUDU_MASTER_PORT)
-                        .withCommand("master")
-                        .withEnv("MASTER_ARGS", "--default_num_replicas=1")
-                        .withNetwork(NETWORK)
-                        .withNetworkAliases("kudu-master-cdc")
-                        .withLogConsumer(
-                                new Slf4jLogConsumer(DockerLoggerFactory.getLogger(IMAGE)));
-
-        toxiProxy =
-                new ToxiproxyContainer(TOXIPROXY_IMAGE)
-                        .withNetwork(NETWORK)
-                        .withNetworkAliases(TOXIPROXY_NETWORK_ALIAS);
-        toxiProxy.start();
-
-        String instanceName = "kudu-tserver-cdc";
-
-        ToxiproxyContainer.ContainerProxy proxy =
-                toxiProxy.getProxy(instanceName, KUDU_TSERVER_PORT);
-
-        this.tServers =
-                new GenericContainer<>(IMAGE)
-                        .withExposedPorts(KUDU_TSERVER_PORT)
-                        .withCommand("tserver")
-                        .withEnv("KUDU_MASTERS", "kudu-master-cdc:" + KUDU_MASTER_PORT)
-                        .withNetwork(NETWORK)
-                        .withNetworkAliases(instanceName)
-                        .dependsOn(master)
-                        .withEnv(
-                                "TSERVER_ARGS",
-                                format(
-                                        "--fs_wal_dir=/var/lib/kudu/tserver --logtostderr --use_hybrid_clock=false --rpc_bind_addresses=%s:%s --rpc_advertised_addresses=%s:%s",
-                                        instanceName,
-                                        KUDU_TSERVER_PORT,
-                                        hostIP,
-                                        proxy.getProxyPort()))
-                        .withLogConsumer(
-                                new Slf4jLogConsumer(DockerLoggerFactory.getLogger(IMAGE)));
-
-        Startables.deepStart(Stream.of(master)).join();
-        Startables.deepStart(Stream.of(tServers)).join();
-
-        Awaitility.given()
-                .ignoreExceptions()
-                .atLeast(100, TimeUnit.MILLISECONDS)
-                .pollInterval(500, TimeUnit.MILLISECONDS)
-                .atMost(180, TimeUnit.SECONDS)
-                .untilAsserted(this::getKuduClient);
-    }
-
-    private void initializeKuduTable() throws KuduException {
-
-        List<ColumnSchema> columns = new ArrayList();
-
-        columns.add(new ColumnSchema.ColumnSchemaBuilder("id", Type.INT32).key(true).build());
-        columns.add(
-                new ColumnSchema.ColumnSchemaBuilder("val_bool", Type.BOOL).nullable(true).build());
-        columns.add(
-                new ColumnSchema.ColumnSchemaBuilder("val_int8", Type.INT8).nullable(true).build());
-        columns.add(
-                new ColumnSchema.ColumnSchemaBuilder("val_int16", Type.INT16)
-                        .nullable(true)
-                        .build());
-        columns.add(
-                new ColumnSchema.ColumnSchemaBuilder("val_int32", Type.INT32)
-                        .nullable(true)
-                        .build());
-        columns.add(
-                new ColumnSchema.ColumnSchemaBuilder("val_int64", Type.INT64)
-                        .nullable(true)
-                        .build());
-        columns.add(
-                new ColumnSchema.ColumnSchemaBuilder("val_float", Type.FLOAT)
-                        .nullable(true)
-                        .build());
-        columns.add(
-                new ColumnSchema.ColumnSchemaBuilder("val_double", Type.DOUBLE)
-                        .nullable(true)
-                        .build());
-        columns.add(
-                new ColumnSchema.ColumnSchemaBuilder("val_decimal", Type.DECIMAL)
-                        .nullable(true)
-                        .typeAttributes(
-                                new ColumnTypeAttributes.ColumnTypeAttributesBuilder()
-                                        .precision(20)
-                                        .scale(5)
-                                        .build())
-                        .build());
-        columns.add(
-                new ColumnSchema.ColumnSchemaBuilder("val_string", Type.STRING)
-                        .nullable(true)
-                        .build());
-        // spark
-        columns.add(
-                new ColumnSchema.ColumnSchemaBuilder("val_unixtime_micros", Type.UNIXTIME_MICROS)
-                        .nullable(true)
-                        .build());
-
-        Schema schema = new Schema(columns);
-
-        ImmutableList<String> hashKeys = ImmutableList.of("id");
-        CreateTableOptions tableOptions = new CreateTableOptions();
-
-        tableOptions.addHashPartitions(hashKeys, 2);
-        tableOptions.setNumReplicas(1);
-
-        kuduClient.createTable(KUDU_SINK_TABLE, schema, tableOptions);
-    }
-
-    private void getKuduClient() {
-        kuduClient =
-                new AsyncKuduClient.AsyncKuduClientBuilder(
-                                Arrays.asList(
-                                        "127.0.0.1" + ":" + master.getMappedPort(KUDU_MASTER_PORT)))
-                        .defaultAdminOperationTimeoutMs(120000)
-                        .defaultOperationTimeoutMs(120000)
-                        .build()
-                        .syncClient();
-    }
-
-    @TestTemplate
-    public void testKudu(TestContainer container) throws IOException, InterruptedException {
-        this.initializeKuduTable();
-        Container.ExecResult execResult = container.executeJob("/write-cdc-changelog-to-kudu.conf");
-        Assertions.assertEquals(0, execResult.getExitCode());
-
-        await().atMost(60000, TimeUnit.MILLISECONDS)
-                .untilAsserted(
-                        () -> {
-                            Assertions.assertIterableEquals(
-                                    Stream.<List<Object>>of(
-                                                    Arrays.asList(
-                                                            "3",
-                                                            "true",
-                                                            "1",
-                                                            "2",
-                                                            "3",
-                                                            "4",
-                                                            "4.3",
-                                                            "5.3",
-                                                            "6.30000",
-                                                            "NEW",
-                                                            "2020-02-02 02:02:02.0"),
-                                                    Arrays.asList(
-                                                            "1",
-                                                            "true",
-                                                            "2",
-                                                            "2",
-                                                            "3",
-                                                            "4",
-                                                            "4.3",
-                                                            "5.3",
-                                                            "6.30000",
-                                                            "NEW",
-                                                            "2020-02-02 02:02:02.0"))
-                                            .collect(Collectors.toList()),
-                                    readData(KUDU_SINK_TABLE));
-                        });
-
-        kuduClient.deleteTable(KUDU_SINK_TABLE);
-    }
-
-    public List<List<Object>> readData(String tableName) throws KuduException {
-        List<List<Object>> result = new ArrayList<>();
-        KuduTable kuduTable = kuduClient.openTable(tableName);
-        KuduScanner scanner = kuduClient.newScannerBuilder(kuduTable).build();
-        while (scanner.hasMoreRows()) {
-            RowResultIterator rowResults = scanner.nextRows();
-            List<Object> row = new ArrayList<>();
-            while (rowResults.hasNext()) {
-                RowResult rowResult = rowResults.next();
-                for (int i = 0; i < rowResult.getSchema().getColumns().size(); i++) {
-                    row.add(rowResult.getObject(i).toString());
-                }
-            }
-            result.add(row);
-        }
-        return result;
-    }
-
-    @Override
-    @AfterAll
-    public void tearDown() throws Exception {
-        if (kuduClient != null) {
-            kuduClient.close();
-        }
-
-        if (master != null) {
-            master.close();
-        }
-
-        if (tServers != null) {
-            tServers.close();
-        }
-    }
-
-    private static String getHostIPAddress() {
-        try {
-            Enumeration<NetworkInterface> networkInterfaceEnumeration =
-                    NetworkInterface.getNetworkInterfaces();
-            while (networkInterfaceEnumeration.hasMoreElements()) {
-                for (InterfaceAddress interfaceAddress :
-                        networkInterfaceEnumeration.nextElement().getInterfaceAddresses()) {
-                    if (interfaceAddress.getAddress().isSiteLocalAddress()
-                            && interfaceAddress.getAddress() instanceof Inet4Address) {
-                        return interfaceAddress.getAddress().getHostAddress();
-                    }
-                }
-            }
-        } catch (SocketException e) {
-            throw new RuntimeException(e);
-        }
-        throw new IllegalStateException(
-                "Could not find site local ipv4 address, failed to launch kudu");
-    }
-}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduIT.java
index b0c8cba1fe..015ab0d3e3 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduIT.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.e2e.connector.kudu;
 
 import org.apache.seatunnel.e2e.common.TestResource;
 import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.EngineType;
 import org.apache.seatunnel.e2e.common.container.TestContainer;
 import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
 
@@ -27,6 +28,7 @@ import org.apache.kudu.ColumnTypeAttributes;
 import org.apache.kudu.Schema;
 import org.apache.kudu.Type;
 import org.apache.kudu.client.AsyncKuduClient;
+import org.apache.kudu.client.Bytes;
 import org.apache.kudu.client.CreateTableOptions;
 import org.apache.kudu.client.Insert;
 import org.apache.kudu.client.KuduClient;
@@ -65,6 +67,7 @@ import java.util.Arrays;
 import java.util.Enumeration;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static java.lang.String.format;
@@ -169,6 +172,27 @@ public class KuduIT extends TestSuiteBase implements TestResource {
         }
     }
 
+    private void batchInsertData(String tableName) throws KuduException {
+        KuduTable table = kuduClient.openTable(tableName);
+        KuduSession kuduSession = kuduClient.newSession();
+        for (int i = 0; i < 100; i++) {
+            Insert insert = table.newInsert();
+            PartialRow row = insert.getRow();
+            row.addObject("id", i);
+            row.addObject("val_bool", true);
+            row.addObject("val_int8", (byte) 1);
+            row.addObject("val_int16", (short) 300);
+            row.addObject("val_int32", 30000);
+            row.addObject("val_int64", 30000000L);
+            row.addObject("val_float", 1.0f);
+            row.addObject("val_double", 2.0d);
+            row.addObject("val_decimal", new BigDecimal("1.1212"));
+            row.addObject("val_string", "test");
+            row.addObject("val_unixtime_micros", new java.sql.Timestamp(1693477266998L));
+            OperationResponse response = kuduSession.apply(insert);
+        }
+    }
+
     private void initializeKuduTable() throws KuduException {
 
         List<ColumnSchema> columns = new ArrayList();
@@ -232,6 +256,64 @@ public class KuduIT extends TestSuiteBase implements TestResource {
         kuduClient.createTable(KUDU_SINK_TABLE, schema, tableOptions);
     }
 
+    private void initializeKuduTable(String tableName) throws KuduException {
+
+        List<ColumnSchema> columns = new ArrayList();
+
+        columns.add(new ColumnSchema.ColumnSchemaBuilder("id", Type.INT32).key(true).build());
+        columns.add(
+                new ColumnSchema.ColumnSchemaBuilder("val_bool", Type.BOOL).nullable(true).build());
+        columns.add(
+                new ColumnSchema.ColumnSchemaBuilder("val_int8", Type.INT8).nullable(true).build());
+        columns.add(
+                new ColumnSchema.ColumnSchemaBuilder("val_int16", Type.INT16)
+                        .nullable(true)
+                        .build());
+        columns.add(
+                new ColumnSchema.ColumnSchemaBuilder("val_int32", Type.INT32)
+                        .nullable(true)
+                        .build());
+        columns.add(
+                new ColumnSchema.ColumnSchemaBuilder("val_int64", Type.INT64)
+                        .nullable(true)
+                        .build());
+        columns.add(
+                new ColumnSchema.ColumnSchemaBuilder("val_float", Type.FLOAT)
+                        .nullable(true)
+                        .build());
+        columns.add(
+                new ColumnSchema.ColumnSchemaBuilder("val_double", Type.DOUBLE)
+                        .nullable(true)
+                        .build());
+        columns.add(
+                new ColumnSchema.ColumnSchemaBuilder("val_decimal", Type.DECIMAL)
+                        .nullable(true)
+                        .typeAttributes(
+                                new ColumnTypeAttributes.ColumnTypeAttributesBuilder()
+                                        .precision(20)
+                                        .scale(5)
+                                        .build())
+                        .build());
+        columns.add(
+                new ColumnSchema.ColumnSchemaBuilder("val_string", Type.STRING)
+                        .nullable(true)
+                        .build());
+        // spark
+        columns.add(
+                new ColumnSchema.ColumnSchemaBuilder("val_unixtime_micros", Type.UNIXTIME_MICROS)
+                        .nullable(true)
+                        .build());
+
+        Schema schema = new Schema(columns);
+
+        ImmutableList<String> hashKeys = ImmutableList.of("id");
+        CreateTableOptions tableOptions = new CreateTableOptions();
+
+        tableOptions.addHashPartitions(hashKeys, 2);
+        tableOptions.setNumReplicas(1);
+        kuduClient.createTable(tableName, schema, tableOptions);
+    }
+
     private void getKuduClient() {
         kuduClient =
                 new AsyncKuduClient.AsyncKuduClientBuilder(
@@ -260,15 +342,147 @@ public class KuduIT extends TestSuiteBase implements TestResource {
         kuduClient.deleteTable(KUDU_SINK_TABLE);
     }
 
-    public List<String> readData(String tableName) throws KuduException {
-        List<String> result = new ArrayList<>();
+    @DisabledOnContainer(
+            value = {},
+            type = {EngineType.SPARK},
+            disabledReason = "Currently SPARK do not support cdc")
+    @TestTemplate
+    public void testCdcKudu(TestContainer container) throws IOException, InterruptedException {
+        this.initializeKuduTable("kudu_cdc_sink_table");
+        Container.ExecResult execResult = container.executeJob("/write-cdc-changelog-to-kudu.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+
+        await().atMost(60000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            Assertions.assertIterableEquals(
+                                    Stream.<List<Object>>of(
+                                                    Arrays.asList(
+                                                            "3",
+                                                            "true",
+                                                            "1",
+                                                            "2",
+                                                            "3",
+                                                            "4",
+                                                            "4.3",
+                                                            "5.3",
+                                                            "6.30000",
+                                                            "NEW",
+                                                            "2020-02-02 02:02:02.0"),
+                                                    Arrays.asList(
+                                                            "1",
+                                                            "true",
+                                                            "2",
+                                                            "2",
+                                                            "3",
+                                                            "4",
+                                                            "4.3",
+                                                            "5.3",
+                                                            "6.30000",
+                                                            "NEW",
+                                                            "2020-02-02 02:02:02.0"))
+                                            .collect(Collectors.toList()),
+                                    readData("kudu_cdc_sink_table"));
+                        });
+
+        kuduClient.deleteTable("kudu_cdc_sink_table");
+    }
+
+    @DisabledOnContainer(
+            value = {},
+            type = {EngineType.SPARK, EngineType.FLINK},
+            disabledReason = "Currently SPARK/FLINK do not support multiple table read")
+    @TestTemplate
+    public void testKuduMultipleRead(TestContainer container)
+            throws IOException, InterruptedException {
+        initializeKuduTable("kudu_source_table_1");
+        initializeKuduTable("kudu_source_table_2");
+        batchInsertData("kudu_source_table_1");
+        batchInsertData("kudu_source_table_2");
+        Container.ExecResult execResult =
+                container.executeJob("/kudu_to_assert_with_multipletable.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        kuduClient.deleteTable("kudu_source_table_1");
+        kuduClient.deleteTable("kudu_source_table_2");
+    }
+
+    @DisabledOnContainer(
+            value = {},
+            type = {EngineType.SPARK, EngineType.FLINK},
+            disabledReason = "Currently SPARK/FLINK do not support multiple table read")
+    @TestTemplate
+    public void testKuduMultipleWrite(TestContainer container)
+            throws IOException, InterruptedException {
+        initializeKuduTable("kudu_sink_1");
+        initializeKuduTable("kudu_sink_2");
+        Container.ExecResult execResult =
+                container.executeJob("/fake_to_kudu_with_multipletable.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+
+        await().atMost(60000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () ->
+                                Assertions.assertAll(
+                                        () -> {
+                                            Assertions.assertIterableEquals(
+                                                    Stream.<List<Object>>of(
+                                                                    Arrays.asList(
+                                                                            "1",
+                                                                            "true",
+                                                                            "1",
+                                                                            "2",
+                                                                            "3",
+                                                                            "4",
+                                                                            "4.3",
+                                                                            "5.3",
+                                                                            "6.30000",
+                                                                            "NEW",
+                                                                            "2020-02-02 02:02:02.0"))
+                                                            .collect(Collectors.toList()),
+                                                    readData("kudu_sink_1"));
+                                        },
+                                        () -> {
+                                            Assertions.assertIterableEquals(
+                                                    Stream.<List<Object>>of(
+                                                                    Arrays.asList(
+                                                                            "1",
+                                                                            "true",
+                                                                            "1",
+                                                                            "2",
+                                                                            "3",
+                                                                            "4",
+                                                                            "4.3",
+                                                                            "5.3",
+                                                                            "6.30000",
+                                                                            "NEW",
+                                                                            "2020-02-02 02:02:02.0"))
+                                                            .collect(Collectors.toList()),
+                                                    readData("kudu_sink_2"));
+                                        }));
+
+        kuduClient.deleteTable("kudu_sink_1");
+        kuduClient.deleteTable("kudu_sink_2");
+    }
+
+    public List<List<Object>> readData(String tableName) throws KuduException {
+        List<List<Object>> result = new ArrayList<>();
         KuduTable kuduTable = kuduClient.openTable(tableName);
         KuduScanner scanner = kuduClient.newScannerBuilder(kuduTable).build();
         while (scanner.hasMoreRows()) {
             RowResultIterator rowResults = scanner.nextRows();
+            List<Object> row = new ArrayList<>();
             while (rowResults.hasNext()) {
                 RowResult rowResult = rowResults.next();
-                result.add(rowResult.rowToString());
+                for (int i = 0; i < rowResult.getSchema().getColumns().size(); i++) {
+                    if (rowResult.getSchema().getColumnByIndex(i).getType() == Type.BINARY) {
+                        row.add(Bytes.pretty(rowResult.getBinaryCopy(i)));
+                        break;
+                    }
+                    row.add(rowResult.getObject(i).toString());
+                }
+            }
+            if (!row.isEmpty()) {
+                result.add(row);
             }
         }
         return result;
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduWIthMultipleTableIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduWIthMultipleTableIT.java
deleted file mode 100644
index cc05653d5c..0000000000
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduWIthMultipleTableIT.java
+++ /dev/null
@@ -1,363 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.e2e.connector.kudu;
-
-import org.apache.seatunnel.e2e.common.TestResource;
-import org.apache.seatunnel.e2e.common.TestSuiteBase;
-import org.apache.seatunnel.e2e.common.container.EngineType;
-import org.apache.seatunnel.e2e.common.container.TestContainer;
-import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
-
-import org.apache.kudu.ColumnSchema;
-import org.apache.kudu.ColumnTypeAttributes;
-import org.apache.kudu.Schema;
-import org.apache.kudu.Type;
-import org.apache.kudu.client.AsyncKuduClient;
-import org.apache.kudu.client.CreateTableOptions;
-import org.apache.kudu.client.Insert;
-import org.apache.kudu.client.KuduClient;
-import org.apache.kudu.client.KuduException;
-import org.apache.kudu.client.KuduScanner;
-import org.apache.kudu.client.KuduSession;
-import org.apache.kudu.client.KuduTable;
-import org.apache.kudu.client.OperationResponse;
-import org.apache.kudu.client.PartialRow;
-import org.apache.kudu.client.RowResult;
-import org.apache.kudu.client.RowResultIterator;
-
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.TestTemplate;
-import org.testcontainers.containers.Container;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.ToxiproxyContainer;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.lifecycle.Startables;
-import org.testcontainers.shaded.org.awaitility.Awaitility;
-import org.testcontainers.utility.DockerLoggerFactory;
-
-import com.google.common.collect.ImmutableList;
-import lombok.extern.slf4j.Slf4j;
-
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.net.Inet4Address;
-import java.net.InterfaceAddress;
-import java.net.NetworkInterface;
-import java.net.SocketException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Enumeration;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import static java.lang.String.format;
-import static org.awaitility.Awaitility.await;
-
-@Slf4j
-@DisabledOnContainer(
-        value = {},
-        type = {EngineType.SPARK, EngineType.FLINK},
-        disabledReason = "Currently SPARK/FLINK do not support multiple table 
read")
-public class KuduWIthMultipleTableIT extends TestSuiteBase implements 
TestResource {
-
-    private static final String IMAGE = "apache/kudu:1.15.0";
-    private static final Integer KUDU_MASTER_PORT = 7051;
-    private static final Integer KUDU_TSERVER_PORT = 7054;
-    private GenericContainer<?> master;
-    private GenericContainer<?> tServers;
-    private KuduClient kuduClient;
-
-    private static final String TOXIPROXY_IMAGE = 
"ghcr.io/shopify/toxiproxy:2.4.0";
-    private static final String TOXIPROXY_NETWORK_ALIAS = "toxiproxy";
-    private ToxiproxyContainer toxiProxy;
-
-    @BeforeAll
-    @Override
-    public void startUp() throws Exception {
-
-        String hostIP = getHostIPAddress();
-
-        this.master =
-                new GenericContainer<>(IMAGE)
-                        .withExposedPorts(KUDU_MASTER_PORT)
-                        .withCommand("master")
-                        .withEnv("MASTER_ARGS", "--default_num_replicas=1")
-                        .withNetwork(NETWORK)
-                        .withNetworkAliases("kudu-master-multiple")
-                        .withLogConsumer(
-                                new 
Slf4jLogConsumer(DockerLoggerFactory.getLogger(IMAGE)));
-
-        toxiProxy =
-                new ToxiproxyContainer(TOXIPROXY_IMAGE)
-                        .withNetwork(NETWORK)
-                        .withNetworkAliases(TOXIPROXY_NETWORK_ALIAS);
-        toxiProxy.start();
-
-        String instanceName = "kudu-tserver-multiple";
-
-        ToxiproxyContainer.ContainerProxy proxy =
-                toxiProxy.getProxy(instanceName, KUDU_TSERVER_PORT);
-
-        this.tServers =
-                new GenericContainer<>(IMAGE)
-                        .withExposedPorts(KUDU_TSERVER_PORT)
-                        .withCommand("tserver")
-                        .withEnv("KUDU_MASTERS", "kudu-master-multiple:" + 
KUDU_MASTER_PORT)
-                        .withNetwork(NETWORK)
-                        .withNetworkAliases(instanceName)
-                        .dependsOn(master)
-                        .withEnv(
-                                "TSERVER_ARGS",
-                                format(
-                                        "--fs_wal_dir=/var/lib/kudu/tserver 
--logtostderr --use_hybrid_clock=false --rpc_bind_addresses=%s:%s 
--rpc_advertised_addresses=%s:%s",
-                                        instanceName,
-                                        KUDU_TSERVER_PORT,
-                                        hostIP,
-                                        proxy.getProxyPort()))
-                        .withLogConsumer(
-                                new 
Slf4jLogConsumer(DockerLoggerFactory.getLogger(IMAGE)));
-
-        Startables.deepStart(Stream.of(master)).join();
-        Startables.deepStart(Stream.of(tServers)).join();
-
-        Awaitility.given()
-                .ignoreExceptions()
-                .atLeast(100, TimeUnit.MILLISECONDS)
-                .pollInterval(500, TimeUnit.MILLISECONDS)
-                .atMost(180, TimeUnit.SECONDS)
-                .untilAsserted(this::getKuduClient);
-    }
-
-    private void batchInsertData(String tableName) throws KuduException {
-        KuduTable table = kuduClient.openTable(tableName);
-        KuduSession kuduSession = kuduClient.newSession();
-        for (int i = 0; i < 100; i++) {
-            Insert insert = table.newInsert();
-            PartialRow row = insert.getRow();
-            row.addObject("id", i);
-            row.addObject("val_bool", true);
-            row.addObject("val_int8", (byte) 1);
-            row.addObject("val_int16", (short) 300);
-            row.addObject("val_int32", 30000);
-            row.addObject("val_int64", 30000000L);
-            row.addObject("val_float", 1.0f);
-            row.addObject("val_double", 2.0d);
-            row.addObject("val_decimal", new BigDecimal("1.1212"));
-            row.addObject("val_string", "test");
-            row.addObject("val_unixtime_micros", new 
java.sql.Timestamp(1693477266998L));
-            OperationResponse response = kuduSession.apply(insert);
-        }
-    }
-
-    private void initializeKuduTable(String tableName) throws KuduException {
-
-        List<ColumnSchema> columns = new ArrayList();
-
-        columns.add(new ColumnSchema.ColumnSchemaBuilder("id", 
Type.INT32).key(true).build());
-        columns.add(
-                new ColumnSchema.ColumnSchemaBuilder("val_bool", 
Type.BOOL).nullable(true).build());
-        columns.add(
-                new ColumnSchema.ColumnSchemaBuilder("val_int8", 
Type.INT8).nullable(true).build());
-        columns.add(
-                new ColumnSchema.ColumnSchemaBuilder("val_int16", Type.INT16)
-                        .nullable(true)
-                        .build());
-        columns.add(
-                new ColumnSchema.ColumnSchemaBuilder("val_int32", Type.INT32)
-                        .nullable(true)
-                        .build());
-        columns.add(
-                new ColumnSchema.ColumnSchemaBuilder("val_int64", Type.INT64)
-                        .nullable(true)
-                        .build());
-        columns.add(
-                new ColumnSchema.ColumnSchemaBuilder("val_float", Type.FLOAT)
-                        .nullable(true)
-                        .build());
-        columns.add(
-                new ColumnSchema.ColumnSchemaBuilder("val_double", Type.DOUBLE)
-                        .nullable(true)
-                        .build());
-        columns.add(
-                new ColumnSchema.ColumnSchemaBuilder("val_decimal", 
Type.DECIMAL)
-                        .nullable(true)
-                        .typeAttributes(
-                                new 
ColumnTypeAttributes.ColumnTypeAttributesBuilder()
-                                        .precision(20)
-                                        .scale(5)
-                                        .build())
-                        .build());
-        columns.add(
-                new ColumnSchema.ColumnSchemaBuilder("val_string", Type.STRING)
-                        .nullable(true)
-                        .build());
-        // spark
-        columns.add(
-                new ColumnSchema.ColumnSchemaBuilder("val_unixtime_micros", 
Type.UNIXTIME_MICROS)
-                        .nullable(true)
-                        .build());
-
-        Schema schema = new Schema(columns);
-
-        ImmutableList<String> hashKeys = ImmutableList.of("id");
-        CreateTableOptions tableOptions = new CreateTableOptions();
-
-        tableOptions.addHashPartitions(hashKeys, 2);
-        tableOptions.setNumReplicas(1);
-        kuduClient.createTable(tableName, schema, tableOptions);
-    }
-
-    private void getKuduClient() {
-        kuduClient =
-                new AsyncKuduClient.AsyncKuduClientBuilder(
-                                Arrays.asList(
-                                        "127.0.0.1" + ":" + 
master.getMappedPort(KUDU_MASTER_PORT)))
-                        .defaultAdminOperationTimeoutMs(120000)
-                        .defaultOperationTimeoutMs(120000)
-                        .build()
-                        .syncClient();
-    }
-
-    @TestTemplate
-    public void testKuduMultipleRead(TestContainer container)
-            throws IOException, InterruptedException {
-        initializeKuduTable("kudu_source_table_1");
-        initializeKuduTable("kudu_source_table_2");
-        batchInsertData("kudu_source_table_1");
-        batchInsertData("kudu_source_table_2");
-        Container.ExecResult execResult =
-                
container.executeJob("/kudu_to_assert_with_multipletable.conf");
-        Assertions.assertEquals(0, execResult.getExitCode());
-        kuduClient.deleteTable("kudu_source_table_1");
-        kuduClient.deleteTable("kudu_source_table_2");
-    }
-
-    @TestTemplate
-    public void testKuduMultipleWrite(TestContainer container)
-            throws IOException, InterruptedException {
-        initializeKuduTable("kudu_sink_1");
-        initializeKuduTable("kudu_sink_2");
-        Container.ExecResult execResult =
-                container.executeJob("/fake_to_kudu_with_multipletable.conf");
-        Assertions.assertEquals(0, execResult.getExitCode());
-
-        await().atMost(60000, TimeUnit.MILLISECONDS)
-                .untilAsserted(
-                        () ->
-                                Assertions.assertAll(
-                                        () -> {
-                                            Assertions.assertIterableEquals(
-                                                    Stream.<List<Object>>of(
-                                                                    
Arrays.asList(
-                                                                            
"1",
-                                                                            
"true",
-                                                                            
"1",
-                                                                            
"2",
-                                                                            
"3",
-                                                                            
"4",
-                                                                            
"4.3",
-                                                                            
"5.3",
-                                                                            
"6.30000",
-                                                                            
"NEW",
-                                                                            
"2020-02-02 02:02:02.0"))
-                                                            
.collect(Collectors.toList()),
-                                                    readData("kudu_sink_1"));
-                                        },
-                                        () -> {
-                                            Assertions.assertIterableEquals(
-                                                    Stream.<List<Object>>of(
-                                                                    
Arrays.asList(
-                                                                            
"1",
-                                                                            
"true",
-                                                                            
"1",
-                                                                            
"2",
-                                                                            
"3",
-                                                                            
"4",
-                                                                            
"4.3",
-                                                                            
"5.3",
-                                                                            
"6.30000",
-                                                                            
"NEW",
-                                                                            
"2020-02-02 02:02:02.0"))
-                                                            
.collect(Collectors.toList()),
-                                                    readData("kudu_sink_2"));
-                                        }));
-
-        kuduClient.deleteTable("kudu_sink_1");
-        kuduClient.deleteTable("kudu_sink_2");
-    }
-
-    @Override
-    @AfterAll
-    public void tearDown() throws Exception {
-        if (kuduClient != null) {
-            kuduClient.close();
-        }
-
-        if (master != null) {
-            master.close();
-        }
-
-        if (tServers != null) {
-            tServers.close();
-        }
-    }
-
-    public List<List<Object>> readData(String tableName) throws KuduException {
-        List<List<Object>> result = new ArrayList<>();
-        KuduTable kuduTable = kuduClient.openTable(tableName);
-        KuduScanner scanner = kuduClient.newScannerBuilder(kuduTable).build();
-        while (scanner.hasMoreRows()) {
-            RowResultIterator rowResults = scanner.nextRows();
-            List<Object> row = new ArrayList<>();
-            while (rowResults.hasNext()) {
-                RowResult rowResult = rowResults.next();
-                for (int i = 0; i < rowResult.getSchema().getColumns().size(); 
i++) {
-                    row.add(rowResult.getObject(i).toString());
-                }
-            }
-            if (!row.isEmpty()) {
-                result.add(row);
-            }
-        }
-        return result;
-    }
-
-    private static String getHostIPAddress() {
-        try {
-            Enumeration<NetworkInterface> networkInterfaceEnumeration =
-                    NetworkInterface.getNetworkInterfaces();
-            while (networkInterfaceEnumeration.hasMoreElements()) {
-                for (InterfaceAddress interfaceAddress :
-                        
networkInterfaceEnumeration.nextElement().getInterfaceAddresses()) {
-                    if (interfaceAddress.getAddress().isSiteLocalAddress()
-                            && interfaceAddress.getAddress() instanceof 
Inet4Address) {
-                        return interfaceAddress.getAddress().getHostAddress();
-                    }
-                }
-            }
-        } catch (SocketException e) {
-            throw new RuntimeException(e);
-        }
-        throw new IllegalStateException(
-                "Could not find site local ipv4 address, failed to launch 
kudu");
-    }
-}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/fake_to_kudu_with_multipletable.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/fake_to_kudu_with_multipletable.conf
index da239d00b2..f9b3a2ffe3 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/fake_to_kudu_with_multipletable.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/fake_to_kudu_with_multipletable.conf
@@ -82,6 +82,6 @@ source {
 
 sink {
    kudu{
-    kudu_masters = "kudu-master-multiple:7051"
+    kudu_masters = "kudu-master:7051"
  }
 }
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/kudu_to_assert_with_multipletable.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/kudu_to_assert_with_multipletable.conf
index b378810601..2a0007d0a2 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/kudu_to_assert_with_multipletable.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/kudu_to_assert_with_multipletable.conf
@@ -27,7 +27,7 @@ env {
 source {
   # This is a example source plugin **only for test and demonstrate the feature source plugin**
   kudu{
-   kudu_masters = "kudu-master-multiple:7051"
+   kudu_masters = "kudu-master:7051"
    table_list = [
    {
     table_name = "kudu_source_table_1"
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/kudu_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/kudu_to_console.conf
index 33697c11af..a7513760ae 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/kudu_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/kudu_to_console.conf
@@ -21,8 +21,7 @@
 env {
   # You can set engine configuration here
   parallelism = 1
-  job.mode = "STREAMING"
-  checkpoint.interval = 5000
+  job.mode = "BATCH"
 }
 
 source {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/write-cdc-changelog-to-kudu.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/write-cdc-changelog-to-kudu.conf
index e6b799136c..c3aea6c857 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/write-cdc-changelog-to-kudu.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/write-cdc-changelog-to-kudu.conf
@@ -68,7 +68,7 @@ source {
 
 sink {
    kudu{
-    kudu_masters = "kudu-master-cdc:7051"
-    table_name = "kudu_sink_table"
+    kudu_masters = "kudu-master:7051"
+    table_name = "kudu_cdc_sink_table"
  }
 }
\ No newline at end of file

Reply via email to