This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch trino-435
in repository https://gitbox.apache.org/repos/asf/doris-thirdparty.git


The following commit(s) were added to refs/heads/trino-435 by this push:
     new 314ac78c739 [kudu] pick kudu connector to 452 version. (#232)
314ac78c739 is described below

commit 314ac78c739279c0a9d69c49d044cf0cb156206d
Author: daidai <2017501...@qq.com>
AuthorDate: Tue Jul 23 10:14:01 2024 +0800

    [kudu] pick kudu connector to 452 version. (#232)
    
    Migrate the changes of the Trino Kudu connector from version 435 to version 452.
    
    Changed some code to adapt to changes in the Trino SPI.
    
    Performed TPC-H tests on both Trino and Doris.
    
    It is recommended to compile with `mvn package -Dmaven.test.skip=true`.
---
 plugin/trino-kudu/pom.xml                          |  34 +-
 .../io/trino/plugin/kudu/KuduClientConfig.java     |  17 +-
 .../io/trino/plugin/kudu/KuduClientSession.java    |  81 ++---
 .../io/trino/plugin/kudu/KuduColumnHandle.java     |  76 +----
 .../java/io/trino/plugin/kudu/KuduMetadata.java    |  74 +++--
 .../java/io/trino/plugin/kudu/KuduPageSink.java    |  28 +-
 .../java/io/trino/plugin/kudu/KuduRecordSet.java   |   4 +-
 .../io/trino/plugin/kudu/KuduSplitManager.java     | 101 +++---
 .../java/io/trino/plugin/kudu/KuduTableHandle.java |   2 +-
 .../main/java/io/trino/plugin/kudu/TypeHelper.java | 171 +++++-----
 .../kudu/properties/HashPartitionDefinition.java   |  26 +-
 .../kudu/properties/KuduTableProperties.java       | 142 +++------
 .../plugin/kudu/properties/PartitionDesign.java    |   4 +-
 .../kudu/properties/RangeBoundValueSerializer.java |   2 +-
 .../plugin/kudu/properties/RangePartition.java     |  28 +-
 .../kudu/properties/RangePartitionDefinition.java  |  17 +-
 .../plugin/kudu/schema/NoSchemaEmulation.java      |   4 +-
 .../kudu/schema/SchemaAlreadyExistsException.java  |  41 ---
 .../SchemaEmulationByTableNameConvention.java      |   4 +-
 .../org/apache/kudu/client/KeyEncoderAccessor.java |   5 -
 .../plugin/kudu/BaseKuduConnectorSmokeTest.java    |  28 +-
 .../plugin/kudu/KuduCreateAndInsertDataSetup.java  |  48 +++
 .../trino/plugin/kudu/KuduQueryRunnerFactory.java  | 194 +++++-------
 .../trino/plugin/kudu/KuduTabletWaitStrategy.java  |  73 +++++
 .../io/trino/plugin/kudu/TestKuduClientConfig.java |   7 +-
 .../trino/plugin/kudu/TestKuduConnectorTest.java   |  93 ++++--
 .../kudu/TestKuduIntegrationDecimalColumns.java    |   3 +-
 .../kudu/TestKuduIntegrationDynamicFilter.java     |  61 ++--
 .../kudu/TestKuduIntegrationHashPartitioning.java  |   3 +-
 .../kudu/TestKuduIntegrationIntegerColumns.java    |   3 +-
 .../kudu/TestKuduIntegrationRangePartitioning.java |   3 +-
 .../io/trino/plugin/kudu/TestKuduTypeMapping.java  | 347 +++++++++++++++++++++
 .../io/trino/plugin/kudu/TestingKuduServer.java    |  11 +-
 .../java/io/trino/testing/BaseConnectorTest.java   |   6 +-
 34 files changed, 1007 insertions(+), 734 deletions(-)

diff --git a/plugin/trino-kudu/pom.xml b/plugin/trino-kudu/pom.xml
index a27bbd0bc26..9878e885ff6 100644
--- a/plugin/trino-kudu/pom.xml
+++ b/plugin/trino-kudu/pom.xml
@@ -10,11 +10,12 @@
 
     <artifactId>trino-kudu</artifactId>
     <packaging>trino-plugin</packaging>
-    <description>Trino - Kudu Connector</description>
+    <description>Trino - Kudu connector</description>
 
     <properties>
         <air.main.basedir>${project.parent.basedir}</air.main.basedir>
-        <kudu.version>1.15.0</kudu.version>
+        <air.compiler.fail-warnings>true</air.compiler.fail-warnings>
+        <kudu.version>1.17.0</kudu.version>
     </properties>
 
     <dependencies>
@@ -131,12 +132,30 @@
             <scope>provided</scope>
         </dependency>
 
+        <dependency>
+            <groupId>com.google.errorprone</groupId>
+            <artifactId>error_prone_annotations</artifactId>
+            <scope>runtime</scope>
+        </dependency>
+
         <dependency>
             <groupId>io.airlift</groupId>
             <artifactId>log-manager</artifactId>
             <scope>runtime</scope>
         </dependency>
 
+        <dependency>
+            <groupId>com.github.docker-java</groupId>
+            <artifactId>docker-java-api</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>dev.failsafe</groupId>
+            <artifactId>failsafe</artifactId>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>io.airlift</groupId>
             <artifactId>junit-extensions</artifactId>
@@ -218,20 +237,21 @@
         </dependency>
 
         <dependency>
-            <groupId>org.testcontainers</groupId>
-            <artifactId>testcontainers</artifactId>
+            <groupId>org.rnorth.duct-tape</groupId>
+            <artifactId>duct-tape</artifactId>
+            <version>1.0.8</version>
             <scope>test</scope>
         </dependency>
 
         <dependency>
             <groupId>org.testcontainers</groupId>
-            <artifactId>toxiproxy</artifactId>
+            <artifactId>testcontainers</artifactId>
             <scope>test</scope>
         </dependency>
 
         <dependency>
-            <groupId>org.testng</groupId>
-            <artifactId>testng</artifactId>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>toxiproxy</artifactId>
             <scope>test</scope>
         </dependency>
     </dependencies>
diff --git 
a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduClientConfig.java 
b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduClientConfig.java
index 6d02132f0e7..6a0a281ec8c 100755
--- a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduClientConfig.java
+++ b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduClientConfig.java
@@ -13,7 +13,6 @@
  */
 package io.trino.plugin.kudu;
 
-import com.google.common.base.Splitter;
 import com.google.common.collect.ImmutableList;
 import io.airlift.configuration.Config;
 import io.airlift.configuration.ConfigDescription;
@@ -35,11 +34,11 @@ import static java.util.concurrent.TimeUnit.MINUTES;
 @DefunctConfig("kudu.client.default-socket-read-timeout")
 public class KuduClientConfig
 {
-    private static final Splitter SPLITTER = 
Splitter.on(',').trimResults().omitEmptyStrings();
+    private static final Duration DEFAULT_OPERATION_TIMEOUT = new Duration(30, 
TimeUnit.SECONDS);
 
     private List<String> masterAddresses = ImmutableList.of();
-    private Duration defaultAdminOperationTimeout = new Duration(30, 
TimeUnit.SECONDS);
-    private Duration defaultOperationTimeout = new Duration(30, 
TimeUnit.SECONDS);
+    private Duration defaultAdminOperationTimeout = DEFAULT_OPERATION_TIMEOUT;
+    private Duration defaultOperationTimeout = DEFAULT_OPERATION_TIMEOUT;
     private boolean disableStatistics;
     private boolean schemaEmulationEnabled;
     private String schemaEmulationPrefix = "presto::";
@@ -54,15 +53,9 @@ public class KuduClientConfig
     }
 
     @Config("kudu.client.master-addresses")
-    public KuduClientConfig setMasterAddresses(String commaSeparatedList)
+    public KuduClientConfig setMasterAddresses(List<String> commaSeparatedList)
     {
-        this.masterAddresses = SPLITTER.splitToList(commaSeparatedList);
-        return this;
-    }
-
-    public KuduClientConfig setMasterAddresses(String... contactPoints)
-    {
-        this.masterAddresses = ImmutableList.copyOf(contactPoints);
+        this.masterAddresses = commaSeparatedList;
         return this;
     }
 
diff --git 
a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduClientSession.java 
b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduClientSession.java
index f3b2ae96c8e..eec1ac45dd8 100644
--- 
a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduClientSession.java
+++ 
b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduClientSession.java
@@ -15,6 +15,8 @@ package io.trino.plugin.kudu;
 
 import com.google.common.collect.ImmutableList;
 import io.airlift.log.Logger;
+import io.airlift.slice.Slice;
+import io.airlift.slice.Slices;
 import io.trino.plugin.kudu.properties.ColumnDesign;
 import io.trino.plugin.kudu.properties.HashPartitionDefinition;
 import io.trino.plugin.kudu.properties.KuduTableProperties;
@@ -58,6 +60,7 @@ import 
org.apache.kudu.client.PartitionSchema.HashBucketSchema;
 
 import java.io.IOException;
 import java.math.BigDecimal;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -178,7 +181,7 @@ public class KuduClientSession
                         .boxed().collect(toList());
                 for (ColumnHandle column : desiredColumns.get()) {
                     KuduColumnHandle k = (KuduColumnHandle) column;
-                    int index = k.getOrdinalPosition();
+                    int index = k.ordinalPosition();
                     if (index >= primaryKeyColumnCount) {
                         columnIndexes.add(index);
                     }
@@ -194,7 +197,7 @@ public class KuduClientSession
         else {
             if (desiredColumns.isPresent()) {
                 columnIndexes = desiredColumns.get().stream()
-                        .map(handle -> ((KuduColumnHandle) 
handle).getOrdinalPosition())
+                        .map(handle -> ((KuduColumnHandle) 
handle).ordinalPosition())
                         .collect(toImmutableList());
             }
             else {
@@ -323,12 +326,12 @@ public class KuduClientSession
             String rawName = schemaEmulation.toRawName(schemaTableName);
             AlterTableOptions alterOptions = new AlterTableOptions();
             Type type = TypeHelper.toKuduClientType(column.getType());
-            alterOptions.addColumn(
-                    new ColumnSchemaBuilder(column.getName(), type)
-                            .nullable(true)
-                            .defaultValue(null)
-                            .comment(nullToEmpty(column.getComment())) // Kudu 
doesn't allow null comment
-                            .build());
+            ColumnSchemaBuilder builder = new 
ColumnSchemaBuilder(column.getName(), type)
+                    .nullable(true)
+                    .defaultValue(null)
+                    .comment(nullToEmpty(column.getComment())); // Kudu 
doesn't allow null comment
+            setTypeAttributes(column, builder);
+            alterOptions.addColumn(builder.build());
             client.alterTable(rawName, alterOptions);
         }
         catch (KuduException e) {
@@ -384,8 +387,8 @@ public class KuduClientSession
             if (definition == null) {
                 throw new TrinoException(QUERY_REJECTED, "Table " + 
schemaTableName + " has no range partition");
             }
-            PartialRow lowerBound = 
KuduTableProperties.toRangeBoundToPartialRow(schema, definition, 
rangePartition.getLower());
-            PartialRow upperBound = 
KuduTableProperties.toRangeBoundToPartialRow(schema, definition, 
rangePartition.getUpper());
+            PartialRow lowerBound = 
KuduTableProperties.toRangeBoundToPartialRow(schema, definition, 
rangePartition.lower());
+            PartialRow upperBound = 
KuduTableProperties.toRangeBoundToPartialRow(schema, definition, 
rangePartition.upper());
             AlterTableOptions alterOptions = new AlterTableOptions();
             switch (change) {
                 case ADD:
@@ -467,19 +470,19 @@ public class KuduClientSession
         PartitionDesign partitionDesign = 
KuduTableProperties.getPartitionDesign(properties);
         if (partitionDesign.getHash() != null) {
             for (HashPartitionDefinition partition : 
partitionDesign.getHash()) {
-                options.addHashPartitions(partition.getColumns(), 
partition.getBuckets());
+                options.addHashPartitions(partition.columns(), 
partition.buckets());
             }
         }
         if (partitionDesign.getRange() != null) {
             rangePartitionDefinition = partitionDesign.getRange();
-            
options.setRangePartitionColumns(rangePartitionDefinition.getColumns());
+            
options.setRangePartitionColumns(rangePartitionDefinition.columns());
         }
 
         List<RangePartition> rangePartitions = 
KuduTableProperties.getRangePartitions(properties);
         if (rangePartitionDefinition != null && !rangePartitions.isEmpty()) {
             for (RangePartition rangePartition : rangePartitions) {
-                PartialRow lower = 
KuduTableProperties.toRangeBoundToPartialRow(schema, rangePartitionDefinition, 
rangePartition.getLower());
-                PartialRow upper = 
KuduTableProperties.toRangeBoundToPartialRow(schema, rangePartitionDefinition, 
rangePartition.getUpper());
+                PartialRow lower = 
KuduTableProperties.toRangeBoundToPartialRow(schema, rangePartitionDefinition, 
rangePartition.lower());
+                PartialRow upper = 
KuduTableProperties.toRangeBoundToPartialRow(schema, rangePartitionDefinition, 
rangePartition.upper());
                 options.addRangePartition(lower, upper);
             }
         }
@@ -503,7 +506,7 @@ public class KuduClientSession
 
         Schema schema = table.getSchema();
         constraintSummary.getDomains().orElseThrow().forEach((columnHandle, 
domain) -> {
-            int position = ((KuduColumnHandle) 
columnHandle).getOrdinalPosition();
+            int position = ((KuduColumnHandle) columnHandle).ordinalPosition();
             ColumnSchema columnSchema = schema.getColumnByIndex(position);
             verify(!domain.isNone(), "Domain is none");
             if (domain.isAll()) {
@@ -529,8 +532,8 @@ public class KuduClientSession
                     KuduPredicate predicate = 
createInListPredicate(columnSchema, discreteValues);
                     builder.addPredicate(predicate);
                 }
-                else if (valueSet instanceof SortedRangeSet) {
-                    Ranges ranges = ((SortedRangeSet) valueSet).getRanges();
+                else if (valueSet instanceof SortedRangeSet sortedRangeSet) {
+                    Ranges ranges = sortedRangeSet.getRanges();
                     List<Range> rangeList = ranges.getOrderedRanges();
                     if (rangeList.stream().allMatch(Range::isSingleValue)) {
                         io.trino.spi.type.Type type = 
TypeHelper.fromKuduColumn(columnSchema);
@@ -577,35 +580,39 @@ public class KuduClientSession
     {
         io.trino.spi.type.Type type = TypeHelper.fromKuduColumn(columnSchema);
         Object javaValue = TypeHelper.getJavaValue(type, value);
-        if (javaValue instanceof Long) {
-            return KuduPredicate.newComparisonPredicate(columnSchema, op, 
(Long) javaValue);
+        if (javaValue instanceof Long longValue) {
+            return KuduPredicate.newComparisonPredicate(columnSchema, op, 
longValue);
         }
-        if (javaValue instanceof BigDecimal) {
-            return KuduPredicate.newComparisonPredicate(columnSchema, op, 
(BigDecimal) javaValue);
+        if (javaValue instanceof BigDecimal bigDecimal) {
+            return KuduPredicate.newComparisonPredicate(columnSchema, op, 
bigDecimal);
         }
-        if (javaValue instanceof Integer) {
-            return KuduPredicate.newComparisonPredicate(columnSchema, op, 
(Integer) javaValue);
+        if (javaValue instanceof Integer integerValue) {
+            return KuduPredicate.newComparisonPredicate(columnSchema, op, 
integerValue);
         }
-        if (javaValue instanceof Short) {
-            return KuduPredicate.newComparisonPredicate(columnSchema, op, 
(Short) javaValue);
+        if (javaValue instanceof Short shortValue) {
+            return KuduPredicate.newComparisonPredicate(columnSchema, op, 
shortValue);
         }
-        if (javaValue instanceof Byte) {
-            return KuduPredicate.newComparisonPredicate(columnSchema, op, 
(Byte) javaValue);
+        if (javaValue instanceof Byte byteValue) {
+            return KuduPredicate.newComparisonPredicate(columnSchema, op, 
byteValue);
         }
-        if (javaValue instanceof String) {
-            return KuduPredicate.newComparisonPredicate(columnSchema, op, 
(String) javaValue);
+        if (javaValue instanceof String stringValue) {
+            return KuduPredicate.newComparisonPredicate(columnSchema, op, 
stringValue);
         }
-        if (javaValue instanceof Double) {
-            return KuduPredicate.newComparisonPredicate(columnSchema, op, 
(Double) javaValue);
+        if (javaValue instanceof Double doubleValue) {
+            return KuduPredicate.newComparisonPredicate(columnSchema, op, 
doubleValue);
         }
-        if (javaValue instanceof Float) {
-            return KuduPredicate.newComparisonPredicate(columnSchema, op, 
(Float) javaValue);
+        if (javaValue instanceof Float floatValue) {
+            return KuduPredicate.newComparisonPredicate(columnSchema, op, 
floatValue);
         }
-        if (javaValue instanceof Boolean) {
-            return KuduPredicate.newComparisonPredicate(columnSchema, op, 
(Boolean) javaValue);
+        if (javaValue instanceof Boolean booleanValue) {
+            return KuduPredicate.newComparisonPredicate(columnSchema, op, 
booleanValue);
         }
-        if (javaValue instanceof byte[]) {
-            return KuduPredicate.newComparisonPredicate(columnSchema, op, 
(byte[]) javaValue);
+        if (javaValue instanceof byte[] byteArrayValue) {
+            return KuduPredicate.newComparisonPredicate(columnSchema, op, 
byteArrayValue);
+        }
+        if (javaValue instanceof ByteBuffer byteBuffer) {
+            Slice slice = Slices.wrappedHeapBuffer(byteBuffer);
+            return KuduPredicate.newComparisonPredicate(columnSchema, op, 
slice.getBytes(0, slice.length()));
         }
         if (javaValue == null) {
             throw new IllegalStateException("Unexpected null java value for 
column " + columnSchema.getName());
diff --git 
a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduColumnHandle.java 
b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduColumnHandle.java
index 35c1f639c93..64d9cfc06c6 100755
--- a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduColumnHandle.java
+++ b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduColumnHandle.java
@@ -13,19 +13,14 @@
  */
 package io.trino.plugin.kudu;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
 import io.trino.spi.connector.ColumnHandle;
 import io.trino.spi.connector.ColumnMetadata;
 import io.trino.spi.type.Type;
 import io.trino.spi.type.VarbinaryType;
 
-import java.util.Objects;
-
-import static com.google.common.base.MoreObjects.toStringHelper;
 import static java.util.Objects.requireNonNull;
 
-public class KuduColumnHandle
+public record KuduColumnHandle(String name, int ordinalPosition, Type type)
         implements ColumnHandle
 {
     public static final String ROW_ID = "row_uuid";
@@ -33,40 +28,13 @@ public class KuduColumnHandle
 
     public static final KuduColumnHandle ROW_ID_HANDLE = new 
KuduColumnHandle(ROW_ID, ROW_ID_POSITION, VarbinaryType.VARBINARY);
 
-    private final String name;
-    private final int ordinalPosition;
-    private final Type type;
-
-    @JsonCreator
-    public KuduColumnHandle(
-            @JsonProperty("name") String name,
-            @JsonProperty("ordinalPosition") int ordinalPosition,
-            @JsonProperty("type") Type type)
-    {
-        this.name = requireNonNull(name, "name is null");
-        this.ordinalPosition = ordinalPosition;
-        this.type = requireNonNull(type, "type is null");
-    }
-
-    @JsonProperty
-    public String getName()
-    {
-        return name;
-    }
-
-    @JsonProperty
-    public int getOrdinalPosition()
-    {
-        return ordinalPosition;
-    }
-
-    @JsonProperty
-    public Type getType()
+    public KuduColumnHandle
     {
-        return type;
+        requireNonNull(name, "name is null");
+        requireNonNull(type, "type is null");
     }
 
-    public ColumnMetadata getColumnMetadata()
+    public ColumnMetadata columnMetadata()
     {
         return new ColumnMetadata(name, type);
     }
@@ -75,38 +43,4 @@ public class KuduColumnHandle
     {
         return name.equals(ROW_ID);
     }
-
-    @Override
-    public int hashCode()
-    {
-        return Objects.hash(
-                name,
-                ordinalPosition,
-                type);
-    }
-
-    @Override
-    public boolean equals(Object obj)
-    {
-        if (this == obj) {
-            return true;
-        }
-        if (obj == null || getClass() != obj.getClass()) {
-            return false;
-        }
-        KuduColumnHandle other = (KuduColumnHandle) obj;
-        return Objects.equals(this.name, other.name) &&
-                Objects.equals(this.ordinalPosition, other.ordinalPosition) &&
-                Objects.equals(this.type, other.type);
-    }
-
-    @Override
-    public String toString()
-    {
-        return toStringHelper(this)
-                .add("name", name)
-                .add("ordinalPosition", ordinalPosition)
-                .add("type", type)
-                .toString();
-    }
 }
diff --git 
a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduMetadata.java 
b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduMetadata.java
index 4e16c1441e3..cb5993bfe78 100755
--- a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduMetadata.java
+++ b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduMetadata.java
@@ -32,16 +32,17 @@ import io.trino.spi.connector.ConnectorSession;
 import io.trino.spi.connector.ConnectorTableHandle;
 import io.trino.spi.connector.ConnectorTableLayout;
 import io.trino.spi.connector.ConnectorTableMetadata;
-import io.trino.spi.connector.ConnectorTablePartitioning;
 import io.trino.spi.connector.ConnectorTableProperties;
+import io.trino.spi.connector.ConnectorTableVersion;
 import io.trino.spi.connector.Constraint;
 import io.trino.spi.connector.ConstraintApplicationResult;
 import io.trino.spi.connector.LimitApplicationResult;
-import io.trino.spi.connector.LocalProperty;
 import io.trino.spi.connector.NotFoundException;
 import io.trino.spi.connector.ProjectionApplicationResult;
+import io.trino.spi.connector.RelationColumnsMetadata;
 import io.trino.spi.connector.RetryMode;
 import io.trino.spi.connector.RowChangeParadigm;
+import io.trino.spi.connector.SaveMode;
 import io.trino.spi.connector.SchemaTableName;
 import io.trino.spi.connector.SchemaTablePrefix;
 import io.trino.spi.expression.ConnectorExpression;
@@ -59,13 +60,16 @@ import 
org.apache.kudu.client.PartitionSchema.HashBucketSchema;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.OptionalLong;
+import java.util.Set;
 import java.util.function.Consumer;
+import java.util.function.UnaryOperator;
 
 import static com.google.common.base.Strings.emptyToNull;
 import static com.google.common.collect.ImmutableList.toImmutableList;
@@ -73,6 +77,8 @@ import static io.trino.plugin.kudu.KuduColumnHandle.ROW_ID;
 import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
 import static io.trino.spi.connector.RetryMode.NO_RETRIES;
 import static 
io.trino.spi.connector.RowChangeParadigm.CHANGE_ONLY_UPDATED_COLUMNS;
+import static io.trino.spi.connector.SaveMode.IGNORE;
+import static io.trino.spi.connector.SaveMode.REPLACE;
 import static java.util.Objects.requireNonNull;
 
 public class KuduMetadata
@@ -99,9 +105,13 @@ public class KuduMetadata
     }
 
     @Override
-    public Map<SchemaTableName, List<ColumnMetadata>> 
listTableColumns(ConnectorSession session, SchemaTablePrefix prefix)
+    public Iterator<RelationColumnsMetadata> streamRelationColumns(
+            ConnectorSession session,
+            Optional<String> schemaName,
+            UnaryOperator<Set<SchemaTableName>> relationFilter)
     {
-        requireNonNull(prefix, "prefix is null");
+        SchemaTablePrefix prefix = schemaName.map(SchemaTablePrefix::new)
+                .orElseGet(SchemaTablePrefix::new);
 
         List<SchemaTableName> tables;
         if (prefix.getTable().isEmpty()) {
@@ -111,15 +121,17 @@ public class KuduMetadata
             tables = ImmutableList.of(prefix.toSchemaTableName());
         }
 
-        ImmutableMap.Builder<SchemaTableName, List<ColumnMetadata>> columns = 
ImmutableMap.builder();
+        Map<SchemaTableName, RelationColumnsMetadata> relationColumns = new 
HashMap<>();
         for (SchemaTableName tableName : tables) {
-            KuduTableHandle tableHandle = getTableHandle(session, tableName);
+            KuduTableHandle tableHandle = getTableHandle(session, tableName, 
Optional.empty(), Optional.empty());
             if (tableHandle != null) {
-                ConnectorTableMetadata tableMetadata = 
getTableMetadata(tableHandle);
-                columns.put(tableName, tableMetadata.getColumns());
+                KuduTable table = tableHandle.getTable(clientSession);
+                relationColumns.put(tableName, 
RelationColumnsMetadata.forTable(tableName, 
getColumnsMetadata(table.getSchema())));
             }
         }
-        return columns.buildOrThrow();
+        return relationFilter.apply(relationColumns.keySet()).stream()
+                .map(relationColumns::get)
+                .iterator();
     }
 
     private ColumnMetadata getColumnMetadata(ColumnSchema column)
@@ -165,13 +177,18 @@ public class KuduMetadata
         // Kudu returns empty string as a table comment by default
         Optional<String> tableComment = 
Optional.ofNullable(emptyToNull(table.getComment()));
 
-        List<ColumnMetadata> columnsMetaList = schema.getColumns().stream()
+        List<ColumnMetadata> columns = getColumnsMetadata(schema);
+
+        Map<String, Object> properties = 
clientSession.getTableProperties(tableHandle);
+        return new ConnectorTableMetadata(tableHandle.getSchemaTableName(), 
columns, properties, tableComment);
+    }
+
+    private List<ColumnMetadata> getColumnsMetadata(Schema schema)
+    {
+        return schema.getColumns().stream()
                 .filter(column -> !column.isKey() || 
!column.getName().equals(ROW_ID))
                 .map(this::getColumnMetadata)
                 .collect(toImmutableList());
-
-        Map<String, Object> properties = 
clientSession.getTableProperties(tableHandle);
-        return new ConnectorTableMetadata(tableHandle.getSchemaTableName(), 
columnsMetaList, properties, tableComment);
     }
 
     @Override
@@ -180,7 +197,7 @@ public class KuduMetadata
         KuduTableHandle tableHandle = (KuduTableHandle) connectorTableHandle;
         ImmutableMap.Builder<String, ColumnHandle> columnHandles = 
ImmutableMap.builder();
         Schema schema = clientSession.getTableSchema(tableHandle);
-        forAllColumnHandles(schema, column -> 
columnHandles.put(column.getName(), column));
+        forAllColumnHandles(schema, column -> columnHandles.put(column.name(), 
column));
         return columnHandles.buildOrThrow();
     }
 
@@ -206,12 +223,16 @@ public class KuduMetadata
                     .setHidden(true)
                     .build();
         }
-        return kuduColumnHandle.getColumnMetadata();
+        return kuduColumnHandle.columnMetadata();
     }
 
     @Override
-    public KuduTableHandle getTableHandle(ConnectorSession session, 
SchemaTableName schemaTableName)
+    public KuduTableHandle getTableHandle(ConnectorSession session, 
SchemaTableName schemaTableName, Optional<ConnectorTableVersion> startVersion, 
Optional<ConnectorTableVersion> endVersion)
     {
+        if (startVersion.isPresent() || endVersion.isPresent()) {
+            throw new TrinoException(NOT_SUPPORTED, "This connector does not 
support versioned tables");
+        }
+
         try {
             KuduTable table = clientSession.openTable(schemaTableName);
             OptionalInt bucketCount = OptionalInt.empty();
@@ -248,12 +269,15 @@ public class KuduMetadata
     }
 
     @Override
-    public void createTable(ConnectorSession session, ConnectorTableMetadata 
tableMetadata, boolean ignoreExisting)
+    public void createTable(ConnectorSession session, ConnectorTableMetadata 
tableMetadata, SaveMode saveMode)
     {
+        if (saveMode == REPLACE) {
+            throw new TrinoException(NOT_SUPPORTED, "This connector does not 
support replacing tables");
+        }
         if (tableMetadata.getColumns().stream().anyMatch(column -> 
column.getComment() != null)) {
             throw new TrinoException(NOT_SUPPORTED, "This connector does not 
support creating tables with column comment");
         }
-        clientSession.createTable(tableMetadata, ignoreExisting);
+        clientSession.createTable(tableMetadata, saveMode == IGNORE);
     }
 
     @Override
@@ -282,7 +306,7 @@ public class KuduMetadata
     {
         KuduTableHandle kuduTableHandle = (KuduTableHandle) tableHandle;
         KuduColumnHandle kuduColumnHandle = (KuduColumnHandle) column;
-        clientSession.dropColumn(kuduTableHandle.getSchemaTableName(), 
kuduColumnHandle.getName());
+        clientSession.dropColumn(kuduTableHandle.getSchemaTableName(), 
kuduColumnHandle.name());
     }
 
     @Override
@@ -290,7 +314,7 @@ public class KuduMetadata
     {
         KuduTableHandle kuduTableHandle = (KuduTableHandle) tableHandle;
         KuduColumnHandle kuduColumnHandle = (KuduColumnHandle) source;
-        clientSession.renameColumn(kuduTableHandle.getSchemaTableName(), 
kuduColumnHandle.getName(), target);
+        clientSession.renameColumn(kuduTableHandle.getSchemaTableName(), 
kuduColumnHandle.name(), target);
     }
 
     @Override
@@ -328,6 +352,7 @@ public class KuduMetadata
     }
 
     @Override
+    @SuppressWarnings("deprecation")
     public ConnectorOutputTableHandle beginCreateTable(
             ConnectorSession session,
             ConnectorTableMetadata tableMetadata,
@@ -432,14 +457,11 @@ public class KuduMetadata
     {
         KuduTableHandle handle = (KuduTableHandle) table;
 
-        Optional<ConnectorTablePartitioning> tablePartitioning = 
Optional.empty();
-        List<LocalProperty<ColumnHandle>> localProperties = ImmutableList.of();
-
         return new ConnectorTableProperties(
                 handle.getConstraint(),
-                tablePartitioning,
                 Optional.empty(),
-                localProperties);
+                Optional.empty(),
+                ImmutableList.of());
     }
 
     @Override
@@ -508,7 +530,7 @@ public class KuduMetadata
         ImmutableList.Builder<Assignment> assignmentList = 
ImmutableList.builder();
         assignments.forEach((name, column) -> {
             desiredColumns.add(column);
-            assignmentList.add(new Assignment(name, column, 
((KuduColumnHandle) column).getType()));
+            assignmentList.add(new Assignment(name, column, 
((KuduColumnHandle) column).type()));
         });
 
         handle = new KuduTableHandle(
diff --git 
a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduPageSink.java 
b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduPageSink.java
index 29dc98d338a..8ce926a56a0 100644
--- a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduPageSink.java
+++ b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduPageSink.java
@@ -16,6 +16,7 @@ package io.trino.plugin.kudu;
 import com.google.common.collect.ImmutableList;
 import io.airlift.slice.Slice;
 import io.trino.spi.Page;
+import io.trino.spi.TrinoException;
 import io.trino.spi.block.Block;
 import io.trino.spi.connector.ConnectorMergeSink;
 import io.trino.spi.connector.ConnectorPageSink;
@@ -47,6 +48,7 @@ import java.util.concurrent.TimeUnit;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
+import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
 import static io.trino.spi.type.BigintType.BIGINT;
 import static io.trino.spi.type.BooleanType.BOOLEAN;
 import static io.trino.spi.type.DateType.DATE;
@@ -61,6 +63,7 @@ import static io.trino.spi.type.VarbinaryType.VARBINARY;
 import static java.lang.String.format;
 import static java.util.Objects.requireNonNull;
 import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.kudu.util.DateUtil.epochDaysToSqlDate;
 
 public class KuduPageSink
         implements ConnectorPageSink, ConnectorMergeSink
@@ -138,8 +141,8 @@ public class KuduPageSink
             }
             return NOT_BLOCKED;
         }
-        catch (KuduException e) {
-            throw new RuntimeException(e);
+        catch (KuduException | RuntimeException e) {
+            throw new TrinoException(GENERIC_INTERNAL_ERROR, e);
         }
     }
 
@@ -150,6 +153,9 @@ public class KuduPageSink
         if (block.isNull(position)) {
             row.setNull(destChannel);
         }
+        else if (DATE.equals(type)) {
+            row.addDate(destChannel, epochDaysToSqlDate(INTEGER.getInt(block, position)));
+        }
         else if (TIMESTAMP_MILLIS.equals(type)) {
             row.addLong(destChannel, 
truncateEpochMicrosToMillis(TIMESTAMP_MILLIS.getLong(block, position)));
         }
@@ -174,7 +180,8 @@ public class KuduPageSink
         else if (DOUBLE.equals(type)) {
             row.addDouble(destChannel, DOUBLE.getDouble(block, position));
         }
-        else if (type instanceof VarcharType varcharType) {
+        else if (type instanceof VarcharType) {
+            VarcharType varcharType = (VarcharType) type;
             Type originalType = originalColumnTypes.get(destChannel);
             if (DATE.equals(originalType)) {
                 SqlDate date = (SqlDate) 
originalType.getObjectValue(connectorSession, block, position);
@@ -189,7 +196,8 @@ public class KuduPageSink
         else if (VARBINARY.equals(type)) {
             row.addBinary(destChannel, VARBINARY.getSlice(block, 
position).toByteBuffer());
         }
-        else if (type instanceof DecimalType decimalType) {
+        else if (type instanceof DecimalType) {
+            DecimalType decimalType = (DecimalType) type;
             SqlDecimal sqlDecimal = (SqlDecimal) 
decimalType.getObjectValue(connectorSession, block, position);
             row.addDecimal(destChannel, sqlDecimal.toBigDecimal());
         }
@@ -228,8 +236,8 @@ public class KuduPageSink
                     try {
                         operationApplier.applyOperationAsync(delete);
                     }
-                    catch (KuduException e) {
-                        throw new RuntimeException(e);
+                    catch (KuduException | RuntimeException e) {
+                        throw new TrinoException(GENERIC_INTERNAL_ERROR, e);
                     }
                 }
 
@@ -248,14 +256,14 @@ public class KuduPageSink
                     try {
                         operationApplier.applyOperationAsync(insert);
                     }
-                    catch (KuduException e) {
-                        throw new RuntimeException(e);
+                    catch (KuduException | RuntimeException e) {
+                        throw new TrinoException(GENERIC_INTERNAL_ERROR, e);
                     }
                 }
             }
         }
-        catch (KuduException e) {
-            throw new RuntimeException(e);
+        catch (KuduException | RuntimeException e) {
+            throw new TrinoException(GENERIC_INTERNAL_ERROR, e);
         }
     }
 
diff --git a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduRecordSet.java b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduRecordSet.java
index cc686497c8c..da0949aa28e 100755
--- a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduRecordSet.java
+++ b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduRecordSet.java
@@ -47,7 +47,7 @@ public class KuduRecordSet
     public List<Type> getColumnTypes()
     {
         return columns.stream()
-                .map(column -> ((KuduColumnHandle) column).getType())
+                .map(column -> ((KuduColumnHandle) column).type())
                 .collect(toImmutableList());
     }
 
@@ -63,7 +63,7 @@ public class KuduRecordSet
                 builder.put(i, ROW_ID_POSITION);
             }
             else {
-                builder.put(i, 
projectedSchema.getColumnIndex(handle.getName()));
+                builder.put(i, projectedSchema.getColumnIndex(handle.name()));
             }
         }
 
diff --git a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduSplitManager.java b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduSplitManager.java
index 52be6748547..4246ac1f52c 100755
--- a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduSplitManager.java
+++ b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduSplitManager.java
@@ -13,6 +13,7 @@
  */
 package io.trino.plugin.kudu;
 
+import com.google.common.collect.ImmutableList;
 import com.google.inject.Inject;
 import io.trino.spi.connector.ConnectorSession;
 import io.trino.spi.connector.ConnectorSplitManager;
@@ -25,16 +26,15 @@ import io.trino.spi.connector.FixedSplitSource;
 
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 
 import static io.trino.plugin.kudu.KuduSessionProperties.getDynamicFilteringWaitTimeout;
-import static io.trino.spi.connector.DynamicFilter.NOT_BLOCKED;
 import static java.util.Objects.requireNonNull;
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
 
 public class KuduSplitManager
         implements ConnectorSplitManager
 {
+    private static final ConnectorSplitSource.ConnectorSplitBatch EMPTY_BATCH = new ConnectorSplitSource.ConnectorSplitBatch(ImmutableList.of(), false);
     private final KuduClientSession clientSession;
 
     @Inject
@@ -51,82 +51,79 @@ public class KuduSplitManager
             DynamicFilter dynamicFilter,
             Constraint constraint)
     {
-        long timeoutMillis = 
getDynamicFilteringWaitTimeout(session).toMillis();
-        if (timeoutMillis == 0 || !dynamicFilter.isAwaitable()) {
-            return getSplitSource(table, dynamicFilter);
-        }
-        CompletableFuture<?> dynamicFilterFuture = whenCompleted(dynamicFilter)
-                .completeOnTimeout(null, timeoutMillis, MILLISECONDS);
-        CompletableFuture<ConnectorSplitSource> splitSourceFuture = 
dynamicFilterFuture.thenApply(
-                ignored -> getSplitSource(table, dynamicFilter));
-        return new KuduDynamicFilteringSplitSource(dynamicFilterFuture, 
splitSourceFuture);
-    }
-
-    private ConnectorSplitSource getSplitSource(
-            ConnectorTableHandle table,
-            DynamicFilter dynamicFilter)
-    {
-        KuduTableHandle handle = (KuduTableHandle) table;
-
-        List<KuduSplit> splits = clientSession.buildKuduSplits(handle, 
dynamicFilter);
-
-        return new FixedSplitSource(splits);
-    }
-
-    private static CompletableFuture<?> whenCompleted(DynamicFilter 
dynamicFilter)
-    {
-        if (dynamicFilter.isAwaitable()) {
-            return dynamicFilter.isBlocked().thenCompose(ignored -> 
whenCompleted(dynamicFilter));
-        }
-        return NOT_BLOCKED;
+        return new KuduDynamicFilteringSplitSource(session, clientSession, dynamicFilter, table);
     }
 
     private static class KuduDynamicFilteringSplitSource
             implements ConnectorSplitSource
     {
-        private final CompletableFuture<?> dynamicFilterFuture;
-        private final CompletableFuture<ConnectorSplitSource> 
splitSourceFuture;
+        private final KuduClientSession clientSession;
+        private final DynamicFilter dynamicFilter;
+        private final ConnectorTableHandle tableHandle;
+        private final long dynamicFilteringTimeoutNanos;
+        private ConnectorSplitSource delegateSplitSource;
+        private final long startNanos;
 
         private KuduDynamicFilteringSplitSource(
-                CompletableFuture<?> dynamicFilterFuture,
-                CompletableFuture<ConnectorSplitSource> splitSourceFuture)
+                ConnectorSession connectorSession,
+                KuduClientSession clientSession,
+                DynamicFilter dynamicFilter,
+                ConnectorTableHandle tableHandle)
         {
-            this.dynamicFilterFuture = requireNonNull(dynamicFilterFuture, "dynamicFilterFuture is null");
-            this.splitSourceFuture = requireNonNull(splitSourceFuture, "splitSourceFuture is null");
+            this.clientSession = requireNonNull(clientSession, "clientSession is null");
+            this.dynamicFilter = requireNonNull(dynamicFilter, "dynamicFilterFuture is null");
+            this.tableHandle = requireNonNull(tableHandle, "splitSourceFuture is null");
+            this.dynamicFilteringTimeoutNanos = (long) getDynamicFilteringWaitTimeout(connectorSession).getValue(NANOSECONDS);
+            this.startNanos = System.nanoTime();
         }
 
         @Override
         public CompletableFuture<ConnectorSplitBatch> getNextBatch(int maxSize)
         {
-            return splitSourceFuture.thenCompose(splitSource -> 
splitSource.getNextBatch(maxSize));
+            CompletableFuture<?> blocked = dynamicFilter.isBlocked();
+            long remainingTimeoutNanos = getRemainingTimeoutNanos();
+            if (remainingTimeoutNanos > 0 && dynamicFilter.isAwaitable()) {
+                // wait for dynamic filter and yield
+                return blocked
+                        .thenApply(x -> EMPTY_BATCH)
+                        .completeOnTimeout(EMPTY_BATCH, remainingTimeoutNanos, NANOSECONDS);
+            }
+
+            if (delegateSplitSource == null) {
+                KuduTableHandle handle = (KuduTableHandle) tableHandle;
+
+                List<KuduSplit> splits = clientSession.buildKuduSplits(handle, dynamicFilter);
+                delegateSplitSource = new FixedSplitSource(splits);
+            }
+
+            return delegateSplitSource.getNextBatch(maxSize);
         }
 
         @Override
         public void close()
         {
-            if (!dynamicFilterFuture.cancel(true)) {
-                splitSourceFuture.thenAccept(ConnectorSplitSource::close);
+            if (delegateSplitSource != null) {
+                delegateSplitSource.close();
             }
         }
 
         @Override
         public boolean isFinished()
         {
-            if (!splitSourceFuture.isDone()) {
-                return false;
-            }
-            if (splitSourceFuture.isCompletedExceptionally()) {
+            if (getRemainingTimeoutNanos() > 0 && dynamicFilter.isAwaitable()) {
                 return false;
             }
-            try {
-                return splitSourceFuture.get().isFinished();
-            }
-            catch (InterruptedException | ExecutionException e) {
-                if (e instanceof InterruptedException) {
-                    Thread.currentThread().interrupt();
-                }
-                throw new RuntimeException(e);
+
+            if (delegateSplitSource != null) {
+                return delegateSplitSource.isFinished();
             }
+
+            return false;
+        }
+
+        private long getRemainingTimeoutNanos()
+        {
+            return dynamicFilteringTimeoutNanos - (System.nanoTime() - startNanos);
         }
     }
 }
diff --git a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduTableHandle.java b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduTableHandle.java
index cc63f844849..aaaa738c94b 100755
--- a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduTableHandle.java
+++ b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduTableHandle.java
@@ -146,7 +146,7 @@ public class KuduTableHandle
         return Objects.equals(this.schemaTableName, other.schemaTableName) &&
                 Objects.equals(this.constraint, other.constraint) &&
                 Objects.equals(this.desiredColumns, other.desiredColumns) &&
-                Objects.equals(this.requiresRowId, other.requiresRowId) &&
+                this.requiresRowId == other.requiresRowId &&
                 Objects.equals(this.bucketCount, other.bucketCount) &&
                 Objects.equals(this.limit, other.limit);
     }
diff --git a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/TypeHelper.java b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/TypeHelper.java
index 5fdd31d30ab..3c294029659 100644
--- a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/TypeHelper.java
+++ b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/TypeHelper.java
@@ -50,23 +50,20 @@ public final class TypeHelper
 
     public static org.apache.kudu.Type toKuduClientType(Type type)
     {
-        if (type instanceof VarcharType) {
-            return org.apache.kudu.Type.STRING;
+        if (type == BooleanType.BOOLEAN) {
+            return org.apache.kudu.Type.BOOL;
         }
-        if (type.equals(TIMESTAMP_MILLIS)) {
-            return org.apache.kudu.Type.UNIXTIME_MICROS;
+        if (type == TinyintType.TINYINT) {
+            return org.apache.kudu.Type.INT8;
         }
-        if (type == BigintType.BIGINT) {
-            return org.apache.kudu.Type.INT64;
+        if (type == SmallintType.SMALLINT) {
+            return org.apache.kudu.Type.INT16;
         }
         if (type == IntegerType.INTEGER) {
             return org.apache.kudu.Type.INT32;
         }
-        if (type == SmallintType.SMALLINT) {
-            return org.apache.kudu.Type.INT16;
-        }
-        if (type == TinyintType.TINYINT) {
-            return org.apache.kudu.Type.INT8;
+        if (type == BigintType.BIGINT) {
+            return org.apache.kudu.Type.INT64;
         }
         if (type == RealType.REAL) {
             return org.apache.kudu.Type.FLOAT;
@@ -74,21 +71,24 @@ public final class TypeHelper
         if (type == DoubleType.DOUBLE) {
             return org.apache.kudu.Type.DOUBLE;
         }
-        if (type == BooleanType.BOOLEAN) {
-            return org.apache.kudu.Type.BOOL;
-        }
-        if (type instanceof VarbinaryType) {
-            return org.apache.kudu.Type.BINARY;
-        }
         if (type instanceof DecimalType) {
             return org.apache.kudu.Type.DECIMAL;
         }
-        if (type == DateType.DATE) {
+        if (type instanceof CharType) {
             return org.apache.kudu.Type.STRING;
         }
-        if (type instanceof CharType) {
+        if (type instanceof VarcharType) {
             return org.apache.kudu.Type.STRING;
         }
+        if (type instanceof VarbinaryType) {
+            return org.apache.kudu.Type.BINARY;
+        }
+        if (type == DateType.DATE) {
+            return org.apache.kudu.Type.DATE;
+        }
+        if (type.equals(TIMESTAMP_MILLIS)) {
+            return org.apache.kudu.Type.UNIXTIME_MICROS;
+        }
         throw new TrinoException(NOT_SUPPORTED, "Unsupported type: " + type);
     }
 
@@ -100,31 +100,32 @@ public final class TypeHelper
     private static Type fromKuduClientType(org.apache.kudu.Type ktype, 
ColumnTypeAttributes attributes)
     {
         switch (ktype) {
-            case STRING:
-                return VarcharType.VARCHAR;
-            case UNIXTIME_MICROS:
-                return TIMESTAMP_MILLIS;
-            case INT64:
-                return BigintType.BIGINT;
-            case INT32:
-                return IntegerType.INTEGER;
-            case INT16:
-                return SmallintType.SMALLINT;
+            case BOOL:
+                return BooleanType.BOOLEAN;
             case INT8:
                 return TinyintType.TINYINT;
+            case INT16:
+                return SmallintType.SMALLINT;
+            case INT32:
+                return IntegerType.INTEGER;
+            case INT64:
+                return BigintType.BIGINT;
             case FLOAT:
                 return RealType.REAL;
             case DOUBLE:
                 return DoubleType.DOUBLE;
-            case BOOL:
-                return BooleanType.BOOLEAN;
-            case BINARY:
-                return VarbinaryType.VARBINARY;
             case DECIMAL:
                 return 
DecimalType.createDecimalType(attributes.getPrecision(), attributes.getScale());
-            // TODO: add support for varchar and date types: https://github.com/trinodb/trino/issues/11009
-            case VARCHAR:
+            case STRING:
+                return VarcharType.VARCHAR;
+            case BINARY:
+                return VarbinaryType.VARBINARY;
             case DATE:
+                return DateType.DATE;
+            case UNIXTIME_MICROS:
+                return TIMESTAMP_MILLIS;
+            // TODO: add support for varchar types: https://github.com/trinodb/trino/issues/11009
+            case VARCHAR:
                 break;
         }
         throw new IllegalStateException("Kudu type not implemented for " + ktype);
@@ -132,114 +133,90 @@ public final class TypeHelper
 
     public static Object getJavaValue(Type type, Object nativeValue)
     {
-        if (type instanceof VarcharType) {
-            return ((Slice) nativeValue).toStringUtf8();
-        }
-        if (type.equals(TIMESTAMP_MILLIS)) {
-            // Kudu's native format is in microseconds
-            return nativeValue;
-        }
-        if (type == BigintType.BIGINT) {
+        if (type == BooleanType.BOOLEAN) {
             return nativeValue;
         }
-        if (type == IntegerType.INTEGER) {
-            return ((Long) nativeValue).intValue();
+        if (type == TinyintType.TINYINT) {
+            return ((Long) nativeValue).byteValue();
         }
         if (type == SmallintType.SMALLINT) {
             return ((Long) nativeValue).shortValue();
         }
-        if (type == TinyintType.TINYINT) {
-            return ((Long) nativeValue).byteValue();
+        if (type == IntegerType.INTEGER) {
+            return ((Long) nativeValue).intValue();
         }
-        if (type == DoubleType.DOUBLE) {
+        if (type == BigintType.BIGINT) {
             return nativeValue;
         }
         if (type == RealType.REAL) {
             // conversion can result in precision lost
             return intBitsToFloat(((Long) nativeValue).intValue());
         }
-        if (type == BooleanType.BOOLEAN) {
+        if (type == DoubleType.DOUBLE) {
             return nativeValue;
         }
-        if (type instanceof VarbinaryType) {
-            return ((Slice) nativeValue).toByteBuffer();
-        }
-        if (type instanceof DecimalType decimalType) {
+        if (type instanceof DecimalType) {
+            DecimalType decimalType = (DecimalType) type;
             if (decimalType.isShort()) {
                 return new BigDecimal(BigInteger.valueOf((long) nativeValue), 
decimalType.getScale());
             }
             return new BigDecimal(((Int128) nativeValue).toBigInteger(), 
decimalType.getScale());
         }
-        throw new IllegalStateException("Back conversion not implemented for " + type);
-    }
-
-    public static Object getObject(Type type, RowResult row, int field)
-    {
-        if (row.isNull(field)) {
-            return null;
-        }
         if (type instanceof VarcharType) {
-            return row.getString(field);
-        }
-        if (type.equals(TIMESTAMP_MILLIS)) {
-            return truncateEpochMicrosToMillis(row.getLong(field));
-        }
-        if (type == BigintType.BIGINT) {
-            return row.getLong(field);
-        }
-        if (type == IntegerType.INTEGER) {
-            return row.getInt(field);
-        }
-        if (type == SmallintType.SMALLINT) {
-            return row.getShort(field);
-        }
-        if (type == TinyintType.TINYINT) {
-            return row.getByte(field);
-        }
-        if (type == DoubleType.DOUBLE) {
-            return row.getDouble(field);
+            return ((Slice) nativeValue).toStringUtf8();
         }
-        if (type == RealType.REAL) {
-            return row.getFloat(field);
+        if (type instanceof VarbinaryType) {
+            return ((Slice) nativeValue).toByteBuffer();
         }
-        if (type == BooleanType.BOOLEAN) {
-            return row.getBoolean(field);
+        if (type.equals(DateType.DATE)) {
+            return nativeValue;
         }
-        if (type instanceof VarbinaryType) {
-            return Slices.wrappedHeapBuffer(row.getBinary(field));
+        if (type.equals(TIMESTAMP_MILLIS)) {
+            // Kudu's native format is in microseconds
+            return nativeValue;
         }
+        throw new IllegalStateException("Back conversion not implemented for " + type);
+    }
+
+    public static Object getObject(Type type, RowResult row, int field)
+    {
         if (type instanceof DecimalType) {
-            return Decimals.encodeScaledValue(row.getDecimal(field), 
((DecimalType) type).getScale());
+            DecimalType decimalType = (DecimalType) type;
+            return Decimals.encodeScaledValue(row.getDecimal(field), 
decimalType.getScale());
         }
         throw new IllegalStateException("getObject not implemented for " + type);
     }
 
     public static long getLong(Type type, RowResult row, int field)
     {
-        if (type.equals(TIMESTAMP_MILLIS)) {
-            return truncateEpochMicrosToMillis(row.getLong(field));
+        if (type == TinyintType.TINYINT) {
+            return row.getByte(field);
         }
-        if (type == BigintType.BIGINT) {
-            return row.getLong(field);
+        if (type == SmallintType.SMALLINT) {
+            return row.getShort(field);
         }
         if (type == IntegerType.INTEGER) {
             return row.getInt(field);
         }
-        if (type == SmallintType.SMALLINT) {
-            return row.getShort(field);
-        }
-        if (type == TinyintType.TINYINT) {
-            return row.getByte(field);
+        if (type == BigintType.BIGINT) {
+            return row.getLong(field);
         }
         if (type == RealType.REAL) {
             return floatToRawIntBits(row.getFloat(field));
         }
-        if (type instanceof DecimalType decimalType) {
+        if (type instanceof DecimalType) {
+            DecimalType decimalType = (DecimalType) type;
             if (decimalType.isShort()) {
                 return row.getDecimal(field).unscaledValue().longValue();
             }
             throw new IllegalStateException("getLong not supported for long decimal: " + type);
         }
+        if (type.equals(DateType.DATE)) {
+            return row.getInt(field);
+        }
+        if (type.equals(TIMESTAMP_MILLIS)) {
+            return truncateEpochMicrosToMillis(row.getLong(field));
+        }
         throw new IllegalStateException("getLong not implemented for " + type);
     }
 
diff --git a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/HashPartitionDefinition.java b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/HashPartitionDefinition.java
index cdbc1bfc8ae..3eac4e128b8 100644
--- a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/HashPartitionDefinition.java
+++ b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/HashPartitionDefinition.java
@@ -15,28 +15,4 @@ package io.trino.plugin.kudu.properties;
 
 import java.util.List;
 
-public class HashPartitionDefinition
-{
-    private List<String> columns;
-    private int buckets;
-
-    public List<String> getColumns()
-    {
-        return columns;
-    }
-
-    public void setColumns(List<String> columns)
-    {
-        this.columns = columns;
-    }
-
-    public int getBuckets()
-    {
-        return buckets;
-    }
-
-    public void setBuckets(int buckets)
-    {
-        this.buckets = buckets;
-    }
-}
+public record HashPartitionDefinition(List<String> columns, int buckets) {}
diff --git a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/KuduTableProperties.java b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/KuduTableProperties.java
index 48c3bead663..909a3f7cc86 100644
--- a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/KuduTableProperties.java
+++ b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/KuduTableProperties.java
@@ -190,9 +190,7 @@ public final class KuduTableProperties
         @SuppressWarnings("unchecked")
         List<String> rangeColumns = (List<String>) 
tableProperties.get(PARTITION_BY_RANGE_COLUMNS);
         if (!rangeColumns.isEmpty()) {
-            RangePartitionDefinition range = new RangePartitionDefinition();
-            range.setColumns(rangeColumns);
-            design.setRange(range);
+            design.setRange(new RangePartitionDefinition(rangeColumns));
         }
 
         return design;
@@ -234,10 +232,7 @@ public final class KuduTableProperties
         if (hashBuckets == null) {
             throw new TrinoException(GENERIC_USER_ERROR, "Missing table property " + bucketPropertyName);
         }
-        HashPartitionDefinition definition = new HashPartitionDefinition();
-        definition.setColumns(columns);
-        definition.setBuckets(hashBuckets);
-        return definition;
+        return new HashPartitionDefinition(columns, hashBuckets);
     }
 
     public static List<RangePartition> getRangePartitions(Map<String, Object> 
tableProperties)
@@ -285,17 +280,17 @@ public final class KuduTableProperties
             if (partitionDesign.getHash() != null) {
                 List<HashPartitionDefinition> list = partitionDesign.getHash();
                 if (!list.isEmpty()) {
-                    properties.put(PARTITION_BY_HASH_COLUMNS, 
list.get(0).getColumns());
-                    properties.put(PARTITION_BY_HASH_BUCKETS, 
list.get(0).getBuckets());
+                    properties.put(PARTITION_BY_HASH_COLUMNS, 
list.get(0).columns());
+                    properties.put(PARTITION_BY_HASH_BUCKETS, 
list.get(0).buckets());
                 }
                 if (list.size() >= 2) {
-                    properties.put(PARTITION_BY_HASH_COLUMNS_2, 
list.get(1).getColumns());
-                    properties.put(PARTITION_BY_HASH_BUCKETS_2, 
list.get(1).getBuckets());
+                    properties.put(PARTITION_BY_HASH_COLUMNS_2, 
list.get(1).columns());
+                    properties.put(PARTITION_BY_HASH_BUCKETS_2, 
list.get(1).buckets());
                 }
             }
 
             if (partitionDesign.getRange() != null) {
-                properties.put(PARTITION_BY_RANGE_COLUMNS, 
partitionDesign.getRange().getColumns());
+                properties.put(PARTITION_BY_RANGE_COLUMNS, 
partitionDesign.getRange().columns());
             }
 
             String partitionRangesValue = 
mapper.writeValueAsString(rangePartitionList);
@@ -400,22 +395,17 @@ public final class KuduTableProperties
 
         List<HashPartitionDefinition> hashPartitions = 
partitionSchema.getHashBucketSchemas().stream()
                 .map(hashBucketSchema -> {
-                    HashPartitionDefinition hash = new 
HashPartitionDefinition();
                     List<String> cols = 
hashBucketSchema.getColumnIds().stream()
                             .map(idx -> 
schema.getColumnByIndex(idx).getName()).collect(toImmutableList());
-                    hash.setColumns(cols);
-                    hash.setBuckets(hashBucketSchema.getNumBuckets());
-                    return hash;
+                    return new HashPartitionDefinition(cols, 
hashBucketSchema.getNumBuckets());
                 }).collect(toImmutableList());
         partitionDesign.setHash(hashPartitions);
 
         List<Integer> rangeColumns = 
partitionSchema.getRangeSchema().getColumnIds();
         if (!rangeColumns.isEmpty()) {
-            RangePartitionDefinition definition = new 
RangePartitionDefinition();
-            definition.setColumns(rangeColumns.stream()
+            partitionDesign.setRange(new 
RangePartitionDefinition(rangeColumns.stream()
                     .map(i -> schema.getColumns().get(i).getName())
-                    .collect(toImmutableList()));
-            partitionDesign.setRange(definition);
+                    .collect(toImmutableList())));
         }
 
         return partitionDesign;
@@ -426,7 +416,7 @@ public final class KuduTableProperties
     {
         PartialRow partialRow = new PartialRow(schema);
         if (boundValue != null) {
-            List<Integer> rangeColumns = definition.getColumns().stream()
+            List<Integer> rangeColumns = definition.columns().stream()
                     .map(schema::getColumnIndex).collect(toImmutableList());
 
             if (rangeColumns.size() != boundValue.getValues().size()) {
@@ -555,91 +545,53 @@ public final class KuduTableProperties
 
     public static ColumnSchema.CompressionAlgorithm lookupCompression(String 
compression)
     {
-        switch (compression.toLowerCase(Locale.ENGLISH)) {
-            case "default":
-            case "default_compression":
-                return ColumnSchema.CompressionAlgorithm.DEFAULT_COMPRESSION;
-            case "no":
-            case "no_compression":
-                return ColumnSchema.CompressionAlgorithm.NO_COMPRESSION;
-            case "lz4":
-                return ColumnSchema.CompressionAlgorithm.LZ4;
-            case "snappy":
-                return ColumnSchema.CompressionAlgorithm.SNAPPY;
-            case "zlib":
-                return ColumnSchema.CompressionAlgorithm.ZLIB;
-            default:
-                throw new IllegalArgumentException();
-        }
+        return switch (compression.toLowerCase(Locale.ENGLISH)) {
+            case "default", "default_compression" -> 
ColumnSchema.CompressionAlgorithm.DEFAULT_COMPRESSION;
+            case "no", "no_compression" -> 
ColumnSchema.CompressionAlgorithm.NO_COMPRESSION;
+            case "lz4" -> ColumnSchema.CompressionAlgorithm.LZ4;
+            case "snappy" -> ColumnSchema.CompressionAlgorithm.SNAPPY;
+            case "zlib" -> ColumnSchema.CompressionAlgorithm.ZLIB;
+            default -> throw new IllegalArgumentException();
+        };
     }
 
     public static String 
lookupCompressionString(ColumnSchema.CompressionAlgorithm algorithm)
     {
-        switch (algorithm) {
-            case DEFAULT_COMPRESSION:
-                return "default";
-            case NO_COMPRESSION:
-                return "no";
-            case LZ4:
-                return "lz4";
-            case SNAPPY:
-                return "snappy";
-            case ZLIB:
-                return "zlib";
-            default:
-                return "unknown";
-        }
+        return switch (algorithm) {
+            case DEFAULT_COMPRESSION -> "default";
+            case NO_COMPRESSION -> "no";
+            case LZ4 -> "lz4";
+            case SNAPPY -> "snappy";
+            case ZLIB -> "zlib";
+            default -> "unknown";
+        };
     }
 
     public static ColumnSchema.Encoding lookupEncoding(String encoding)
     {
-        switch (encoding.toLowerCase(Locale.ENGLISH)) {
-            case "auto":
-            case "auto_encoding":
-                return ColumnSchema.Encoding.AUTO_ENCODING;
-            case "bitshuffle":
-            case "bit_shuffle":
-                return ColumnSchema.Encoding.BIT_SHUFFLE;
-            case "dictionary":
-            case "dict_encoding":
-                return ColumnSchema.Encoding.DICT_ENCODING;
-            case "plain":
-            case "plain_encoding":
-                return ColumnSchema.Encoding.PLAIN_ENCODING;
-            case "prefix":
-            case "prefix_encoding":
-                return ColumnSchema.Encoding.PREFIX_ENCODING;
-            case "runlength":
-            case "run_length":
-            case "run length":
-            case "rle":
-                return ColumnSchema.Encoding.RLE;
-            case "group_varint":
-                return ColumnSchema.Encoding.GROUP_VARINT;
-            default:
-                throw new IllegalArgumentException();
-        }
+        return switch (encoding.toLowerCase(Locale.ENGLISH)) {
+            case "auto", "auto_encoding" -> 
ColumnSchema.Encoding.AUTO_ENCODING;
+            case "bitshuffle", "bit_shuffle" -> 
ColumnSchema.Encoding.BIT_SHUFFLE;
+            case "dictionary", "dict_encoding" -> 
ColumnSchema.Encoding.DICT_ENCODING;
+            case "plain", "plain_encoding" -> 
ColumnSchema.Encoding.PLAIN_ENCODING;
+            case "prefix", "prefix_encoding" -> 
ColumnSchema.Encoding.PREFIX_ENCODING;
+            case "runlength", "run_length", "run length", "rle" -> 
ColumnSchema.Encoding.RLE;
+            case "group_varint" -> ColumnSchema.Encoding.GROUP_VARINT;
+            default -> throw new IllegalArgumentException();
+        };
     }
 
     public static String lookupEncodingString(ColumnSchema.Encoding encoding)
     {
-        switch (encoding) {
-            case AUTO_ENCODING:
-                return "auto";
-            case BIT_SHUFFLE:
-                return "bitshuffle";
-            case DICT_ENCODING:
-                return "dictionary";
-            case PLAIN_ENCODING:
-                return "plain";
-            case PREFIX_ENCODING:
-                return "prefix";
-            case RLE:
-                return "runlength";
-            case GROUP_VARINT:
-                return "group_varint";
-            default:
-                return "unknown";
-        }
+        return switch (encoding) {
+            case AUTO_ENCODING -> "auto";
+            case BIT_SHUFFLE -> "bitshuffle";
+            case DICT_ENCODING -> "dictionary";
+            case PLAIN_ENCODING -> "plain";
+            case PREFIX_ENCODING -> "prefix";
+            case RLE -> "runlength";
+            case GROUP_VARINT -> "group_varint";
+            default -> "unknown";
+        };
     }
 }
diff --git 
a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/PartitionDesign.java
 
b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/PartitionDesign.java
index eb06436a144..90d556dd2e4 100644
--- 
a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/PartitionDesign.java
+++ 
b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/PartitionDesign.java
@@ -42,7 +42,7 @@ public class PartitionDesign
 
     public boolean hasPartitions()
     {
-        return hash != null && !hash.isEmpty() && 
!hash.get(0).getColumns().isEmpty()
-                || range != null && !range.getColumns().isEmpty();
+        return hash != null && !hash.isEmpty() && 
!hash.get(0).columns().isEmpty()
+                || range != null && !range.columns().isEmpty();
     }
 }
diff --git 
a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/RangeBoundValueSerializer.java
 
b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/RangeBoundValueSerializer.java
index b1c484b3012..f9dcad16dec 100644
--- 
a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/RangeBoundValueSerializer.java
+++ 
b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/RangeBoundValueSerializer.java
@@ -36,7 +36,7 @@ public class RangeBoundValueSerializer
                 writeValue(value.getValues().get(0), gen);
             }
             else {
-                gen.writeStartArray(value.getValues().size());
+                gen.writeStartArray();
                 for (Object obj : value.getValues()) {
                     writeValue(obj, gen);
                 }
diff --git 
a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/RangePartition.java
 
b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/RangePartition.java
index d8f5919c05d..f1188e09ca3 100644
--- 
a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/RangePartition.java
+++ 
b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/RangePartition.java
@@ -13,30 +13,6 @@
  */
 package io.trino.plugin.kudu.properties;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
+import jakarta.annotation.Nullable;
 
-public class RangePartition
-{
-    private final RangeBoundValue lower;
-    private final RangeBoundValue upper;
-
-    @JsonCreator
-    public RangePartition(
-            @JsonProperty("lower") RangeBoundValue lower,
-            @JsonProperty("upper") RangeBoundValue upper)
-    {
-        this.lower = lower;
-        this.upper = upper;
-    }
-
-    public RangeBoundValue getLower()
-    {
-        return lower;
-    }
-
-    public RangeBoundValue getUpper()
-    {
-        return upper;
-    }
-}
+public record RangePartition(@Nullable RangeBoundValue lower, @Nullable 
RangeBoundValue upper) {}
diff --git 
a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/RangePartitionDefinition.java
 
b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/RangePartitionDefinition.java
index 1fdf49aabf7..1fed3bc203b 100644
--- 
a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/RangePartitionDefinition.java
+++ 
b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/RangePartitionDefinition.java
@@ -13,19 +13,16 @@
  */
 package io.trino.plugin.kudu.properties;
 
-import java.util.List;
+import com.google.common.collect.ImmutableList;
 
-public class RangePartitionDefinition
-{
-    private List<String> columns;
+import java.util.List;
 
-    public List<String> getColumns()
-    {
-        return columns;
-    }
+import static java.util.Objects.requireNonNull;
 
-    public void setColumns(List<String> columns)
+public record RangePartitionDefinition(List<String> columns)
+{
+    public RangePartitionDefinition
     {
-        this.columns = columns;
+        columns = ImmutableList.copyOf(requireNonNull(columns, "columns is 
null"));
     }
 }
diff --git 
a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/schema/NoSchemaEmulation.java
 
b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/schema/NoSchemaEmulation.java
index ad3c828445d..e1647995846 100644
--- 
a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/schema/NoSchemaEmulation.java
+++ 
b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/schema/NoSchemaEmulation.java
@@ -23,6 +23,8 @@ import java.util.List;
 
 import static io.trino.plugin.kudu.KuduClientSession.DEFAULT_SCHEMA;
 import static io.trino.spi.StandardErrorCode.GENERIC_USER_ERROR;
+import static io.trino.spi.StandardErrorCode.SCHEMA_ALREADY_EXISTS;
+import static java.lang.String.format;
 
 public class NoSchemaEmulation
         implements SchemaEmulation
@@ -31,7 +33,7 @@ public class NoSchemaEmulation
     public void createSchema(KuduClientWrapper client, String schemaName)
     {
         if (DEFAULT_SCHEMA.equals(schemaName)) {
-            throw new SchemaAlreadyExistsException(schemaName);
+            throw new TrinoException(SCHEMA_ALREADY_EXISTS, format("Schema 
already exists: '%s'", schemaName));
         }
         throw new TrinoException(GENERIC_USER_ERROR, "Creating schema in Kudu 
connector not allowed if schema emulation is disabled.");
     }
diff --git 
a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/schema/SchemaAlreadyExistsException.java
 
b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/schema/SchemaAlreadyExistsException.java
deleted file mode 100644
index 64c9ff9ac45..00000000000
--- 
a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/schema/SchemaAlreadyExistsException.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.trino.plugin.kudu.schema;
-
-import io.trino.spi.TrinoException;
-
-import static io.trino.spi.StandardErrorCode.ALREADY_EXISTS;
-import static java.lang.String.format;
-
-public class SchemaAlreadyExistsException
-        extends TrinoException
-{
-    private final String schemaName;
-
-    public SchemaAlreadyExistsException(String schemaName)
-    {
-        this(schemaName, format("Schema already exists: '%s'", schemaName));
-    }
-
-    public SchemaAlreadyExistsException(String schemaName, String message)
-    {
-        super(ALREADY_EXISTS, message);
-        this.schemaName = schemaName;
-    }
-
-    public String getSchemaName()
-    {
-        return schemaName;
-    }
-}
diff --git 
a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/schema/SchemaEmulationByTableNameConvention.java
 
b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/schema/SchemaEmulationByTableNameConvention.java
index 076001d11ee..c66f08b3a4f 100644
--- 
a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/schema/SchemaEmulationByTableNameConvention.java
+++ 
b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/schema/SchemaEmulationByTableNameConvention.java
@@ -39,7 +39,9 @@ import static 
com.google.common.collect.ImmutableList.toImmutableList;
 import static io.trino.plugin.kudu.KuduClientSession.DEFAULT_SCHEMA;
 import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
 import static io.trino.spi.StandardErrorCode.GENERIC_USER_ERROR;
+import static io.trino.spi.StandardErrorCode.SCHEMA_ALREADY_EXISTS;
 import static io.trino.spi.StandardErrorCode.SCHEMA_NOT_EMPTY;
+import static java.lang.String.format;
 
 public class SchemaEmulationByTableNameConvention
         implements SchemaEmulation
@@ -58,7 +60,7 @@ public class SchemaEmulationByTableNameConvention
     public void createSchema(KuduClientWrapper client, String schemaName)
     {
         if (DEFAULT_SCHEMA.equals(schemaName)) {
-            throw new SchemaAlreadyExistsException(schemaName);
+            throw new TrinoException(SCHEMA_ALREADY_EXISTS, format("Schema 
already exists: '%s'", schemaName));
         }
         try (KuduOperationApplier operationApplier = 
KuduOperationApplier.fromKuduClientWrapper(client)) {
             KuduTable schemasTable = getSchemasTable(client);
diff --git 
a/plugin/trino-kudu/src/main/java/org/apache/kudu/client/KeyEncoderAccessor.java
 
b/plugin/trino-kudu/src/main/java/org/apache/kudu/client/KeyEncoderAccessor.java
index 2573db30616..9a111f38334 100644
--- 
a/plugin/trino-kudu/src/main/java/org/apache/kudu/client/KeyEncoderAccessor.java
+++ 
b/plugin/trino-kudu/src/main/java/org/apache/kudu/client/KeyEncoderAccessor.java
@@ -33,11 +33,6 @@ public final class KeyEncoderAccessor
         return KeyEncoder.decodePrimaryKey(schema, key);
     }
 
-    public static byte[] encodeRangePartitionKey(PartialRow row, 
PartitionSchema.RangeSchema rangeSchema)
-    {
-        return KeyEncoder.encodeRangePartitionKey(row, rangeSchema);
-    }
-
     public static PartialRow decodeRangePartitionKey(Schema schema, 
PartitionSchema partitionSchema, byte[] key)
     {
         return KeyEncoder.decodeRangePartitionKey(schema, partitionSchema, 
key);
diff --git 
a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/BaseKuduConnectorSmokeTest.java
 
b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/BaseKuduConnectorSmokeTest.java
index 847a818e3ca..999b4d50642 100644
--- 
a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/BaseKuduConnectorSmokeTest.java
+++ 
b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/BaseKuduConnectorSmokeTest.java
@@ -20,7 +20,6 @@ import org.junit.jupiter.api.Test;
 
 import java.util.Optional;
 
-import static 
io.trino.plugin.kudu.KuduQueryRunnerFactory.createKuduQueryRunnerTpch;
 import static io.trino.plugin.kudu.TestKuduConnectorTest.REGION_COLUMNS;
 import static 
io.trino.plugin.kudu.TestKuduConnectorTest.createKuduTableForWrites;
 import static io.trino.plugin.kudu.TestingKuduServer.EARLIEST_TAG;
@@ -40,9 +39,10 @@ public abstract class BaseKuduConnectorSmokeTest
     protected QueryRunner createQueryRunner()
             throws Exception
     {
-        return createKuduQueryRunnerTpch(
-                closeAfterClass(new TestingKuduServer(getKuduServerVersion())),
-                getKuduSchemaEmulationPrefix(), REQUIRED_TPCH_TABLES);
+        return KuduQueryRunnerFactory.builder(closeAfterClass(new 
TestingKuduServer(getKuduServerVersion())))
+                .setKuduSchemaEmulationPrefix(getKuduSchemaEmulationPrefix())
+                .setInitialTables(REQUIRED_TPCH_TABLES)
+                .build();
     }
 
     @Override
@@ -50,16 +50,16 @@ public abstract class BaseKuduConnectorSmokeTest
     {
         return switch (connectorBehavior) {
             case SUPPORTS_ARRAY,
-                    SUPPORTS_COMMENT_ON_COLUMN,
-                    SUPPORTS_COMMENT_ON_TABLE,
-                    SUPPORTS_CREATE_MATERIALIZED_VIEW,
-                    SUPPORTS_CREATE_VIEW,
-                    SUPPORTS_NEGATIVE_DATE,
-                    SUPPORTS_NOT_NULL_CONSTRAINT,
-                    SUPPORTS_RENAME_SCHEMA,
-                    SUPPORTS_ROW_TYPE,
-                    SUPPORTS_TOPN_PUSHDOWN,
-                    SUPPORTS_TRUNCATE -> false;
+                 SUPPORTS_COMMENT_ON_COLUMN,
+                 SUPPORTS_COMMENT_ON_TABLE,
+                 SUPPORTS_CREATE_MATERIALIZED_VIEW,
+                 SUPPORTS_CREATE_VIEW,
+                 SUPPORTS_NEGATIVE_DATE,
+                 SUPPORTS_NOT_NULL_CONSTRAINT,
+                 SUPPORTS_RENAME_SCHEMA,
+                 SUPPORTS_ROW_TYPE,
+                 SUPPORTS_TOPN_PUSHDOWN,
+                 SUPPORTS_TRUNCATE -> false;
             default -> super.hasBehavior(connectorBehavior);
         };
     }
diff --git 
a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/KuduCreateAndInsertDataSetup.java
 
b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/KuduCreateAndInsertDataSetup.java
new file mode 100644
index 00000000000..5645162cd9c
--- /dev/null
+++ 
b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/KuduCreateAndInsertDataSetup.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.kudu;
+
+import io.trino.testing.datatype.ColumnSetup;
+import io.trino.testing.datatype.CreateAndInsertDataSetup;
+import io.trino.testing.sql.SqlExecutor;
+
+import java.util.List;
+import java.util.stream.IntStream;
+
+import static java.lang.String.format;
+import static java.util.stream.Collectors.joining;
+
+public class KuduCreateAndInsertDataSetup
+        extends CreateAndInsertDataSetup
+{
+    public KuduCreateAndInsertDataSetup(SqlExecutor sqlExecutor, String 
tableNamePrefix)
+    {
+        super(sqlExecutor, tableNamePrefix);
+    }
+
+    @Override
+    protected String tableDefinition(List<ColumnSetup> inputs)
+    {
+        return IntStream.range(0, inputs.size())
+                .mapToObj(column -> {
+                    ColumnSetup input = inputs.get(column);
+                    if (input.getDeclaredType().isEmpty()) {
+                        return format("%s AS col_%d", input.getInputLiteral(), 
column);
+                    }
+
+                    return format("CAST(%s AS %s) AS col_%d", 
input.getInputLiteral(), input.getDeclaredType().get(), column);
+                })
+                .collect(joining(",\n", "AS\nSELECT\n", "\nWHERE 'with no' = 
'data'"));
+    }
+}
diff --git 
a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/KuduQueryRunnerFactory.java
 
b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/KuduQueryRunnerFactory.java
index 9494b78bca0..d83ec457ec5 100644
--- 
a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/KuduQueryRunnerFactory.java
+++ 
b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/KuduQueryRunnerFactory.java
@@ -14,157 +14,119 @@
 package io.trino.plugin.kudu;
 
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.net.HostAndPort;
+import com.google.errorprone.annotations.CanIgnoreReturnValue;
 import io.airlift.log.Logger;
 import io.airlift.log.Logging;
-import io.trino.Session;
+import io.trino.plugin.base.util.Closables;
 import io.trino.plugin.tpch.TpchPlugin;
 import io.trino.testing.DistributedQueryRunner;
-import io.trino.testing.QueryRunner;
 import io.trino.tpch.TpchTable;
 
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
-import static io.airlift.testing.Closeables.closeAllSuppress;
-import static io.trino.Session.SessionBuilder;
 import static io.trino.plugin.tpch.TpchMetadata.TINY_SCHEMA_NAME;
 import static io.trino.testing.QueryAssertions.copyTpchTables;
 import static io.trino.testing.TestingSession.testSessionBuilder;
+import static java.util.Objects.requireNonNull;
 
 public final class KuduQueryRunnerFactory
 {
     private KuduQueryRunnerFactory() {}
 
-    public static QueryRunner createKuduQueryRunner(TestingKuduServer 
kuduServer, Session session)
-            throws Exception
+    public static Builder builder(TestingKuduServer kuduServer)
     {
-        QueryRunner runner = null;
-        try {
-            runner = DistributedQueryRunner.builder(session).build();
-
-            installKuduConnector(kuduServer.getMasterAddress(), runner, 
session.getSchema().orElse("kudu_smoke_test"), Optional.of(""));
-
-            return runner;
-        }
-        catch (Throwable e) {
-            closeAllSuppress(e, runner);
-            throw e;
-        }
+        return new Builder(kuduServer);
     }
 
-    public static QueryRunner createKuduQueryRunner(TestingKuduServer 
kuduServer, String kuduSchema)
-            throws Exception
+    public static class Builder
+            extends DistributedQueryRunner.Builder<Builder>
     {
-        QueryRunner runner = null;
-        try {
-            runner = 
DistributedQueryRunner.builder(createSession(kuduSchema)).build();
-
-            installKuduConnector(kuduServer.getMasterAddress(), runner, 
kuduSchema, Optional.of(""));
-
-            return runner;
+        private final TestingKuduServer kuduServer;
+        private final Map<String, String> connectorProperties = new 
HashMap<>();
+        private Optional<String> kuduSchemaEmulationPrefix = Optional.empty();
+        private List<TpchTable<?>> initialTables = ImmutableList.of();
+
+        private Builder(TestingKuduServer kuduServer)
+        {
+            super(testSessionBuilder()
+                    .setCatalog("kudu")
+                    .setSchema("default")
+                    .build());
+            this.kuduServer = requireNonNull(kuduServer, "kuduServer is null");
         }
-        catch (Throwable e) {
-            closeAllSuppress(e, runner);
-            throw e;
-        }
-    }
-
-    public static QueryRunner createKuduQueryRunnerTpch(TestingKuduServer 
kuduServer, Optional<String> kuduSchemaEmulationPrefix, TpchTable<?>... tables)
-            throws Exception
-    {
-        return createKuduQueryRunnerTpch(kuduServer, 
kuduSchemaEmulationPrefix, ImmutableList.copyOf(tables));
-    }
-
-    public static QueryRunner createKuduQueryRunnerTpch(TestingKuduServer 
kuduServer, Optional<String> kuduSchemaEmulationPrefix, Iterable<TpchTable<?>> 
tables)
-            throws Exception
-    {
-        return createKuduQueryRunnerTpch(kuduServer, 
kuduSchemaEmulationPrefix, ImmutableMap.of(), ImmutableMap.of(), tables);
-    }
-
-    public static QueryRunner createKuduQueryRunnerTpch(
-            TestingKuduServer kuduServer,
-            Optional<String> kuduSchemaEmulationPrefix,
-            Map<String, String> kuduSessionProperties,
-            Map<String, String> extraProperties,
-            Iterable<TpchTable<?>> tables)
-            throws Exception
-    {
-        DistributedQueryRunner runner = null;
-        try {
-            String kuduSchema = kuduSchemaEmulationPrefix.isPresent() ? "tpch" 
: "default";
-            Session session = createSession(kuduSchema, kuduSessionProperties);
-            runner = DistributedQueryRunner.builder(session)
-                    .setExtraProperties(extraProperties)
-                    .build();
-
-            runner.installPlugin(new TpchPlugin());
-            runner.createCatalog("tpch", "tpch");
-
-            installKuduConnector(kuduServer.getMasterAddress(), runner, 
kuduSchema, kuduSchemaEmulationPrefix);
-
-            copyTpchTables(runner, "tpch", TINY_SCHEMA_NAME, session, tables);
 
-            return runner;
+        @CanIgnoreReturnValue
+        public Builder setKuduSchemaEmulationPrefix(Optional<String> 
kuduSchemaEmulationPrefix)
+        {
+            this.kuduSchemaEmulationPrefix = 
requireNonNull(kuduSchemaEmulationPrefix, "kuduSchemaEmulationPrefix is null");
+            return this;
         }
-        catch (Throwable e) {
-            closeAllSuppress(e, runner);
-            throw e;
-        }
-    }
 
-    private static void installKuduConnector(HostAndPort masterAddress, 
QueryRunner runner, String kuduSchema, Optional<String> 
kuduSchemaEmulationPrefix)
-    {
-        Map<String, String> properties;
-        if (kuduSchemaEmulationPrefix.isPresent()) {
-            properties = ImmutableMap.of(
-                    "kudu.schema-emulation.enabled", "true",
-                    "kudu.schema-emulation.prefix", 
kuduSchemaEmulationPrefix.get(),
-                    "kudu.client.master-addresses", masterAddress.toString());
+        @CanIgnoreReturnValue
+        public Builder addConnectorProperty(String key, String value)
+        {
+            this.connectorProperties.put(key, value);
+            return this;
         }
-        else {
-            properties = ImmutableMap.of(
-                    "kudu.schema-emulation.enabled", "false",
-                    "kudu.client.master-addresses", masterAddress.toString());
-        }
-
-        runner.installPlugin(new KuduPlugin());
-        runner.createCatalog("kudu", "kudu", properties);
 
-        if (kuduSchemaEmulationPrefix.isPresent()) {
-            runner.execute("DROP SCHEMA IF EXISTS " + kuduSchema);
-            runner.execute("CREATE SCHEMA " + kuduSchema);
+        @CanIgnoreReturnValue
+        public Builder setInitialTables(List<TpchTable<?>> initialTables)
+        {
+            this.initialTables = ImmutableList.copyOf(initialTables);
+            return this;
         }
-    }
 
-    public static Session createSession(String schema, Map<String, String> 
kuduSessionProperties)
-    {
-        SessionBuilder builder = testSessionBuilder()
-                .setCatalog("kudu")
-                .setSchema(schema);
-        kuduSessionProperties.forEach((k, v) -> 
builder.setCatalogSessionProperty("kudu", k, v));
-        return builder.build();
-    }
-
-    public static Session createSession(String schema)
-    {
-        return testSessionBuilder()
-                .setCatalog("kudu")
-                .setSchema(schema)
-                .build();
+        @Override
+        public DistributedQueryRunner build()
+                throws Exception
+        {
+            String kuduSchema = kuduSchemaEmulationPrefix.isPresent() ? "tpch" 
: "default";
+            amendSession(sessionBuilder -> 
sessionBuilder.setSchema(kuduSchema));
+            DistributedQueryRunner queryRunner = super.build();
+            try {
+                queryRunner.installPlugin(new TpchPlugin());
+                queryRunner.createCatalog("tpch", "tpch");
+
+                if (kuduSchemaEmulationPrefix.isPresent()) {
+                    addConnectorProperty("kudu.schema-emulation.enabled", 
"true");
+                    addConnectorProperty("kudu.schema-emulation.prefix", 
kuduSchemaEmulationPrefix.get());
+                    addConnectorProperty("kudu.client.master-addresses", 
kuduServer.getMasterAddress().toString());
+                }
+                else {
+                    addConnectorProperty("kudu.schema-emulation.enabled", 
"false");
+                    addConnectorProperty("kudu.client.master-addresses", 
kuduServer.getMasterAddress().toString());
+                }
+
+                queryRunner.installPlugin(new KuduPlugin());
+                queryRunner.createCatalog("kudu", "kudu", connectorProperties);
+
+                if (kuduSchemaEmulationPrefix.isPresent()) {
+                    queryRunner.execute("DROP SCHEMA IF EXISTS " + kuduSchema);
+                    queryRunner.execute("CREATE SCHEMA " + kuduSchema);
+                }
+
+                copyTpchTables(queryRunner, "tpch", TINY_SCHEMA_NAME, 
queryRunner.getDefaultSession(), initialTables);
+                return queryRunner;
+            }
+            catch (Throwable e) {
+                Closables.closeAllSuppress(e, queryRunner);
+                throw e;
+            }
+        }
     }
 
     public static void main(String[] args)
             throws Exception
     {
         Logging.initialize();
-        DistributedQueryRunner queryRunner = (DistributedQueryRunner) 
createKuduQueryRunnerTpch(
-                new TestingKuduServer(),
-                Optional.empty(),
-                ImmutableMap.of(),
-                ImmutableMap.of("http-server.http.port", "8080"),
-                TpchTable.getTables());
+        DistributedQueryRunner queryRunner = builder(new TestingKuduServer())
+                .addCoordinatorProperty("http-server.http.port", "8080")
+                .setInitialTables(TpchTable.getTables())
+                .build();
+
         Logger log = Logger.get(KuduQueryRunnerFactory.class);
         log.info("======== SERVER STARTED ========");
         log.info("\n====\n%s\n====", 
queryRunner.getCoordinator().getBaseUrl());
diff --git 
a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/KuduTabletWaitStrategy.java
 
b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/KuduTabletWaitStrategy.java
new file mode 100644
index 00000000000..2bcf6aec80e
--- /dev/null
+++ 
b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/KuduTabletWaitStrategy.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.kudu;
+
+import com.github.dockerjava.api.command.InspectContainerResponse;
+import dev.failsafe.Failsafe;
+import dev.failsafe.RetryPolicy;
+import org.testcontainers.containers.ContainerLaunchException;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.AbstractWaitStrategy;
+
+import java.util.Optional;
+
+import static java.util.Objects.requireNonNull;
+
+public final class KuduTabletWaitStrategy
+        extends AbstractWaitStrategy
+{
+    private final GenericContainer<?> master;
+
+    public KuduTabletWaitStrategy(GenericContainer<?> master)
+    {
+        this.master = requireNonNull(master, "master is null");
+    }
+
+    @Override
+    protected void waitUntilReady()
+    {
+        Failsafe.with(RetryPolicy.builder()
+                        .withMaxDuration(startupTimeout)
+                        .withMaxAttempts(Integer.MAX_VALUE) // limited by 
MaxDuration
+                        .abortOn(e -> getExitCode().isPresent())
+                        .build())
+                .run(() -> {
+                    // Note: This condition requires a dependency on 
org.rnorth.duct-tape:duct-tape
+                    if (!getRateLimiter().getWhenReady(() -> 
master.getLogs().contains("Registered new tserver with Master"))) {
+                        // We say "timed out" immediately. Failsafe will 
propagate this only when timeout reached.
+                        throw new ContainerLaunchException("Timed out waiting 
for container to register tserver");
+                    }
+                });
+    }
+
+    private Optional<Long> getExitCode()
+    {
+        if (waitStrategyTarget.getContainerId() == null) {
+            // Not yet started
+            return Optional.empty();
+        }
+
+        InspectContainerResponse currentContainerInfo = 
waitStrategyTarget.getCurrentContainerInfo();
+        if (currentContainerInfo.getState().getStartedAt() == null) {
+            // not yet running
+            return Optional.empty();
+        }
+        // currentContainerInfo.getState().getExitCode() is present (0) even 
in "running" state
+        if (Boolean.TRUE.equals(currentContainerInfo.getState().getRunning())) 
{
+            // running
+            return Optional.empty();
+        }
+        return 
Optional.ofNullable(currentContainerInfo.getState().getExitCodeLong());
+    }
+}
diff --git 
a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduClientConfig.java
 
b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduClientConfig.java
index 78580ad2e1d..ec75f4a77af 100644
--- 
a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduClientConfig.java
+++ 
b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduClientConfig.java
@@ -13,6 +13,7 @@
  */
 package io.trino.plugin.kudu;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import io.airlift.units.Duration;
 import org.junit.jupiter.api.Test;
@@ -31,7 +32,7 @@ public class TestKuduClientConfig
     public void testDefaults()
     {
         assertRecordedDefaults(recordDefaults(KuduClientConfig.class)
-                .setMasterAddresses("")
+                .setMasterAddresses(ImmutableList.of())
                 .setDefaultAdminOperationTimeout(new Duration(30, SECONDS))
                 .setDefaultOperationTimeout(new Duration(30, SECONDS))
                 .setDisableStatistics(false)
@@ -45,7 +46,7 @@ public class TestKuduClientConfig
     public void testExplicitPropertyMappingsWithCredentialsKey()
     {
         Map<String, String> properties = ImmutableMap.<String, String>builder()
-                .put("kudu.client.master-addresses", "localhost")
+                .put("kudu.client.master-addresses", "localhost,localhost2")
                 .put("kudu.client.default-admin-operation-timeout", "1m")
                 .put("kudu.client.default-operation-timeout", "5m")
                 .put("kudu.client.disable-statistics", "true")
@@ -56,7 +57,7 @@ public class TestKuduClientConfig
                 .buildOrThrow();
 
         KuduClientConfig expected = new KuduClientConfig()
-                .setMasterAddresses("localhost")
+                .setMasterAddresses(ImmutableList.of("localhost", 
"localhost2"))
                 .setDefaultAdminOperationTimeout(new Duration(1, MINUTES))
                 .setDefaultOperationTimeout(new Duration(5, MINUTES))
                 .setDisableStatistics(true)
diff --git 
a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduConnectorTest.java
 
b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduConnectorTest.java
index 48363c5d2f4..ca85a1a2390 100644
--- 
a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduConnectorTest.java
+++ 
b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduConnectorTest.java
@@ -29,7 +29,6 @@ import java.util.function.Consumer;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import static 
io.trino.plugin.kudu.KuduQueryRunnerFactory.createKuduQueryRunnerTpch;
 import static io.trino.spi.type.VarcharType.VARCHAR;
 import static io.trino.testing.MaterializedResult.resultBuilder;
 import static io.trino.testing.TestingNames.randomNameSuffix;
@@ -51,10 +50,9 @@ public class TestKuduConnectorTest
     protected QueryRunner createQueryRunner()
             throws Exception
     {
-        return createKuduQueryRunnerTpch(
-                closeAfterClass(new TestingKuduServer()),
-                Optional.empty(),
-                REQUIRED_TPCH_TABLES);
+        return KuduQueryRunnerFactory.builder(closeAfterClass(new 
TestingKuduServer()))
+                .setInitialTables(REQUIRED_TPCH_TABLES)
+                .build();
     }
 
     @Override
@@ -62,18 +60,18 @@ public class TestKuduConnectorTest
     {
         return switch (connectorBehavior) {
             case SUPPORTS_ARRAY,
-                    SUPPORTS_COMMENT_ON_COLUMN,
-                    SUPPORTS_COMMENT_ON_TABLE,
-                    SUPPORTS_CREATE_MATERIALIZED_VIEW,
-                    SUPPORTS_CREATE_TABLE_WITH_COLUMN_COMMENT,
-                    SUPPORTS_CREATE_VIEW,
-                    SUPPORTS_NEGATIVE_DATE,
-                    SUPPORTS_NOT_NULL_CONSTRAINT,
-                    SUPPORTS_RENAME_SCHEMA,
-                    SUPPORTS_ROW_TYPE,
-                    SUPPORTS_SET_COLUMN_TYPE,
-                    SUPPORTS_TOPN_PUSHDOWN,
-                    SUPPORTS_TRUNCATE -> false;
+                 SUPPORTS_COMMENT_ON_COLUMN,
+                 SUPPORTS_COMMENT_ON_TABLE,
+                 SUPPORTS_CREATE_MATERIALIZED_VIEW,
+                 SUPPORTS_CREATE_TABLE_WITH_COLUMN_COMMENT,
+                 SUPPORTS_CREATE_VIEW,
+                 SUPPORTS_NEGATIVE_DATE,
+                 SUPPORTS_NOT_NULL_CONSTRAINT,
+                 SUPPORTS_RENAME_SCHEMA,
+                 SUPPORTS_ROW_TYPE,
+                 SUPPORTS_SET_COLUMN_TYPE,
+                 SUPPORTS_TOPN_PUSHDOWN,
+                 SUPPORTS_TRUNCATE -> false;
             default -> super.hasBehavior(connectorBehavior);
         };
     }
@@ -175,7 +173,7 @@ public class TestKuduConnectorTest
                 .row("custkey", "bigint", extra, "")
                 .row("orderstatus", "varchar", extra, "")
                 .row("totalprice", "double", extra, "")
-                .row("orderdate", "varchar", extra, "")
+                .row("orderdate", "date", extra, "")
                 .row("orderpriority", "varchar", extra, "")
                 .row("clerk", "varchar", extra, "")
                 .row("shippriority", "integer", extra, "")
@@ -200,7 +198,7 @@ public class TestKuduConnectorTest
                         "   custkey bigint COMMENT '' WITH ( nullable = true 
),\n" +
                         "   orderstatus varchar COMMENT '' WITH ( nullable = 
true ),\n" +
                         "   totalprice double COMMENT '' WITH ( nullable = 
true ),\n" +
-                        "   orderdate varchar COMMENT '' WITH ( nullable = 
true ),\n" +
+                        "   orderdate date COMMENT '' WITH ( nullable = true 
),\n" +
                         "   orderpriority varchar COMMENT '' WITH ( nullable = 
true ),\n" +
                         "   clerk varchar COMMENT '' WITH ( nullable = true 
),\n" +
                         "   shippriority integer COMMENT '' WITH ( nullable = 
true ),\n" +
@@ -537,6 +535,28 @@ public class TestKuduConnectorTest
         }
     }
 
+    @Test
+    public void testAddColumnWithDecimal()
+    {
+        String tableName = "test_add_column_with_decimal" + randomNameSuffix();
+
+        assertUpdate("CREATE TABLE " + tableName + "(" +
+                "id INT WITH (primary_key=true), " +
+                "a_varchar VARCHAR" +
+                ") WITH (" +
+                " partition_by_hash_columns = ARRAY['id'], " +
+                " partition_by_hash_buckets = 2" +
+                ")");
+
+        assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN b_decimal 
decimal(14,5)");
+        assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN c_decimal 
decimal(35,5)");
+
+        assertThat(getColumnType(tableName, 
"b_decimal")).isEqualTo("decimal(14,5)");
+        assertThat(getColumnType(tableName, 
"c_decimal")).isEqualTo("decimal(35,5)");
+
+        assertUpdate("DROP TABLE " + tableName);
+    }
+
     @Test
     public void testInsertIntoTableHavingRowUuid()
     {
@@ -603,8 +623,6 @@ public class TestKuduConnectorTest
     public void testInsertNegativeDate()
     {
         // TODO Remove this overriding test once kudu connector can create tables with default partitions
-        // TODO Update this test once kudu connector supports DATE type: 
https://github.com/trinodb/trino/issues/11009
-        // DATE type is not supported by Kudu connector
         try (TestTable table = new TestTable(getQueryRunner()::execute, 
"insert_date",
                 "(dt DATE WITH (primary_key=true)) " +
                         "WITH (partition_by_hash_columns = ARRAY['dt'], 
partition_by_hash_buckets = 2)")) {
@@ -615,7 +633,7 @@ public class TestKuduConnectorTest
     @Override
     protected String errorMessageForInsertNegativeDate(String date)
     {
-        return "Insert query has mismatched column types: Table: 
\\[varchar\\], Query: \\[date\\]";
+        return "Date value <-719893>} is out of range '0001-01-01':'9999-12-31'";
     }
 
     @Test
@@ -738,22 +756,19 @@ public class TestKuduConnectorTest
         // Map date column type to varchar
         String tableName = "negative_date_" + randomNameSuffix();
 
-        try {
-            assertUpdate(format("CREATE TABLE %s AS SELECT DATE '-0001-01-01' 
AS dt", tableName), 1);
-            assertQuery("SELECT * FROM " + tableName, "VALUES '-0001-01-01'");
-            assertQuery(format("SELECT * FROM %s WHERE dt = '-0001-01-01'", 
tableName), "VALUES '-0001-01-01'");
-        }
-        finally {
-            assertUpdate("DROP TABLE IF EXISTS " + tableName);
-        }
+        abort("TODO: implement the test for Kudu");
     }
 
     @Test
     @Override
+    @SuppressWarnings("deprecation")
     public void testDateYearOfEraPredicate()
     {
-        assertThatThrownBy(super::testDateYearOfEraPredicate)
-                .hasStackTraceContaining("Cannot apply operator: varchar = 
date");
+        // Override because the connector throws an exception instead of an empty result when the value is out of supported range
+        assertQuery("SELECT orderdate FROM orders WHERE orderdate = DATE 
'1997-09-14'", "VALUES DATE '1997-09-14'");
+        // TODO Replace failure with a TrinoException
+        assertThatThrownBy(() -> query("SELECT * FROM orders WHERE orderdate = 
DATE '-1996-09-14'"))
+                .hasMessageContaining("integer value out of range for Type: 
date column: -1448295");
     }
 
     @Test
@@ -1013,13 +1028,17 @@ public class TestKuduConnectorTest
             return Optional.of(dataMappingTestSetup.asUnsupported());
         }
 
-        if (typeName.equals("date") // date gets stored as varchar
-                || typeName.equals("varbinary") // TODO 
(https://github.com/trinodb/trino/issues/3416)
+        if (typeName.equals("varbinary") // TODO 
(https://github.com/trinodb/trino/issues/3416)
                 || (typeName.startsWith("char") && 
dataMappingTestSetup.getSampleValueLiteral().contains(" "))) { // TODO: 
https://github.com/trinodb/trino/issues/3597
             // TODO this should either work or fail cleanly
             return Optional.empty();
         }
 
+        if (typeName.equals("date") && 
dataMappingTestSetup.getSampleValueLiteral().equals("DATE '1582-10-05'")) {
+            // Kudu connector returns +10 days during julian->gregorian switch. The test case exists in TestKuduTypeMapping.testDate().
+            return Optional.empty();
+        }
+
         return Optional.of(dataMappingTestSetup);
     }
 
@@ -1067,6 +1086,12 @@ public class TestKuduConnectorTest
         assertThat(e).hasMessageContaining("invalid column name: identifier");
     }
 
+    @Override
+    protected String errorMessageForCreateTableAsSelectNegativeDate(String 
date)
+    {
+        return ".*Date value <-719893>} is out of range '0001-01-01':'9999-12-31'.*";
+    }
+
     private void assertTableProperty(String tableProperties, String key, 
String regexValue)
     {
         assertThat(Pattern.compile(key + "\\s*=\\s*" + regexValue + 
",?\\s+").matcher(tableProperties).find())
diff --git 
a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationDecimalColumns.java
 
b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationDecimalColumns.java
index c2420d76be8..87c98224ab1 100644
--- 
a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationDecimalColumns.java
+++ 
b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationDecimalColumns.java
@@ -22,7 +22,6 @@ import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
 
-import static 
io.trino.plugin.kudu.KuduQueryRunnerFactory.createKuduQueryRunner;
 import static java.lang.String.format;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.offset;
@@ -49,7 +48,7 @@ public class TestKuduIntegrationDecimalColumns
     protected QueryRunner createQueryRunner()
             throws Exception
     {
-        return createKuduQueryRunner(closeAfterClass(new TestingKuduServer()), 
"decimal");
+        return KuduQueryRunnerFactory.builder(closeAfterClass(new 
TestingKuduServer())).build();
     }
 
     @AfterAll
diff --git 
a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationDynamicFilter.java
 
b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationDynamicFilter.java
index a6b0c6be87c..480c80645c1 100644
--- 
a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationDynamicFilter.java
+++ 
b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationDynamicFilter.java
@@ -13,7 +13,6 @@
  */
 package io.trino.plugin.kudu;
 
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.primitives.Ints;
 import io.opentelemetry.api.trace.Span;
@@ -33,7 +32,6 @@ import io.trino.testing.AbstractTestQueryFramework;
 import io.trino.testing.DistributedQueryRunner;
 import io.trino.testing.MaterializedResultWithQueryId;
 import io.trino.testing.QueryRunner;
-import io.trino.tpch.TpchTable;
 import io.trino.transaction.TransactionId;
 import io.trino.transaction.TransactionManager;
 import org.intellij.lang.annotations.Language;
@@ -45,15 +43,17 @@ import java.util.List;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
 
 import static com.google.common.collect.ImmutableList.toImmutableList;
 import static io.trino.SystemSessionProperties.JOIN_DISTRIBUTION_TYPE;
 import static io.trino.SystemSessionProperties.JOIN_REORDERING_STRATEGY;
-import static 
io.trino.plugin.kudu.KuduQueryRunnerFactory.createKuduQueryRunnerTpch;
 import static io.trino.spi.connector.Constraint.alwaysTrue;
 import static 
io.trino.sql.planner.OptimizerConfig.JoinDistributionType.BROADCAST;
 import static io.trino.sql.planner.OptimizerConfig.JoinReorderingStrategy.NONE;
+import static io.trino.tpch.TpchTable.LINE_ITEM;
+import static io.trino.tpch.TpchTable.ORDERS;
+import static io.trino.tpch.TpchTable.PART;
+import static java.util.Objects.requireNonNull;
 import static org.assertj.core.api.Assertions.assertThat;
 
 public class TestKuduIntegrationDynamicFilter
@@ -63,14 +63,13 @@ public class TestKuduIntegrationDynamicFilter
     protected QueryRunner createQueryRunner()
             throws Exception
     {
-        return createKuduQueryRunnerTpch(
-                closeAfterClass(new TestingKuduServer()),
-                Optional.of(""),
-                ImmutableMap.of("dynamic_filtering_wait_timeout", "1h"),
-                ImmutableMap.of(
-                        
"dynamic-filtering.small.max-distinct-values-per-driver", "100",
-                        "dynamic-filtering.small.range-row-limit-per-driver", 
"100"),
-                TpchTable.getTables());
+        return KuduQueryRunnerFactory.builder(closeAfterClass(new 
TestingKuduServer()))
+                .setKuduSchemaEmulationPrefix(Optional.of(""))
+                .addConnectorProperty("kudu.dynamic-filtering.wait-timeout", 
"1h")
+                
.addExtraProperty("dynamic-filtering.small.max-distinct-values-per-driver", 
"100")
+                
.addExtraProperty("dynamic-filtering.small.range-row-limit-per-driver", "100")
+                .setInitialTables(List.of(LINE_ITEM, ORDERS, PART))
+                .build();
     }
 
     @Test
@@ -88,19 +87,32 @@ public class TestKuduIntegrationDynamicFilter
         QualifiedObjectName tableName = new QualifiedObjectName("kudu", 
"tpch", "orders");
         Optional<TableHandle> tableHandle = 
runner.getMetadata().getTableHandle(session, tableName);
         assertThat(tableHandle.isPresent()).isTrue();
-        SplitSource splitSource = runner.getSplitManager()
-                .getSplits(session, Span.getInvalid(), tableHandle.get(), new 
IncompleteDynamicFilter(), alwaysTrue());
-        List<Split> splits = new ArrayList<>();
-        while (!splitSource.isFinished()) {
-            splits.addAll(splitSource.getNextBatch(1000).get().getSplits());
+        CompletableFuture<Void> dynamicFilterBlocked = new 
CompletableFuture<>();
+        try {
+            SplitSource splitSource = runner.getSplitManager()
+                    .getSplits(session, Span.getInvalid(), tableHandle.get(), 
new BlockedDynamicFilter(dynamicFilterBlocked), alwaysTrue());
+            List<Split> splits = new ArrayList<>();
+            while (!splitSource.isFinished()) {
+                
splits.addAll(splitSource.getNextBatch(1000).get().getSplits());
+            }
+            splitSource.close();
+            assertThat(splits.isEmpty()).isFalse();
+        }
+        finally {
+            dynamicFilterBlocked.complete(null);
         }
-        splitSource.close();
-        assertThat(splits.isEmpty()).isFalse();
     }
 
-    private static class IncompleteDynamicFilter
+    private static class BlockedDynamicFilter
             implements DynamicFilter
     {
+        private final CompletableFuture<?> isBlocked;
+
+        public BlockedDynamicFilter(CompletableFuture<?> isBlocked)
+        {
+            this.isBlocked = requireNonNull(isBlocked, "isBlocked is null");
+        }
+
         @Override
         public Set<ColumnHandle> getColumnsCovered()
         {
@@ -110,14 +122,7 @@ public class TestKuduIntegrationDynamicFilter
         @Override
         public CompletableFuture<?> isBlocked()
         {
-            return CompletableFuture.runAsync(() -> {
-                try {
-                    TimeUnit.HOURS.sleep(1);
-                }
-                catch (InterruptedException e) {
-                    throw new IllegalStateException(e);
-                }
-            });
+            return isBlocked;
         }
 
         @Override
diff --git 
a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationHashPartitioning.java
 
b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationHashPartitioning.java
index 3c0cb1b917a..fde6eeffd8a 100644
--- 
a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationHashPartitioning.java
+++ 
b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationHashPartitioning.java
@@ -19,7 +19,6 @@ import io.trino.testing.QueryRunner;
 import org.intellij.lang.annotations.Language;
 import org.junit.jupiter.api.Test;
 
-import static 
io.trino.plugin.kudu.KuduQueryRunnerFactory.createKuduQueryRunner;
 import static org.assertj.core.api.Assertions.assertThat;
 
 public class TestKuduIntegrationHashPartitioning
@@ -29,7 +28,7 @@ public class TestKuduIntegrationHashPartitioning
     protected QueryRunner createQueryRunner()
             throws Exception
     {
-        return createKuduQueryRunner(closeAfterClass(new TestingKuduServer()), 
"hash");
+        return KuduQueryRunnerFactory.builder(closeAfterClass(new 
TestingKuduServer())).build();
     }
 
     @Test
diff --git 
a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationIntegerColumns.java
 
b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationIntegerColumns.java
index c20bddf6564..d87be9f3e72 100644
--- 
a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationIntegerColumns.java
+++ 
b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationIntegerColumns.java
@@ -18,7 +18,6 @@ import io.trino.testing.MaterializedResult;
 import io.trino.testing.QueryRunner;
 import org.junit.jupiter.api.Test;
 
-import static 
io.trino.plugin.kudu.KuduQueryRunnerFactory.createKuduQueryRunner;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Fail.fail;
 
@@ -36,7 +35,7 @@ public class TestKuduIntegrationIntegerColumns
     protected QueryRunner createQueryRunner()
             throws Exception
     {
-        return createKuduQueryRunner(closeAfterClass(new TestingKuduServer()), 
"test_integer");
+        return KuduQueryRunnerFactory.builder(closeAfterClass(new 
TestingKuduServer())).build();
     }
 
     @Test
diff --git 
a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationRangePartitioning.java
 
b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationRangePartitioning.java
index 0e2975b1d0e..6ac167647df 100644
--- 
a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationRangePartitioning.java
+++ 
b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationRangePartitioning.java
@@ -18,7 +18,6 @@ import io.trino.testing.MaterializedResult;
 import io.trino.testing.QueryRunner;
 import org.junit.jupiter.api.Test;
 
-import static 
io.trino.plugin.kudu.KuduQueryRunnerFactory.createKuduQueryRunner;
 import static java.lang.String.join;
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -84,7 +83,7 @@ public class TestKuduIntegrationRangePartitioning
     protected QueryRunner createQueryRunner()
             throws Exception
     {
-        return createKuduQueryRunner(closeAfterClass(new TestingKuduServer()), 
"range_partitioning");
+        return KuduQueryRunnerFactory.builder(closeAfterClass(new 
TestingKuduServer())).build();
     }
 
     @Test
diff --git 
a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduTypeMapping.java 
b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduTypeMapping.java
new file mode 100644
index 00000000000..57060be2b20
--- /dev/null
+++ 
b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduTypeMapping.java
@@ -0,0 +1,347 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.kudu;
+
+import io.trino.Session;
+import io.trino.spi.type.TimeZoneKey;
+import io.trino.testing.AbstractTestQueryFramework;
+import io.trino.testing.QueryRunner;
+import io.trino.testing.TestingSession;
+import io.trino.testing.datatype.CreateAsSelectDataSetup;
+import io.trino.testing.datatype.DataSetup;
+import io.trino.testing.datatype.SqlDataTypeTest;
+import io.trino.testing.sql.TrinoSqlExecutor;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.function.Function;
+
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Verify.verify;
+import static io.trino.spi.type.BigintType.BIGINT;
+import static io.trino.spi.type.BooleanType.BOOLEAN;
+import static io.trino.spi.type.DateType.DATE;
+import static io.trino.spi.type.DecimalType.createDecimalType;
+import static io.trino.spi.type.DoubleType.DOUBLE;
+import static io.trino.spi.type.IntegerType.INTEGER;
+import static io.trino.spi.type.RealType.REAL;
+import static io.trino.spi.type.SmallintType.SMALLINT;
+import static io.trino.spi.type.TimestampType.createTimestampType;
+import static io.trino.spi.type.TinyintType.TINYINT;
+import static io.trino.spi.type.VarbinaryType.VARBINARY;
+import static io.trino.spi.type.VarcharType.VARCHAR;
+import static java.lang.String.format;
+import static java.time.ZoneOffset.UTC;
+
+final class TestKuduTypeMapping
+        extends AbstractTestQueryFramework
+{
+    private final ZoneId jvmZone = ZoneId.systemDefault();
+    private final LocalDateTime timeGapInJvmZone1 = LocalDateTime.of(1970, 1, 
1, 0, 13, 42);
+    private final LocalDateTime timeGapInJvmZone2 = LocalDateTime.of(2018, 4, 
1, 2, 13, 55, 123_000_000);
+    private final LocalDateTime timeDoubledInJvmZone = LocalDateTime.of(2018, 
10, 28, 1, 33, 17, 456_000_000);
+
+    // no DST in 1970, but has DST in later years (e.g. 2018)
+    private final ZoneId vilnius = ZoneId.of("Europe/Vilnius");
+    private final LocalDateTime timeGapInVilnius = LocalDateTime.of(2018, 3, 
25, 3, 17, 17);
+    private final LocalDateTime timeDoubledInVilnius = LocalDateTime.of(2018, 
10, 28, 3, 33, 33, 333_000_000);
+
+    // minutes offset change since 1970-01-01, no DST
+    private final ZoneId kathmandu = ZoneId.of("Asia/Kathmandu");
+    private final LocalDateTime timeGapInKathmandu = LocalDateTime.of(1986, 1, 
1, 0, 13, 7);
+
+    @BeforeAll
+    public void setUp()
+    {
+        checkState(jvmZone.getId().equals("America/Bahia_Banderas"), "This 
test assumes certain JVM time zone");
+        LocalDate dateOfLocalTimeChangeForwardAtMidnightInJvmZone = 
LocalDate.of(1970, 1, 1);
+        checkIsGap(jvmZone, 
dateOfLocalTimeChangeForwardAtMidnightInJvmZone.atStartOfDay());
+        checkIsGap(jvmZone, timeGapInJvmZone1);
+        checkIsGap(jvmZone, timeGapInJvmZone2);
+        checkIsDoubled(jvmZone, timeDoubledInJvmZone);
+
+        LocalDate dateOfLocalTimeChangeForwardAtMidnightInSomeZone = 
LocalDate.of(1983, 4, 1);
+        checkIsGap(vilnius, 
dateOfLocalTimeChangeForwardAtMidnightInSomeZone.atStartOfDay());
+        LocalDate dateOfLocalTimeChangeBackwardAtMidnightInSomeZone = 
LocalDate.of(1983, 10, 1);
+        checkIsDoubled(vilnius, 
dateOfLocalTimeChangeBackwardAtMidnightInSomeZone.atStartOfDay().minusMinutes(1));
+        checkIsGap(vilnius, timeGapInVilnius);
+        checkIsDoubled(vilnius, timeDoubledInVilnius);
+
+        checkIsGap(kathmandu, timeGapInKathmandu);
+    }
+
+    private static void checkIsGap(ZoneId zone, LocalDateTime dateTime)
+    {
+        verify(isGap(zone, dateTime), "Expected %s to be a gap in %s", 
dateTime, zone);
+    }
+
+    private static boolean isGap(ZoneId zone, LocalDateTime dateTime)
+    {
+        return zone.getRules().getValidOffsets(dateTime).isEmpty();
+    }
+
+    private static void checkIsDoubled(ZoneId zone, LocalDateTime dateTime)
+    {
+        verify(zone.getRules().getValidOffsets(dateTime).size() == 2, 
"Expected %s to be doubled in %s", dateTime, zone);
+    }
+
+    @Override
+    protected QueryRunner createQueryRunner()
+            throws Exception
+    {
+        return KuduQueryRunnerFactory.builder(closeAfterClass(new 
TestingKuduServer())).build();
+    }
+
+    @Test
+    void testBoolean()
+    {
+        SqlDataTypeTest.create()
+                .addRoundTrip("boolean", "NULL", BOOLEAN, "CAST(NULL AS 
BOOLEAN)")
+                .addRoundTrip("boolean", "true", BOOLEAN)
+                .addRoundTrip("boolean", "false", BOOLEAN)
+                .execute(getQueryRunner(), trinoCreateAsSelect("test_boolean"))
+                .execute(getQueryRunner(), 
trinoCreateAndInsert("test_boolean"));
+    }
+
+    @Test
+    void testTinyint()
+    {
+        SqlDataTypeTest.create()
+                .addRoundTrip("tinyint", "NULL", TINYINT, "CAST(NULL AS 
TINYINT)")
+                .addRoundTrip("tinyint", "-128", TINYINT, "TINYINT '-128'")
+                .addRoundTrip("tinyint", "5", TINYINT, "TINYINT '5'")
+                .addRoundTrip("tinyint", "127", TINYINT, "TINYINT '127'")
+                .execute(getQueryRunner(), trinoCreateAsSelect("test_tinyint"))
+                .execute(getQueryRunner(), 
trinoCreateAndInsert("test_tinyint"));
+    }
+
+    @Test
+    void testSmallint()
+    {
+        SqlDataTypeTest.create()
+                .addRoundTrip("smallint", "NULL", SMALLINT, "CAST(NULL AS 
SMALLINT)")
+                .addRoundTrip("smallint", "-32768", SMALLINT, "SMALLINT 
'-32768'")
+                .addRoundTrip("smallint", "32456", SMALLINT, "SMALLINT 
'32456'")
+                .addRoundTrip("smallint", "32767", SMALLINT, "SMALLINT 
'32767'")
+                .execute(getQueryRunner(), 
trinoCreateAsSelect("test_smallint"))
+                .execute(getQueryRunner(), 
trinoCreateAndInsert("test_smallint"));
+    }
+
+    @Test
+    void testInt()
+    {
+        SqlDataTypeTest.create()
+                .addRoundTrip("int", "NULL", INTEGER, "CAST(NULL AS INTEGER)")
+                .addRoundTrip("int", "-2147483648", INTEGER, "-2147483648")
+                .addRoundTrip("int", "1234567890", INTEGER, "1234567890")
+                .addRoundTrip("int", "2147483647", INTEGER, "2147483647")
+                .execute(getQueryRunner(), trinoCreateAsSelect("test_int"))
+                .execute(getQueryRunner(), trinoCreateAndInsert("test_int"));
+    }
+
+    @Test
+    void testBigint()
+    {
+        SqlDataTypeTest.create()
+                .addRoundTrip("bigint", "NULL", BIGINT, "CAST(NULL AS BIGINT)")
+                .addRoundTrip("bigint", "-9223372036854775808", BIGINT, 
"-9223372036854775808")
+                .addRoundTrip("bigint", "123456789012", BIGINT, "123456789012")
+                .addRoundTrip("bigint", "9223372036854775807", BIGINT, 
"9223372036854775807")
+                .execute(getQueryRunner(), trinoCreateAsSelect("test_bigint"))
+                .execute(getQueryRunner(), 
trinoCreateAndInsert("test_bigint"));
+    }
+
+    @Test
+    void testReal()
+    {
+        SqlDataTypeTest.create()
+                .addRoundTrip("real", "NULL", REAL, "CAST(NULL AS REAL)")
+                .addRoundTrip("real", "12.5", REAL, "REAL '12.5'")
+                .addRoundTrip("real", "nan()", REAL, "CAST(nan() AS REAL)")
+                .addRoundTrip("real", "-infinity()", REAL, "CAST(-infinity() 
AS REAL)")
+                .addRoundTrip("real", "+infinity()", REAL, "CAST(+infinity() 
AS REAL)")
+                .execute(getQueryRunner(), trinoCreateAsSelect("test_real"))
+                .execute(getQueryRunner(), trinoCreateAndInsert("test_real"));
+    }
+
+    @Test
+    void testDouble()
+    {
+        SqlDataTypeTest.create()
+                .addRoundTrip("double", "NULL", DOUBLE, "CAST(NULL AS DOUBLE)")
+                .addRoundTrip("double", "3.1415926835", DOUBLE, "DOUBLE 
'3.1415926835'")
+                .addRoundTrip("double", "1.79769E308", DOUBLE, "DOUBLE 
'1.79769E308'")
+                .addRoundTrip("double", "2.225E-307", DOUBLE, "DOUBLE 
'2.225E-307'")
+                .execute(getQueryRunner(), 
trinoCreateAsSelect("trino_test_double"))
+                .execute(getQueryRunner(), 
trinoCreateAndInsert("trino_test_double"));
+    }
+
+    @Test
+    void testDecimal()
+    {
+        SqlDataTypeTest.create()
+                .addRoundTrip("decimal(3, 0)", "CAST(NULL AS decimal(3, 0))", 
createDecimalType(3, 0), "CAST(NULL AS decimal(3, 0))")
+                .addRoundTrip("decimal(3, 0)", "CAST('193' AS decimal(3, 0))", 
createDecimalType(3, 0), "CAST('193' AS decimal(3, 0))")
+                .addRoundTrip("decimal(3, 0)", "CAST('19' AS decimal(3, 0))", 
createDecimalType(3, 0), "CAST('19' AS decimal(3, 0))")
+                .addRoundTrip("decimal(3, 0)", "CAST('-193' AS decimal(3, 
0))", createDecimalType(3, 0), "CAST('-193' AS decimal(3, 0))")
+                .addRoundTrip("decimal(3, 1)", "CAST('10.0' AS decimal(3, 
1))", createDecimalType(3, 1), "CAST('10.0' AS decimal(3, 1))")
+                .addRoundTrip("decimal(3, 1)", "CAST('10.1' AS decimal(3, 
1))", createDecimalType(3, 1), "CAST('10.1' AS decimal(3, 1))")
+                .addRoundTrip("decimal(3, 1)", "CAST('-10.1' AS decimal(3, 
1))", createDecimalType(3, 1), "CAST('-10.1' AS decimal(3, 1))")
+                .addRoundTrip("decimal(4, 2)", "CAST('2' AS decimal(4, 2))", 
createDecimalType(4, 2), "CAST('2' AS decimal(4, 2))")
+                .addRoundTrip("decimal(4, 2)", "CAST('2.3' AS decimal(4, 2))", 
createDecimalType(4, 2), "CAST('2.3' AS decimal(4, 2))")
+                .addRoundTrip("decimal(24, 2)", "CAST('2' AS decimal(24, 2))", 
createDecimalType(24, 2), "CAST('2' AS decimal(24, 2))")
+                .addRoundTrip("decimal(24, 2)", "CAST('2.3' AS decimal(24, 
2))", createDecimalType(24, 2), "CAST('2.3' AS decimal(24, 2))")
+                .addRoundTrip("decimal(24, 2)", "CAST('123456789.3' AS 
decimal(24, 2))", createDecimalType(24, 2), "CAST('123456789.3' AS decimal(24, 
2))")
+                .addRoundTrip("decimal(24, 4)", 
"CAST('12345678901234567890.31' AS decimal(24, 4))", createDecimalType(24, 4), 
"CAST('12345678901234567890.31' AS decimal(24, 4))")
+                .addRoundTrip("decimal(30, 5)", 
"CAST('3141592653589793238462643.38327' AS decimal(30, 5))", 
createDecimalType(30, 5), "CAST('3141592653589793238462643.38327' AS 
decimal(30, 5))")
+                .addRoundTrip("decimal(30, 5)", 
"CAST('-3141592653589793238462643.38327' AS decimal(30, 5))", 
createDecimalType(30, 5), "CAST('-3141592653589793238462643.38327' AS 
decimal(30, 5))")
+                .addRoundTrip("decimal(38, 0)", "CAST(NULL AS decimal(38, 
0))", createDecimalType(38, 0), "CAST(NULL AS decimal(38, 0))")
+                .addRoundTrip("decimal(38, 0)", 
"CAST('27182818284590452353602874713526624977' AS decimal(38, 0))", 
createDecimalType(38, 0), "CAST('27182818284590452353602874713526624977' AS 
decimal(38, 0))")
+                .addRoundTrip("decimal(38, 0)", 
"CAST('-27182818284590452353602874713526624977' AS decimal(38, 0))", 
createDecimalType(38, 0), "CAST('-27182818284590452353602874713526624977' AS 
decimal(38, 0))")
+                .addRoundTrip("decimal(38, 38)", 
"CAST('0.27182818284590452353602874713526624977' AS decimal(38, 38))", 
createDecimalType(38, 38), "CAST('0.27182818284590452353602874713526624977' AS 
decimal(38, 38))")
+                .execute(getQueryRunner(), 
trinoCreateAsSelect("trino_test_decimal"))
+                .execute(getQueryRunner(), 
trinoCreateAndInsert("trino_test_decimal"));
+    }
+
+    @Test
+    void testVarchar()
+    {
+        SqlDataTypeTest.create()
+                .addRoundTrip("varchar", "NULL", VARCHAR, "CAST(NULL AS 
varchar)")
+                .addRoundTrip("varchar", "'text_a'", VARCHAR, "CAST('text_a' 
AS varchar)")
+                .addRoundTrip("varchar", "'text_b'", VARCHAR, "CAST('text_b' 
AS varchar)")
+                .addRoundTrip("varchar", "'攻殻機動隊'", VARCHAR, "CAST('攻殻機動隊' AS 
varchar)")
+                .addRoundTrip("varchar", "'攻殻機動隊'", VARCHAR, "CAST('攻殻機動隊' AS 
varchar)")
+                .addRoundTrip("varchar", "'😂'", VARCHAR, "CAST('😂' AS 
varchar)")
+                .addRoundTrip("varchar", "'Ну, погоди!'", VARCHAR, "CAST('Ну, 
погоди!' AS varchar)")
+                .execute(getQueryRunner(), 
trinoCreateAndInsert("test_varchar"))
+                .execute(getQueryRunner(), 
trinoCreateAsSelect("test_varchar"));
+    }
+
+    @Test
+    void testVarbinary()
+    {
+        SqlDataTypeTest.create()
+                .addRoundTrip("varbinary", "NULL", VARBINARY, "CAST(NULL AS 
varbinary)")
+                .addRoundTrip("varbinary", "X''", VARBINARY, "X''")
+                .addRoundTrip("varbinary", "X'68656C6C6F'", VARBINARY, 
"to_utf8('hello')")
+                .addRoundTrip("varbinary", 
"X'5069C4996B6E6120C582C4856B61207720E69DB1E4BAACE983BD'", VARBINARY, 
"to_utf8('Piękna łąka w 東京都')")
+                .addRoundTrip("varbinary", 
"X'4261672066756C6C206F6620F09F92B0'", VARBINARY, "to_utf8('Bag full of 💰')")
+                .addRoundTrip("varbinary", 
"X'0001020304050607080DF9367AA7000000'", VARBINARY, 
"X'0001020304050607080DF9367AA7000000'") // non-text
+                .addRoundTrip("varbinary", "X'000000000000'", VARBINARY, 
"X'000000000000'")
+                .execute(getQueryRunner(), 
trinoCreateAsSelect("test_varbinary"))
+                .execute(getQueryRunner(), 
trinoCreateAndInsert("test_varbinary"));
+    }
+
+    @Test
+    void testDate()
+    {
+        testDate(UTC);
+        testDate(jvmZone);
+        // using two non-JVM zones
+        testDate(vilnius);
+        testDate(kathmandu);
+        testDate(TestingSession.DEFAULT_TIME_ZONE_KEY.getZoneId());
+    }
+
+    private void testDate(ZoneId sessionZone)
+    {
+        Session session = Session.builder(getSession())
+                
.setTimeZoneKey(TimeZoneKey.getTimeZoneKey(sessionZone.getId()))
+                .build();
+
+        dateTest(inputLiteral -> format("DATE %s", inputLiteral))
+                .execute(getQueryRunner(), session, 
trinoCreateAsSelect(session, "test_date"))
+                .execute(getQueryRunner(), session, 
trinoCreateAsSelect("test_date"))
+                .execute(getQueryRunner(), session, 
trinoCreateAndInsert(session, "test_date"))
+                .execute(getQueryRunner(), session, 
trinoCreateAndInsert("test_date"));
+    }
+
+    private static SqlDataTypeTest dateTest(Function<String, String> 
inputLiteralFactory)
+    {
+        return SqlDataTypeTest.create()
+                .addRoundTrip("date", "NULL", DATE, "CAST(NULL AS DATE)")
+                .addRoundTrip("date", 
inputLiteralFactory.apply("'0001-01-01'"), DATE, "DATE '0001-01-01'") // min 
value in Kudu
+                .addRoundTrip("date", 
inputLiteralFactory.apply("'1582-10-04'"), DATE, "DATE '1582-10-04'") // before 
julian->gregorian switch
+                .addRoundTrip("date", 
inputLiteralFactory.apply("'1582-10-05'"), DATE, "DATE '1582-10-15'") // begin 
julian->gregorian switch
+                .addRoundTrip("date", 
inputLiteralFactory.apply("'1582-10-14'"), DATE, "DATE '1582-10-24'") // end 
julian->gregorian switch
+                .addRoundTrip("date", 
inputLiteralFactory.apply("'1952-04-03'"), DATE, "DATE '1952-04-03'") // before 
epoch
+                .addRoundTrip("date", 
inputLiteralFactory.apply("'1970-01-01'"), DATE, "DATE '1970-01-01'")
+                .addRoundTrip("date", 
inputLiteralFactory.apply("'1970-02-03'"), DATE, "DATE '1970-02-03'")
+                .addRoundTrip("date", 
inputLiteralFactory.apply("'1983-04-01'"), DATE, "DATE '1983-04-01'")
+                .addRoundTrip("date", 
inputLiteralFactory.apply("'1983-10-01'"), DATE, "DATE '1983-10-01'")
+                .addRoundTrip("date", 
inputLiteralFactory.apply("'2017-07-01'"), DATE, "DATE '2017-07-01'") // summer 
on northern hemisphere (possible DST)
+                .addRoundTrip("date", 
inputLiteralFactory.apply("'2017-01-01'"), DATE, "DATE '2017-01-01'") // winter 
on northern hemisphere (possible DST on southern hemisphere)
+                .addRoundTrip("date", 
inputLiteralFactory.apply("'9999-12-31'"), DATE, "DATE '9999-12-31'"); // max 
value in Kudu
+    }
+
+    @Test
+    void testTimestamp()
+    {
+        testTimestamp(UTC);
+        testTimestamp(jvmZone);
+        // using two non-JVM zones
+        testTimestamp(vilnius);
+        testTimestamp(kathmandu);
+        testTimestamp(TestingSession.DEFAULT_TIME_ZONE_KEY.getZoneId());
+    }
+
+    private void testTimestamp(ZoneId sessionZone)
+    {
+        Session session = Session.builder(getSession())
+                
.setTimeZoneKey(TimeZoneKey.getTimeZoneKey(sessionZone.getId()))
+                .build();
+
+        SqlDataTypeTest.create()
+                .addRoundTrip("timestamp(3)", "TIMESTAMP '1958-01-01 
13:18:03.123'", createTimestampType(3), "TIMESTAMP '1958-01-01 13:18:03.123'")
+                .addRoundTrip("timestamp(3)", "TIMESTAMP '2019-03-18 
10:01:17.987'", createTimestampType(3), "TIMESTAMP '2019-03-18 10:01:17.987'")
+                .addRoundTrip("timestamp(3)", "TIMESTAMP '2018-10-28 
01:33:17.456'", createTimestampType(3), "TIMESTAMP '2018-10-28 01:33:17.456'")
+                .addRoundTrip("timestamp(3)", "TIMESTAMP '2018-10-28 
03:33:33.333'", createTimestampType(3), "TIMESTAMP '2018-10-28 03:33:33.333'")
+
+                // epoch also is a gap in JVM zone
+                .addRoundTrip("timestamp(3)", "TIMESTAMP '1970-01-01 
00:00:00.000'", createTimestampType(3), "TIMESTAMP '1970-01-01 00:00:00.000'")
+
+                .addRoundTrip("timestamp(3)", "TIMESTAMP '1970-01-01 
00:13:42.000'", createTimestampType(3), "TIMESTAMP '1970-01-01 00:13:42.000'")
+                .addRoundTrip("timestamp(3)", "TIMESTAMP '2018-04-01 
02:13:55.123'", createTimestampType(3), "TIMESTAMP '2018-04-01 02:13:55.123'")
+                .addRoundTrip("timestamp(3)", "TIMESTAMP '2018-03-25 
03:17:17.000'", createTimestampType(3), "TIMESTAMP '2018-03-25 03:17:17.000'")
+                .addRoundTrip("timestamp(3)", "TIMESTAMP '1986-01-01 
00:13:07.000'", createTimestampType(3), "TIMESTAMP '1986-01-01 00:13:07.000'")
+                .execute(getQueryRunner(), session, 
trinoCreateAsSelect(session, "test_timestamp"))
+                .execute(getQueryRunner(), session, 
trinoCreateAsSelect("test_timestamp"))
+                .execute(getQueryRunner(), session, 
trinoCreateAndInsert(session, "test_timestamp"))
+                .execute(getQueryRunner(), session, 
trinoCreateAndInsert("test_timestamp"));
+    }
+
+    private DataSetup trinoCreateAsSelect(String tableNamePrefix)
+    {
+        return trinoCreateAsSelect(getSession(), tableNamePrefix);
+    }
+
+    private DataSetup trinoCreateAsSelect(Session session, String 
tableNamePrefix)
+    {
+        return new CreateAsSelectDataSetup(new 
TrinoSqlExecutor(getQueryRunner(), session), tableNamePrefix);
+    }
+
+    private DataSetup trinoCreateAndInsert(String tableNamePrefix)
+    {
+        return trinoCreateAndInsert(getSession(), tableNamePrefix);
+    }
+
+    private DataSetup trinoCreateAndInsert(Session session, String 
tableNamePrefix)
+    {
+        return new KuduCreateAndInsertDataSetup(new 
TrinoSqlExecutor(getQueryRunner(), session), tableNamePrefix);
+    }
+}
diff --git 
a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestingKuduServer.java 
b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestingKuduServer.java
index 0cee1c087bf..c14584fdbc0 100644
--- 
a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestingKuduServer.java
+++ 
b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestingKuduServer.java
@@ -32,7 +32,7 @@ public class TestingKuduServer
 {
     private static final String KUDU_IMAGE = "apache/kudu";
     public static final String EARLIEST_TAG = "1.13.0";
-    public static final String LATEST_TAG = "1.15.0";
+    public static final String LATEST_TAG = "1.17";
 
     private static final Integer KUDU_MASTER_PORT = 7051;
     private static final Integer KUDU_TSERVER_PORT = 7050;
@@ -61,14 +61,11 @@ public class TestingKuduServer
     public TestingKuduServer(String kuduVersion)
     {
         network = Network.newNetwork();
-
-        String hostIP = getHostIPAddress();
-
         String masterContainerAlias = "kudu-master";
         this.master = new GenericContainer<>(format("%s:%s", KUDU_IMAGE, 
kuduVersion))
                 .withExposedPorts(KUDU_MASTER_PORT)
                 .withCommand("master")
-                .withEnv("MASTER_ARGS", "--default_num_replicas=1")
+                .withEnv("MASTER_ARGS", "--default_num_replicas=1 
--unlock_unsafe_flags --use_hybrid_clock=false")
                 .withNetwork(network)
                 .withNetworkAliases(masterContainerAlias);
 
@@ -78,14 +75,16 @@ public class TestingKuduServer
         toxiProxy.start();
 
         String instanceName = "kudu-tserver";
+        @SuppressWarnings("deprecation")
         ToxiproxyContainer.ContainerProxy proxy = 
toxiProxy.getProxy(instanceName, KUDU_TSERVER_PORT);
         tabletServer = new GenericContainer<>(format("%s:%s", KUDU_IMAGE, 
kuduVersion))
                 .withExposedPorts(KUDU_TSERVER_PORT)
                 .withCommand("tserver")
                 .withEnv("KUDU_MASTERS", format("%s:%s", masterContainerAlias, 
KUDU_MASTER_PORT))
-                .withEnv("TSERVER_ARGS", 
format("--fs_wal_dir=/var/lib/kudu/tserver --logtostderr 
--use_hybrid_clock=false --rpc_bind_addresses=%s:%s 
--rpc_advertised_addresses=%s:%s", instanceName, KUDU_TSERVER_PORT, hostIP, 
proxy.getProxyPort()))
+                .withEnv("TSERVER_ARGS", 
format("--fs_wal_dir=/var/lib/kudu/tserver --logtostderr 
--use_hybrid_clock=false --unlock_unsafe_flags --rpc_bind_addresses=%s:%s 
--rpc_advertised_addresses=%s:%s", instanceName, KUDU_TSERVER_PORT, 
TOXIPROXY_NETWORK_ALIAS, proxy.getOriginalProxyPort()))
                 .withNetwork(network)
                 .withNetworkAliases(instanceName)
+                .waitingFor(new KuduTabletWaitStrategy(master))
                 .dependsOn(master);
 
         master.start();
diff --git 
a/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java 
b/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java
index dd00a554460..52de3e7ecce 100644
--- 
a/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java
+++ 
b/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java
@@ -677,13 +677,13 @@ public abstract class BaseConnectorTest
     @Test
     public void testSelectVersionOfNonExistentTable()
     {
+        String tableName = "foo_" + randomNameSuffix();
         String catalog = getSession().getCatalog().orElseThrow();
         String schema = getSession().getSchema().orElseThrow();
-        String tableName = "foo_" + randomNameSuffix();
         assertThatThrownBy(() -> query("SELECT * FROM " + tableName + " FOR 
TIMESTAMP AS OF TIMESTAMP '2021-03-01 00:00:01'"))
-                .hasMessage(format("line 1:15: Table '%s.%s.%s' does not 
exist", catalog, schema, tableName));
+                .hasMessageMatching("line 1:15: Table '%s.%s.%s' does not 
exist|This connector does not support versioned tables".formatted(catalog, 
schema, tableName));
         assertThatThrownBy(() -> query("SELECT * FROM " + tableName + " FOR 
VERSION AS OF 'version1'"))
-                .hasMessage(format("line 1:15: Table '%s.%s.%s' does not 
exist", catalog, schema, tableName));
+                .hasMessageMatching("line 1:15: Table '%s.%s.%s' does not 
exist|This connector does not support versioned tables".formatted(catalog, 
schema, tableName));
     }
 
     /**


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to