This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch trino-435 in repository https://gitbox.apache.org/repos/asf/doris-thirdparty.git
The following commit(s) were added to refs/heads/trino-435 by this push: new 314ac78c739 [kudu] pick kudu connector to 452 version. (#232) 314ac78c739 is described below commit 314ac78c739279c0a9d69c49d044cf0cb156206d Author: daidai <2017501...@qq.com> AuthorDate: Tue Jul 23 10:14:01 2024 +0800 [kudu] pick kudu connector to 452 version. (#232) Migrate the changes of trino Kudu connector from version 435 to version 452. Changed some code to adapt to the changes of Trino Spi. Performed tpch tests on both Trino and Doris. It is recommended to compile with mvn package -Dmaven.test.skip=true. --- plugin/trino-kudu/pom.xml | 34 +- .../io/trino/plugin/kudu/KuduClientConfig.java | 17 +- .../io/trino/plugin/kudu/KuduClientSession.java | 81 ++--- .../io/trino/plugin/kudu/KuduColumnHandle.java | 76 +---- .../java/io/trino/plugin/kudu/KuduMetadata.java | 74 +++-- .../java/io/trino/plugin/kudu/KuduPageSink.java | 28 +- .../java/io/trino/plugin/kudu/KuduRecordSet.java | 4 +- .../io/trino/plugin/kudu/KuduSplitManager.java | 101 +++--- .../java/io/trino/plugin/kudu/KuduTableHandle.java | 2 +- .../main/java/io/trino/plugin/kudu/TypeHelper.java | 171 +++++----- .../kudu/properties/HashPartitionDefinition.java | 26 +- .../kudu/properties/KuduTableProperties.java | 142 +++------ .../plugin/kudu/properties/PartitionDesign.java | 4 +- .../kudu/properties/RangeBoundValueSerializer.java | 2 +- .../plugin/kudu/properties/RangePartition.java | 28 +- .../kudu/properties/RangePartitionDefinition.java | 17 +- .../plugin/kudu/schema/NoSchemaEmulation.java | 4 +- .../kudu/schema/SchemaAlreadyExistsException.java | 41 --- .../SchemaEmulationByTableNameConvention.java | 4 +- .../org/apache/kudu/client/KeyEncoderAccessor.java | 5 - .../plugin/kudu/BaseKuduConnectorSmokeTest.java | 28 +- .../plugin/kudu/KuduCreateAndInsertDataSetup.java | 48 +++ .../trino/plugin/kudu/KuduQueryRunnerFactory.java | 194 +++++------- .../trino/plugin/kudu/KuduTabletWaitStrategy.java | 73 +++++ 
.../io/trino/plugin/kudu/TestKuduClientConfig.java | 7 +- .../trino/plugin/kudu/TestKuduConnectorTest.java | 93 ++++-- .../kudu/TestKuduIntegrationDecimalColumns.java | 3 +- .../kudu/TestKuduIntegrationDynamicFilter.java | 61 ++-- .../kudu/TestKuduIntegrationHashPartitioning.java | 3 +- .../kudu/TestKuduIntegrationIntegerColumns.java | 3 +- .../kudu/TestKuduIntegrationRangePartitioning.java | 3 +- .../io/trino/plugin/kudu/TestKuduTypeMapping.java | 347 +++++++++++++++++++++ .../io/trino/plugin/kudu/TestingKuduServer.java | 11 +- .../java/io/trino/testing/BaseConnectorTest.java | 6 +- 34 files changed, 1007 insertions(+), 734 deletions(-) diff --git a/plugin/trino-kudu/pom.xml b/plugin/trino-kudu/pom.xml index a27bbd0bc26..9878e885ff6 100644 --- a/plugin/trino-kudu/pom.xml +++ b/plugin/trino-kudu/pom.xml @@ -10,11 +10,12 @@ <artifactId>trino-kudu</artifactId> <packaging>trino-plugin</packaging> - <description>Trino - Kudu Connector</description> + <description>Trino - Kudu connector</description> <properties> <air.main.basedir>${project.parent.basedir}</air.main.basedir> - <kudu.version>1.15.0</kudu.version> + <air.compiler.fail-warnings>true</air.compiler.fail-warnings> + <kudu.version>1.17.0</kudu.version> </properties> <dependencies> @@ -131,12 +132,30 @@ <scope>provided</scope> </dependency> + <dependency> + <groupId>com.google.errorprone</groupId> + <artifactId>error_prone_annotations</artifactId> + <scope>runtime</scope> + </dependency> + <dependency> <groupId>io.airlift</groupId> <artifactId>log-manager</artifactId> <scope>runtime</scope> </dependency> + <dependency> + <groupId>com.github.docker-java</groupId> + <artifactId>docker-java-api</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>dev.failsafe</groupId> + <artifactId>failsafe</artifactId> + <scope>test</scope> + </dependency> + <dependency> <groupId>io.airlift</groupId> <artifactId>junit-extensions</artifactId> @@ -218,20 +237,21 @@ </dependency> <dependency> - 
<groupId>org.testcontainers</groupId> - <artifactId>testcontainers</artifactId> + <groupId>org.rnorth.duct-tape</groupId> + <artifactId>duct-tape</artifactId> + <version>1.0.8</version> <scope>test</scope> </dependency> <dependency> <groupId>org.testcontainers</groupId> - <artifactId>toxiproxy</artifactId> + <artifactId>testcontainers</artifactId> <scope>test</scope> </dependency> <dependency> - <groupId>org.testng</groupId> - <artifactId>testng</artifactId> + <groupId>org.testcontainers</groupId> + <artifactId>toxiproxy</artifactId> <scope>test</scope> </dependency> </dependencies> diff --git a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduClientConfig.java b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduClientConfig.java index 6d02132f0e7..6a0a281ec8c 100755 --- a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduClientConfig.java +++ b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduClientConfig.java @@ -13,7 +13,6 @@ */ package io.trino.plugin.kudu; -import com.google.common.base.Splitter; import com.google.common.collect.ImmutableList; import io.airlift.configuration.Config; import io.airlift.configuration.ConfigDescription; @@ -35,11 +34,11 @@ import static java.util.concurrent.TimeUnit.MINUTES; @DefunctConfig("kudu.client.default-socket-read-timeout") public class KuduClientConfig { - private static final Splitter SPLITTER = Splitter.on(',').trimResults().omitEmptyStrings(); + private static final Duration DEFAULT_OPERATION_TIMEOUT = new Duration(30, TimeUnit.SECONDS); private List<String> masterAddresses = ImmutableList.of(); - private Duration defaultAdminOperationTimeout = new Duration(30, TimeUnit.SECONDS); - private Duration defaultOperationTimeout = new Duration(30, TimeUnit.SECONDS); + private Duration defaultAdminOperationTimeout = DEFAULT_OPERATION_TIMEOUT; + private Duration defaultOperationTimeout = DEFAULT_OPERATION_TIMEOUT; private boolean disableStatistics; private boolean schemaEmulationEnabled; private 
String schemaEmulationPrefix = "presto::"; @@ -54,15 +53,9 @@ public class KuduClientConfig } @Config("kudu.client.master-addresses") - public KuduClientConfig setMasterAddresses(String commaSeparatedList) + public KuduClientConfig setMasterAddresses(List<String> commaSeparatedList) { - this.masterAddresses = SPLITTER.splitToList(commaSeparatedList); - return this; - } - - public KuduClientConfig setMasterAddresses(String... contactPoints) - { - this.masterAddresses = ImmutableList.copyOf(contactPoints); + this.masterAddresses = commaSeparatedList; return this; } diff --git a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduClientSession.java b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduClientSession.java index f3b2ae96c8e..eec1ac45dd8 100644 --- a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduClientSession.java +++ b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduClientSession.java @@ -15,6 +15,8 @@ package io.trino.plugin.kudu; import com.google.common.collect.ImmutableList; import io.airlift.log.Logger; +import io.airlift.slice.Slice; +import io.airlift.slice.Slices; import io.trino.plugin.kudu.properties.ColumnDesign; import io.trino.plugin.kudu.properties.HashPartitionDefinition; import io.trino.plugin.kudu.properties.KuduTableProperties; @@ -58,6 +60,7 @@ import org.apache.kudu.client.PartitionSchema.HashBucketSchema; import java.io.IOException; import java.math.BigDecimal; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -178,7 +181,7 @@ public class KuduClientSession .boxed().collect(toList()); for (ColumnHandle column : desiredColumns.get()) { KuduColumnHandle k = (KuduColumnHandle) column; - int index = k.getOrdinalPosition(); + int index = k.ordinalPosition(); if (index >= primaryKeyColumnCount) { columnIndexes.add(index); } @@ -194,7 +197,7 @@ public class KuduClientSession else { if (desiredColumns.isPresent()) { columnIndexes = 
desiredColumns.get().stream() - .map(handle -> ((KuduColumnHandle) handle).getOrdinalPosition()) + .map(handle -> ((KuduColumnHandle) handle).ordinalPosition()) .collect(toImmutableList()); } else { @@ -323,12 +326,12 @@ public class KuduClientSession String rawName = schemaEmulation.toRawName(schemaTableName); AlterTableOptions alterOptions = new AlterTableOptions(); Type type = TypeHelper.toKuduClientType(column.getType()); - alterOptions.addColumn( - new ColumnSchemaBuilder(column.getName(), type) - .nullable(true) - .defaultValue(null) - .comment(nullToEmpty(column.getComment())) // Kudu doesn't allow null comment - .build()); + ColumnSchemaBuilder builder = new ColumnSchemaBuilder(column.getName(), type) + .nullable(true) + .defaultValue(null) + .comment(nullToEmpty(column.getComment())); // Kudu doesn't allow null comment + setTypeAttributes(column, builder); + alterOptions.addColumn(builder.build()); client.alterTable(rawName, alterOptions); } catch (KuduException e) { @@ -384,8 +387,8 @@ public class KuduClientSession if (definition == null) { throw new TrinoException(QUERY_REJECTED, "Table " + schemaTableName + " has no range partition"); } - PartialRow lowerBound = KuduTableProperties.toRangeBoundToPartialRow(schema, definition, rangePartition.getLower()); - PartialRow upperBound = KuduTableProperties.toRangeBoundToPartialRow(schema, definition, rangePartition.getUpper()); + PartialRow lowerBound = KuduTableProperties.toRangeBoundToPartialRow(schema, definition, rangePartition.lower()); + PartialRow upperBound = KuduTableProperties.toRangeBoundToPartialRow(schema, definition, rangePartition.upper()); AlterTableOptions alterOptions = new AlterTableOptions(); switch (change) { case ADD: @@ -467,19 +470,19 @@ public class KuduClientSession PartitionDesign partitionDesign = KuduTableProperties.getPartitionDesign(properties); if (partitionDesign.getHash() != null) { for (HashPartitionDefinition partition : partitionDesign.getHash()) { - 
options.addHashPartitions(partition.getColumns(), partition.getBuckets()); + options.addHashPartitions(partition.columns(), partition.buckets()); } } if (partitionDesign.getRange() != null) { rangePartitionDefinition = partitionDesign.getRange(); - options.setRangePartitionColumns(rangePartitionDefinition.getColumns()); + options.setRangePartitionColumns(rangePartitionDefinition.columns()); } List<RangePartition> rangePartitions = KuduTableProperties.getRangePartitions(properties); if (rangePartitionDefinition != null && !rangePartitions.isEmpty()) { for (RangePartition rangePartition : rangePartitions) { - PartialRow lower = KuduTableProperties.toRangeBoundToPartialRow(schema, rangePartitionDefinition, rangePartition.getLower()); - PartialRow upper = KuduTableProperties.toRangeBoundToPartialRow(schema, rangePartitionDefinition, rangePartition.getUpper()); + PartialRow lower = KuduTableProperties.toRangeBoundToPartialRow(schema, rangePartitionDefinition, rangePartition.lower()); + PartialRow upper = KuduTableProperties.toRangeBoundToPartialRow(schema, rangePartitionDefinition, rangePartition.upper()); options.addRangePartition(lower, upper); } } @@ -503,7 +506,7 @@ public class KuduClientSession Schema schema = table.getSchema(); constraintSummary.getDomains().orElseThrow().forEach((columnHandle, domain) -> { - int position = ((KuduColumnHandle) columnHandle).getOrdinalPosition(); + int position = ((KuduColumnHandle) columnHandle).ordinalPosition(); ColumnSchema columnSchema = schema.getColumnByIndex(position); verify(!domain.isNone(), "Domain is none"); if (domain.isAll()) { @@ -529,8 +532,8 @@ public class KuduClientSession KuduPredicate predicate = createInListPredicate(columnSchema, discreteValues); builder.addPredicate(predicate); } - else if (valueSet instanceof SortedRangeSet) { - Ranges ranges = ((SortedRangeSet) valueSet).getRanges(); + else if (valueSet instanceof SortedRangeSet sortedRangeSet) { + Ranges ranges = sortedRangeSet.getRanges(); List<Range> 
rangeList = ranges.getOrderedRanges(); if (rangeList.stream().allMatch(Range::isSingleValue)) { io.trino.spi.type.Type type = TypeHelper.fromKuduColumn(columnSchema); @@ -577,35 +580,39 @@ public class KuduClientSession { io.trino.spi.type.Type type = TypeHelper.fromKuduColumn(columnSchema); Object javaValue = TypeHelper.getJavaValue(type, value); - if (javaValue instanceof Long) { - return KuduPredicate.newComparisonPredicate(columnSchema, op, (Long) javaValue); + if (javaValue instanceof Long longValue) { + return KuduPredicate.newComparisonPredicate(columnSchema, op, longValue); } - if (javaValue instanceof BigDecimal) { - return KuduPredicate.newComparisonPredicate(columnSchema, op, (BigDecimal) javaValue); + if (javaValue instanceof BigDecimal bigDecimal) { + return KuduPredicate.newComparisonPredicate(columnSchema, op, bigDecimal); } - if (javaValue instanceof Integer) { - return KuduPredicate.newComparisonPredicate(columnSchema, op, (Integer) javaValue); + if (javaValue instanceof Integer integerValue) { + return KuduPredicate.newComparisonPredicate(columnSchema, op, integerValue); } - if (javaValue instanceof Short) { - return KuduPredicate.newComparisonPredicate(columnSchema, op, (Short) javaValue); + if (javaValue instanceof Short shortValue) { + return KuduPredicate.newComparisonPredicate(columnSchema, op, shortValue); } - if (javaValue instanceof Byte) { - return KuduPredicate.newComparisonPredicate(columnSchema, op, (Byte) javaValue); + if (javaValue instanceof Byte byteValue) { + return KuduPredicate.newComparisonPredicate(columnSchema, op, byteValue); } - if (javaValue instanceof String) { - return KuduPredicate.newComparisonPredicate(columnSchema, op, (String) javaValue); + if (javaValue instanceof String stringValue) { + return KuduPredicate.newComparisonPredicate(columnSchema, op, stringValue); } - if (javaValue instanceof Double) { - return KuduPredicate.newComparisonPredicate(columnSchema, op, (Double) javaValue); + if (javaValue instanceof 
Double doubleValue) { + return KuduPredicate.newComparisonPredicate(columnSchema, op, doubleValue); } - if (javaValue instanceof Float) { - return KuduPredicate.newComparisonPredicate(columnSchema, op, (Float) javaValue); + if (javaValue instanceof Float floatValue) { + return KuduPredicate.newComparisonPredicate(columnSchema, op, floatValue); } - if (javaValue instanceof Boolean) { - return KuduPredicate.newComparisonPredicate(columnSchema, op, (Boolean) javaValue); + if (javaValue instanceof Boolean booleanValue) { + return KuduPredicate.newComparisonPredicate(columnSchema, op, booleanValue); } - if (javaValue instanceof byte[]) { - return KuduPredicate.newComparisonPredicate(columnSchema, op, (byte[]) javaValue); + if (javaValue instanceof byte[] byteArrayValue) { + return KuduPredicate.newComparisonPredicate(columnSchema, op, byteArrayValue); + } + if (javaValue instanceof ByteBuffer byteBuffer) { + Slice slice = Slices.wrappedHeapBuffer(byteBuffer); + return KuduPredicate.newComparisonPredicate(columnSchema, op, slice.getBytes(0, slice.length())); } if (javaValue == null) { throw new IllegalStateException("Unexpected null java value for column " + columnSchema.getName()); diff --git a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduColumnHandle.java b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduColumnHandle.java index 35c1f639c93..64d9cfc06c6 100755 --- a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduColumnHandle.java +++ b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduColumnHandle.java @@ -13,19 +13,14 @@ */ package io.trino.plugin.kudu; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; import io.trino.spi.connector.ColumnHandle; import io.trino.spi.connector.ColumnMetadata; import io.trino.spi.type.Type; import io.trino.spi.type.VarbinaryType; -import java.util.Objects; - -import static com.google.common.base.MoreObjects.toStringHelper; import static 
java.util.Objects.requireNonNull; -public class KuduColumnHandle +public record KuduColumnHandle(String name, int ordinalPosition, Type type) implements ColumnHandle { public static final String ROW_ID = "row_uuid"; @@ -33,40 +28,13 @@ public class KuduColumnHandle public static final KuduColumnHandle ROW_ID_HANDLE = new KuduColumnHandle(ROW_ID, ROW_ID_POSITION, VarbinaryType.VARBINARY); - private final String name; - private final int ordinalPosition; - private final Type type; - - @JsonCreator - public KuduColumnHandle( - @JsonProperty("name") String name, - @JsonProperty("ordinalPosition") int ordinalPosition, - @JsonProperty("type") Type type) - { - this.name = requireNonNull(name, "name is null"); - this.ordinalPosition = ordinalPosition; - this.type = requireNonNull(type, "type is null"); - } - - @JsonProperty - public String getName() - { - return name; - } - - @JsonProperty - public int getOrdinalPosition() - { - return ordinalPosition; - } - - @JsonProperty - public Type getType() + public KuduColumnHandle { - return type; + requireNonNull(name, "name is null"); + requireNonNull(type, "type is null"); } - public ColumnMetadata getColumnMetadata() + public ColumnMetadata columnMetadata() { return new ColumnMetadata(name, type); } @@ -75,38 +43,4 @@ public class KuduColumnHandle { return name.equals(ROW_ID); } - - @Override - public int hashCode() - { - return Objects.hash( - name, - ordinalPosition, - type); - } - - @Override - public boolean equals(Object obj) - { - if (this == obj) { - return true; - } - if (obj == null || getClass() != obj.getClass()) { - return false; - } - KuduColumnHandle other = (KuduColumnHandle) obj; - return Objects.equals(this.name, other.name) && - Objects.equals(this.ordinalPosition, other.ordinalPosition) && - Objects.equals(this.type, other.type); - } - - @Override - public String toString() - { - return toStringHelper(this) - .add("name", name) - .add("ordinalPosition", ordinalPosition) - .add("type", type) - .toString(); - 
} } diff --git a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduMetadata.java b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduMetadata.java index 4e16c1441e3..cb5993bfe78 100755 --- a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduMetadata.java +++ b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduMetadata.java @@ -32,16 +32,17 @@ import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.ConnectorTableHandle; import io.trino.spi.connector.ConnectorTableLayout; import io.trino.spi.connector.ConnectorTableMetadata; -import io.trino.spi.connector.ConnectorTablePartitioning; import io.trino.spi.connector.ConnectorTableProperties; +import io.trino.spi.connector.ConnectorTableVersion; import io.trino.spi.connector.Constraint; import io.trino.spi.connector.ConstraintApplicationResult; import io.trino.spi.connector.LimitApplicationResult; -import io.trino.spi.connector.LocalProperty; import io.trino.spi.connector.NotFoundException; import io.trino.spi.connector.ProjectionApplicationResult; +import io.trino.spi.connector.RelationColumnsMetadata; import io.trino.spi.connector.RetryMode; import io.trino.spi.connector.RowChangeParadigm; +import io.trino.spi.connector.SaveMode; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.connector.SchemaTablePrefix; import io.trino.spi.expression.ConnectorExpression; @@ -59,13 +60,16 @@ import org.apache.kudu.client.PartitionSchema.HashBucketSchema; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.OptionalInt; import java.util.OptionalLong; +import java.util.Set; import java.util.function.Consumer; +import java.util.function.UnaryOperator; import static com.google.common.base.Strings.emptyToNull; import static com.google.common.collect.ImmutableList.toImmutableList; @@ -73,6 +77,8 @@ 
import static io.trino.plugin.kudu.KuduColumnHandle.ROW_ID; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; import static io.trino.spi.connector.RetryMode.NO_RETRIES; import static io.trino.spi.connector.RowChangeParadigm.CHANGE_ONLY_UPDATED_COLUMNS; +import static io.trino.spi.connector.SaveMode.IGNORE; +import static io.trino.spi.connector.SaveMode.REPLACE; import static java.util.Objects.requireNonNull; public class KuduMetadata @@ -99,9 +105,13 @@ public class KuduMetadata } @Override - public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSession session, SchemaTablePrefix prefix) + public Iterator<RelationColumnsMetadata> streamRelationColumns( + ConnectorSession session, + Optional<String> schemaName, + UnaryOperator<Set<SchemaTableName>> relationFilter) { - requireNonNull(prefix, "prefix is null"); + SchemaTablePrefix prefix = schemaName.map(SchemaTablePrefix::new) + .orElseGet(SchemaTablePrefix::new); List<SchemaTableName> tables; if (prefix.getTable().isEmpty()) { @@ -111,15 +121,17 @@ public class KuduMetadata tables = ImmutableList.of(prefix.toSchemaTableName()); } - ImmutableMap.Builder<SchemaTableName, List<ColumnMetadata>> columns = ImmutableMap.builder(); + Map<SchemaTableName, RelationColumnsMetadata> relationColumns = new HashMap<>(); for (SchemaTableName tableName : tables) { - KuduTableHandle tableHandle = getTableHandle(session, tableName); + KuduTableHandle tableHandle = getTableHandle(session, tableName, Optional.empty(), Optional.empty()); if (tableHandle != null) { - ConnectorTableMetadata tableMetadata = getTableMetadata(tableHandle); - columns.put(tableName, tableMetadata.getColumns()); + KuduTable table = tableHandle.getTable(clientSession); + relationColumns.put(tableName, RelationColumnsMetadata.forTable(tableName, getColumnsMetadata(table.getSchema()))); } } - return columns.buildOrThrow(); + return relationFilter.apply(relationColumns.keySet()).stream() + .map(relationColumns::get) + .iterator(); } 
private ColumnMetadata getColumnMetadata(ColumnSchema column) @@ -165,13 +177,18 @@ public class KuduMetadata // Kudu returns empty string as a table comment by default Optional<String> tableComment = Optional.ofNullable(emptyToNull(table.getComment())); - List<ColumnMetadata> columnsMetaList = schema.getColumns().stream() + List<ColumnMetadata> columns = getColumnsMetadata(schema); + + Map<String, Object> properties = clientSession.getTableProperties(tableHandle); + return new ConnectorTableMetadata(tableHandle.getSchemaTableName(), columns, properties, tableComment); + } + + private List<ColumnMetadata> getColumnsMetadata(Schema schema) + { + return schema.getColumns().stream() .filter(column -> !column.isKey() || !column.getName().equals(ROW_ID)) .map(this::getColumnMetadata) .collect(toImmutableList()); - - Map<String, Object> properties = clientSession.getTableProperties(tableHandle); - return new ConnectorTableMetadata(tableHandle.getSchemaTableName(), columnsMetaList, properties, tableComment); } @Override @@ -180,7 +197,7 @@ public class KuduMetadata KuduTableHandle tableHandle = (KuduTableHandle) connectorTableHandle; ImmutableMap.Builder<String, ColumnHandle> columnHandles = ImmutableMap.builder(); Schema schema = clientSession.getTableSchema(tableHandle); - forAllColumnHandles(schema, column -> columnHandles.put(column.getName(), column)); + forAllColumnHandles(schema, column -> columnHandles.put(column.name(), column)); return columnHandles.buildOrThrow(); } @@ -206,12 +223,16 @@ public class KuduMetadata .setHidden(true) .build(); } - return kuduColumnHandle.getColumnMetadata(); + return kuduColumnHandle.columnMetadata(); } @Override - public KuduTableHandle getTableHandle(ConnectorSession session, SchemaTableName schemaTableName) + public KuduTableHandle getTableHandle(ConnectorSession session, SchemaTableName schemaTableName, Optional<ConnectorTableVersion> startVersion, Optional<ConnectorTableVersion> endVersion) { + if (startVersion.isPresent() || 
endVersion.isPresent()) { + throw new TrinoException(NOT_SUPPORTED, "This connector does not support versioned tables"); + } + try { KuduTable table = clientSession.openTable(schemaTableName); OptionalInt bucketCount = OptionalInt.empty(); @@ -248,12 +269,15 @@ public class KuduMetadata } @Override - public void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, boolean ignoreExisting) + public void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, SaveMode saveMode) { + if (saveMode == REPLACE) { + throw new TrinoException(NOT_SUPPORTED, "This connector does not support replacing tables"); + } if (tableMetadata.getColumns().stream().anyMatch(column -> column.getComment() != null)) { throw new TrinoException(NOT_SUPPORTED, "This connector does not support creating tables with column comment"); } - clientSession.createTable(tableMetadata, ignoreExisting); + clientSession.createTable(tableMetadata, saveMode == IGNORE); } @Override @@ -282,7 +306,7 @@ public class KuduMetadata { KuduTableHandle kuduTableHandle = (KuduTableHandle) tableHandle; KuduColumnHandle kuduColumnHandle = (KuduColumnHandle) column; - clientSession.dropColumn(kuduTableHandle.getSchemaTableName(), kuduColumnHandle.getName()); + clientSession.dropColumn(kuduTableHandle.getSchemaTableName(), kuduColumnHandle.name()); } @Override @@ -290,7 +314,7 @@ public class KuduMetadata { KuduTableHandle kuduTableHandle = (KuduTableHandle) tableHandle; KuduColumnHandle kuduColumnHandle = (KuduColumnHandle) source; - clientSession.renameColumn(kuduTableHandle.getSchemaTableName(), kuduColumnHandle.getName(), target); + clientSession.renameColumn(kuduTableHandle.getSchemaTableName(), kuduColumnHandle.name(), target); } @Override @@ -328,6 +352,7 @@ public class KuduMetadata } @Override + @SuppressWarnings("deprecation") public ConnectorOutputTableHandle beginCreateTable( ConnectorSession session, ConnectorTableMetadata tableMetadata, @@ -432,14 +457,11 @@ public 
class KuduMetadata { KuduTableHandle handle = (KuduTableHandle) table; - Optional<ConnectorTablePartitioning> tablePartitioning = Optional.empty(); - List<LocalProperty<ColumnHandle>> localProperties = ImmutableList.of(); - return new ConnectorTableProperties( handle.getConstraint(), - tablePartitioning, Optional.empty(), - localProperties); + Optional.empty(), + ImmutableList.of()); } @Override @@ -508,7 +530,7 @@ public class KuduMetadata ImmutableList.Builder<Assignment> assignmentList = ImmutableList.builder(); assignments.forEach((name, column) -> { desiredColumns.add(column); - assignmentList.add(new Assignment(name, column, ((KuduColumnHandle) column).getType())); + assignmentList.add(new Assignment(name, column, ((KuduColumnHandle) column).type())); }); handle = new KuduTableHandle( diff --git a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduPageSink.java b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduPageSink.java index 29dc98d338a..8ce926a56a0 100644 --- a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduPageSink.java +++ b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduPageSink.java @@ -16,6 +16,7 @@ package io.trino.plugin.kudu; import com.google.common.collect.ImmutableList; import io.airlift.slice.Slice; import io.trino.spi.Page; +import io.trino.spi.TrinoException; import io.trino.spi.block.Block; import io.trino.spi.connector.ConnectorMergeSink; import io.trino.spi.connector.ConnectorPageSink; @@ -47,6 +48,7 @@ import java.util.concurrent.TimeUnit; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; +import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR; import static io.trino.spi.type.BigintType.BIGINT; import static io.trino.spi.type.BooleanType.BOOLEAN; import static io.trino.spi.type.DateType.DATE; @@ -61,6 +63,7 @@ import static io.trino.spi.type.VarbinaryType.VARBINARY; import static java.lang.String.format; import 
static java.util.Objects.requireNonNull; import static java.util.concurrent.CompletableFuture.completedFuture; +import static org.apache.kudu.util.DateUtil.epochDaysToSqlDate; public class KuduPageSink implements ConnectorPageSink, ConnectorMergeSink @@ -138,8 +141,8 @@ public class KuduPageSink } return NOT_BLOCKED; } - catch (KuduException e) { - throw new RuntimeException(e); + catch (KuduException | RuntimeException e) { + throw new TrinoException(GENERIC_INTERNAL_ERROR, e); } } @@ -150,6 +153,9 @@ public class KuduPageSink if (block.isNull(position)) { row.setNull(destChannel); } + else if (DATE.equals(type)) { + row.addDate(destChannel, epochDaysToSqlDate(INTEGER.getInt(block, position))); + } else if (TIMESTAMP_MILLIS.equals(type)) { row.addLong(destChannel, truncateEpochMicrosToMillis(TIMESTAMP_MILLIS.getLong(block, position))); } @@ -174,7 +180,8 @@ public class KuduPageSink else if (DOUBLE.equals(type)) { row.addDouble(destChannel, DOUBLE.getDouble(block, position)); } - else if (type instanceof VarcharType varcharType) { + else if (type instanceof VarcharType) { + VarcharType varcharType = (VarcharType) type; Type originalType = originalColumnTypes.get(destChannel); if (DATE.equals(originalType)) { SqlDate date = (SqlDate) originalType.getObjectValue(connectorSession, block, position); @@ -189,7 +196,8 @@ public class KuduPageSink else if (VARBINARY.equals(type)) { row.addBinary(destChannel, VARBINARY.getSlice(block, position).toByteBuffer()); } - else if (type instanceof DecimalType decimalType) { + else if (type instanceof DecimalType) { + DecimalType decimalType = (DecimalType) type; SqlDecimal sqlDecimal = (SqlDecimal) decimalType.getObjectValue(connectorSession, block, position); row.addDecimal(destChannel, sqlDecimal.toBigDecimal()); } @@ -228,8 +236,8 @@ public class KuduPageSink try { operationApplier.applyOperationAsync(delete); } - catch (KuduException e) { - throw new RuntimeException(e); + catch (KuduException | RuntimeException e) { + throw 
new TrinoException(GENERIC_INTERNAL_ERROR, e); } } @@ -248,14 +256,14 @@ public class KuduPageSink try { operationApplier.applyOperationAsync(insert); } - catch (KuduException e) { - throw new RuntimeException(e); + catch (KuduException | RuntimeException e) { + throw new TrinoException(GENERIC_INTERNAL_ERROR, e); } } } } - catch (KuduException e) { - throw new RuntimeException(e); + catch (KuduException | RuntimeException e) { + throw new TrinoException(GENERIC_INTERNAL_ERROR, e); } } diff --git a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduRecordSet.java b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduRecordSet.java index cc686497c8c..da0949aa28e 100755 --- a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduRecordSet.java +++ b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduRecordSet.java @@ -47,7 +47,7 @@ public class KuduRecordSet public List<Type> getColumnTypes() { return columns.stream() - .map(column -> ((KuduColumnHandle) column).getType()) + .map(column -> ((KuduColumnHandle) column).type()) .collect(toImmutableList()); } @@ -63,7 +63,7 @@ public class KuduRecordSet builder.put(i, ROW_ID_POSITION); } else { - builder.put(i, projectedSchema.getColumnIndex(handle.getName())); + builder.put(i, projectedSchema.getColumnIndex(handle.name())); } } diff --git a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduSplitManager.java b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduSplitManager.java index 52be6748547..4246ac1f52c 100755 --- a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduSplitManager.java +++ b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduSplitManager.java @@ -13,6 +13,7 @@ */ package io.trino.plugin.kudu; +import com.google.common.collect.ImmutableList; import com.google.inject.Inject; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.ConnectorSplitManager; @@ -25,16 +26,15 @@ import io.trino.spi.connector.FixedSplitSource; import 
java.util.List; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; import static io.trino.plugin.kudu.KuduSessionProperties.getDynamicFilteringWaitTimeout; -import static io.trino.spi.connector.DynamicFilter.NOT_BLOCKED; import static java.util.Objects.requireNonNull; -import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.NANOSECONDS; public class KuduSplitManager implements ConnectorSplitManager { + private static final ConnectorSplitSource.ConnectorSplitBatch EMPTY_BATCH = new ConnectorSplitSource.ConnectorSplitBatch(ImmutableList.of(), false); private final KuduClientSession clientSession; @Inject @@ -51,82 +51,79 @@ public class KuduSplitManager DynamicFilter dynamicFilter, Constraint constraint) { - long timeoutMillis = getDynamicFilteringWaitTimeout(session).toMillis(); - if (timeoutMillis == 0 || !dynamicFilter.isAwaitable()) { - return getSplitSource(table, dynamicFilter); - } - CompletableFuture<?> dynamicFilterFuture = whenCompleted(dynamicFilter) - .completeOnTimeout(null, timeoutMillis, MILLISECONDS); - CompletableFuture<ConnectorSplitSource> splitSourceFuture = dynamicFilterFuture.thenApply( - ignored -> getSplitSource(table, dynamicFilter)); - return new KuduDynamicFilteringSplitSource(dynamicFilterFuture, splitSourceFuture); - } - - private ConnectorSplitSource getSplitSource( - ConnectorTableHandle table, - DynamicFilter dynamicFilter) - { - KuduTableHandle handle = (KuduTableHandle) table; - - List<KuduSplit> splits = clientSession.buildKuduSplits(handle, dynamicFilter); - - return new FixedSplitSource(splits); - } - - private static CompletableFuture<?> whenCompleted(DynamicFilter dynamicFilter) - { - if (dynamicFilter.isAwaitable()) { - return dynamicFilter.isBlocked().thenCompose(ignored -> whenCompleted(dynamicFilter)); - } - return NOT_BLOCKED; + return new KuduDynamicFilteringSplitSource(session, clientSession, dynamicFilter, table); } private static 
class KuduDynamicFilteringSplitSource implements ConnectorSplitSource { - private final CompletableFuture<?> dynamicFilterFuture; - private final CompletableFuture<ConnectorSplitSource> splitSourceFuture; + private final KuduClientSession clientSession; + private final DynamicFilter dynamicFilter; + private final ConnectorTableHandle tableHandle; + private final long dynamicFilteringTimeoutNanos; + private ConnectorSplitSource delegateSplitSource; + private final long startNanos; private KuduDynamicFilteringSplitSource( - CompletableFuture<?> dynamicFilterFuture, - CompletableFuture<ConnectorSplitSource> splitSourceFuture) + ConnectorSession connectorSession, + KuduClientSession clientSession, + DynamicFilter dynamicFilter, + ConnectorTableHandle tableHandle) { - this.dynamicFilterFuture = requireNonNull(dynamicFilterFuture, "dynamicFilterFuture is null"); - this.splitSourceFuture = requireNonNull(splitSourceFuture, "splitSourceFuture is null"); + this.clientSession = requireNonNull(clientSession, "clientSession is null"); + this.dynamicFilter = requireNonNull(dynamicFilter, "dynamicFilterFuture is null"); + this.tableHandle = requireNonNull(tableHandle, "splitSourceFuture is null"); + this.dynamicFilteringTimeoutNanos = (long) getDynamicFilteringWaitTimeout(connectorSession).getValue(NANOSECONDS); + this.startNanos = System.nanoTime(); } @Override public CompletableFuture<ConnectorSplitBatch> getNextBatch(int maxSize) { - return splitSourceFuture.thenCompose(splitSource -> splitSource.getNextBatch(maxSize)); + CompletableFuture<?> blocked = dynamicFilter.isBlocked(); + long remainingTimeoutNanos = getRemainingTimeoutNanos(); + if (remainingTimeoutNanos > 0 && dynamicFilter.isAwaitable()) { + // wait for dynamic filter and yield + return blocked + .thenApply(x -> EMPTY_BATCH) + .completeOnTimeout(EMPTY_BATCH, remainingTimeoutNanos, NANOSECONDS); + } + + if (delegateSplitSource == null) { + KuduTableHandle handle = (KuduTableHandle) tableHandle; + + List<KuduSplit> 
splits = clientSession.buildKuduSplits(handle, dynamicFilter); + delegateSplitSource = new FixedSplitSource(splits); + } + + return delegateSplitSource.getNextBatch(maxSize); } @Override public void close() { - if (!dynamicFilterFuture.cancel(true)) { - splitSourceFuture.thenAccept(ConnectorSplitSource::close); + if (delegateSplitSource != null) { + delegateSplitSource.close(); } } @Override public boolean isFinished() { - if (!splitSourceFuture.isDone()) { - return false; - } - if (splitSourceFuture.isCompletedExceptionally()) { + if (getRemainingTimeoutNanos() > 0 && dynamicFilter.isAwaitable()) { return false; } - try { - return splitSourceFuture.get().isFinished(); - } - catch (InterruptedException | ExecutionException e) { - if (e instanceof InterruptedException) { - Thread.currentThread().interrupt(); - } - throw new RuntimeException(e); + + if (delegateSplitSource != null) { + return delegateSplitSource.isFinished(); } + + return false; + } + + private long getRemainingTimeoutNanos() + { + return dynamicFilteringTimeoutNanos - (System.nanoTime() - startNanos); } } } diff --git a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduTableHandle.java b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduTableHandle.java index cc63f844849..aaaa738c94b 100755 --- a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduTableHandle.java +++ b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduTableHandle.java @@ -146,7 +146,7 @@ public class KuduTableHandle return Objects.equals(this.schemaTableName, other.schemaTableName) && Objects.equals(this.constraint, other.constraint) && Objects.equals(this.desiredColumns, other.desiredColumns) && - Objects.equals(this.requiresRowId, other.requiresRowId) && + this.requiresRowId == other.requiresRowId && Objects.equals(this.bucketCount, other.bucketCount) && Objects.equals(this.limit, other.limit); } diff --git a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/TypeHelper.java 
b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/TypeHelper.java index 5fdd31d30ab..3c294029659 100644 --- a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/TypeHelper.java +++ b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/TypeHelper.java @@ -50,23 +50,20 @@ public final class TypeHelper public static org.apache.kudu.Type toKuduClientType(Type type) { - if (type instanceof VarcharType) { - return org.apache.kudu.Type.STRING; + if (type == BooleanType.BOOLEAN) { + return org.apache.kudu.Type.BOOL; } - if (type.equals(TIMESTAMP_MILLIS)) { - return org.apache.kudu.Type.UNIXTIME_MICROS; + if (type == TinyintType.TINYINT) { + return org.apache.kudu.Type.INT8; } - if (type == BigintType.BIGINT) { - return org.apache.kudu.Type.INT64; + if (type == SmallintType.SMALLINT) { + return org.apache.kudu.Type.INT16; } if (type == IntegerType.INTEGER) { return org.apache.kudu.Type.INT32; } - if (type == SmallintType.SMALLINT) { - return org.apache.kudu.Type.INT16; - } - if (type == TinyintType.TINYINT) { - return org.apache.kudu.Type.INT8; + if (type == BigintType.BIGINT) { + return org.apache.kudu.Type.INT64; } if (type == RealType.REAL) { return org.apache.kudu.Type.FLOAT; @@ -74,21 +71,24 @@ public final class TypeHelper if (type == DoubleType.DOUBLE) { return org.apache.kudu.Type.DOUBLE; } - if (type == BooleanType.BOOLEAN) { - return org.apache.kudu.Type.BOOL; - } - if (type instanceof VarbinaryType) { - return org.apache.kudu.Type.BINARY; - } if (type instanceof DecimalType) { return org.apache.kudu.Type.DECIMAL; } - if (type == DateType.DATE) { + if (type instanceof CharType) { return org.apache.kudu.Type.STRING; } - if (type instanceof CharType) { + if (type instanceof VarcharType) { return org.apache.kudu.Type.STRING; } + if (type instanceof VarbinaryType) { + return org.apache.kudu.Type.BINARY; + } + if (type == DateType.DATE) { + return org.apache.kudu.Type.DATE; + } + if (type.equals(TIMESTAMP_MILLIS)) { + return 
org.apache.kudu.Type.UNIXTIME_MICROS; + } throw new TrinoException(NOT_SUPPORTED, "Unsupported type: " + type); } @@ -100,31 +100,32 @@ public final class TypeHelper private static Type fromKuduClientType(org.apache.kudu.Type ktype, ColumnTypeAttributes attributes) { switch (ktype) { - case STRING: - return VarcharType.VARCHAR; - case UNIXTIME_MICROS: - return TIMESTAMP_MILLIS; - case INT64: - return BigintType.BIGINT; - case INT32: - return IntegerType.INTEGER; - case INT16: - return SmallintType.SMALLINT; + case BOOL: + return BooleanType.BOOLEAN; case INT8: return TinyintType.TINYINT; + case INT16: + return SmallintType.SMALLINT; + case INT32: + return IntegerType.INTEGER; + case INT64: + return BigintType.BIGINT; case FLOAT: return RealType.REAL; case DOUBLE: return DoubleType.DOUBLE; - case BOOL: - return BooleanType.BOOLEAN; - case BINARY: - return VarbinaryType.VARBINARY; case DECIMAL: return DecimalType.createDecimalType(attributes.getPrecision(), attributes.getScale()); - // TODO: add support for varchar and date types: https://github.com/trinodb/trino/issues/11009 - case VARCHAR: + case STRING: + return VarcharType.VARCHAR; + case BINARY: + return VarbinaryType.VARBINARY; case DATE: + return DateType.DATE; + case UNIXTIME_MICROS: + return TIMESTAMP_MILLIS; + // TODO: add support for varchar types: https://github.com/trinodb/trino/issues/11009 + case VARCHAR: break; } throw new IllegalStateException("Kudu type not implemented for " + ktype); @@ -132,114 +133,90 @@ public final class TypeHelper public static Object getJavaValue(Type type, Object nativeValue) { - if (type instanceof VarcharType) { - return ((Slice) nativeValue).toStringUtf8(); - } - if (type.equals(TIMESTAMP_MILLIS)) { - // Kudu's native format is in microseconds - return nativeValue; - } - if (type == BigintType.BIGINT) { + if (type == BooleanType.BOOLEAN) { return nativeValue; } - if (type == IntegerType.INTEGER) { - return ((Long) nativeValue).intValue(); + if (type == 
TinyintType.TINYINT) { + return ((Long) nativeValue).byteValue(); } if (type == SmallintType.SMALLINT) { return ((Long) nativeValue).shortValue(); } - if (type == TinyintType.TINYINT) { - return ((Long) nativeValue).byteValue(); + if (type == IntegerType.INTEGER) { + return ((Long) nativeValue).intValue(); } - if (type == DoubleType.DOUBLE) { + if (type == BigintType.BIGINT) { return nativeValue; } if (type == RealType.REAL) { // conversion can result in precision lost return intBitsToFloat(((Long) nativeValue).intValue()); } - if (type == BooleanType.BOOLEAN) { + if (type == DoubleType.DOUBLE) { return nativeValue; } - if (type instanceof VarbinaryType) { - return ((Slice) nativeValue).toByteBuffer(); - } - if (type instanceof DecimalType decimalType) { + if (type instanceof DecimalType) { + DecimalType decimalType = (DecimalType) type; if (decimalType.isShort()) { return new BigDecimal(BigInteger.valueOf((long) nativeValue), decimalType.getScale()); } return new BigDecimal(((Int128) nativeValue).toBigInteger(), decimalType.getScale()); } - throw new IllegalStateException("Back conversion not implemented for " + type); - } - - public static Object getObject(Type type, RowResult row, int field) - { - if (row.isNull(field)) { - return null; - } if (type instanceof VarcharType) { - return row.getString(field); - } - if (type.equals(TIMESTAMP_MILLIS)) { - return truncateEpochMicrosToMillis(row.getLong(field)); - } - if (type == BigintType.BIGINT) { - return row.getLong(field); - } - if (type == IntegerType.INTEGER) { - return row.getInt(field); - } - if (type == SmallintType.SMALLINT) { - return row.getShort(field); - } - if (type == TinyintType.TINYINT) { - return row.getByte(field); - } - if (type == DoubleType.DOUBLE) { - return row.getDouble(field); + return ((Slice) nativeValue).toStringUtf8(); } - if (type == RealType.REAL) { - return row.getFloat(field); + if (type instanceof VarbinaryType) { + return ((Slice) nativeValue).toByteBuffer(); } - if (type == 
BooleanType.BOOLEAN) { - return row.getBoolean(field); + if (type.equals(DateType.DATE)) { + return nativeValue; } - if (type instanceof VarbinaryType) { - return Slices.wrappedHeapBuffer(row.getBinary(field)); + if (type.equals(TIMESTAMP_MILLIS)) { + // Kudu's native format is in microseconds + return nativeValue; } + throw new IllegalStateException("Back conversion not implemented for " + type); + } + + public static Object getObject(Type type, RowResult row, int field) + { if (type instanceof DecimalType) { - return Decimals.encodeScaledValue(row.getDecimal(field), ((DecimalType) type).getScale()); + DecimalType decimalType = (DecimalType) type; + return Decimals.encodeScaledValue(row.getDecimal(field), decimalType.getScale()); } throw new IllegalStateException("getObject not implemented for " + type); } public static long getLong(Type type, RowResult row, int field) { - if (type.equals(TIMESTAMP_MILLIS)) { - return truncateEpochMicrosToMillis(row.getLong(field)); + if (type == TinyintType.TINYINT) { + return row.getByte(field); } - if (type == BigintType.BIGINT) { - return row.getLong(field); + if (type == SmallintType.SMALLINT) { + return row.getShort(field); } if (type == IntegerType.INTEGER) { return row.getInt(field); } - if (type == SmallintType.SMALLINT) { - return row.getShort(field); - } - if (type == TinyintType.TINYINT) { - return row.getByte(field); + if (type == BigintType.BIGINT) { + return row.getLong(field); } if (type == RealType.REAL) { return floatToRawIntBits(row.getFloat(field)); } - if (type instanceof DecimalType decimalType) { + if (type instanceof DecimalType) { + DecimalType decimalType = (DecimalType) type; if (decimalType.isShort()) { return row.getDecimal(field).unscaledValue().longValue(); } throw new IllegalStateException("getLong not supported for long decimal: " + type); } + if (type.equals(DateType.DATE)) { + return row.getInt(field); + } + if (type.equals(TIMESTAMP_MILLIS)) { + return 
truncateEpochMicrosToMillis(row.getLong(field)); + } throw new IllegalStateException("getLong not implemented for " + type); } diff --git a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/HashPartitionDefinition.java b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/HashPartitionDefinition.java index cdbc1bfc8ae..3eac4e128b8 100644 --- a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/HashPartitionDefinition.java +++ b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/HashPartitionDefinition.java @@ -15,28 +15,4 @@ package io.trino.plugin.kudu.properties; import java.util.List; -public class HashPartitionDefinition -{ - private List<String> columns; - private int buckets; - - public List<String> getColumns() - { - return columns; - } - - public void setColumns(List<String> columns) - { - this.columns = columns; - } - - public int getBuckets() - { - return buckets; - } - - public void setBuckets(int buckets) - { - this.buckets = buckets; - } -} +public record HashPartitionDefinition(List<String> columns, int buckets) {} diff --git a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/KuduTableProperties.java b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/KuduTableProperties.java index 48c3bead663..909a3f7cc86 100644 --- a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/KuduTableProperties.java +++ b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/KuduTableProperties.java @@ -190,9 +190,7 @@ public final class KuduTableProperties @SuppressWarnings("unchecked") List<String> rangeColumns = (List<String>) tableProperties.get(PARTITION_BY_RANGE_COLUMNS); if (!rangeColumns.isEmpty()) { - RangePartitionDefinition range = new RangePartitionDefinition(); - range.setColumns(rangeColumns); - design.setRange(range); + design.setRange(new RangePartitionDefinition(rangeColumns)); } return design; @@ -234,10 +232,7 @@ public final class KuduTableProperties 
if (hashBuckets == null) { throw new TrinoException(GENERIC_USER_ERROR, "Missing table property " + bucketPropertyName); } - HashPartitionDefinition definition = new HashPartitionDefinition(); - definition.setColumns(columns); - definition.setBuckets(hashBuckets); - return definition; + return new HashPartitionDefinition(columns, hashBuckets); } public static List<RangePartition> getRangePartitions(Map<String, Object> tableProperties) @@ -285,17 +280,17 @@ public final class KuduTableProperties if (partitionDesign.getHash() != null) { List<HashPartitionDefinition> list = partitionDesign.getHash(); if (!list.isEmpty()) { - properties.put(PARTITION_BY_HASH_COLUMNS, list.get(0).getColumns()); - properties.put(PARTITION_BY_HASH_BUCKETS, list.get(0).getBuckets()); + properties.put(PARTITION_BY_HASH_COLUMNS, list.get(0).columns()); + properties.put(PARTITION_BY_HASH_BUCKETS, list.get(0).buckets()); } if (list.size() >= 2) { - properties.put(PARTITION_BY_HASH_COLUMNS_2, list.get(1).getColumns()); - properties.put(PARTITION_BY_HASH_BUCKETS_2, list.get(1).getBuckets()); + properties.put(PARTITION_BY_HASH_COLUMNS_2, list.get(1).columns()); + properties.put(PARTITION_BY_HASH_BUCKETS_2, list.get(1).buckets()); } } if (partitionDesign.getRange() != null) { - properties.put(PARTITION_BY_RANGE_COLUMNS, partitionDesign.getRange().getColumns()); + properties.put(PARTITION_BY_RANGE_COLUMNS, partitionDesign.getRange().columns()); } String partitionRangesValue = mapper.writeValueAsString(rangePartitionList); @@ -400,22 +395,17 @@ public final class KuduTableProperties List<HashPartitionDefinition> hashPartitions = partitionSchema.getHashBucketSchemas().stream() .map(hashBucketSchema -> { - HashPartitionDefinition hash = new HashPartitionDefinition(); List<String> cols = hashBucketSchema.getColumnIds().stream() .map(idx -> schema.getColumnByIndex(idx).getName()).collect(toImmutableList()); - hash.setColumns(cols); - hash.setBuckets(hashBucketSchema.getNumBuckets()); - return hash; + 
return new HashPartitionDefinition(cols, hashBucketSchema.getNumBuckets()); }).collect(toImmutableList()); partitionDesign.setHash(hashPartitions); List<Integer> rangeColumns = partitionSchema.getRangeSchema().getColumnIds(); if (!rangeColumns.isEmpty()) { - RangePartitionDefinition definition = new RangePartitionDefinition(); - definition.setColumns(rangeColumns.stream() + partitionDesign.setRange(new RangePartitionDefinition(rangeColumns.stream() .map(i -> schema.getColumns().get(i).getName()) - .collect(toImmutableList())); - partitionDesign.setRange(definition); + .collect(toImmutableList()))); } return partitionDesign; @@ -426,7 +416,7 @@ public final class KuduTableProperties { PartialRow partialRow = new PartialRow(schema); if (boundValue != null) { - List<Integer> rangeColumns = definition.getColumns().stream() + List<Integer> rangeColumns = definition.columns().stream() .map(schema::getColumnIndex).collect(toImmutableList()); if (rangeColumns.size() != boundValue.getValues().size()) { @@ -555,91 +545,53 @@ public final class KuduTableProperties public static ColumnSchema.CompressionAlgorithm lookupCompression(String compression) { - switch (compression.toLowerCase(Locale.ENGLISH)) { - case "default": - case "default_compression": - return ColumnSchema.CompressionAlgorithm.DEFAULT_COMPRESSION; - case "no": - case "no_compression": - return ColumnSchema.CompressionAlgorithm.NO_COMPRESSION; - case "lz4": - return ColumnSchema.CompressionAlgorithm.LZ4; - case "snappy": - return ColumnSchema.CompressionAlgorithm.SNAPPY; - case "zlib": - return ColumnSchema.CompressionAlgorithm.ZLIB; - default: - throw new IllegalArgumentException(); - } + return switch (compression.toLowerCase(Locale.ENGLISH)) { + case "default", "default_compression" -> ColumnSchema.CompressionAlgorithm.DEFAULT_COMPRESSION; + case "no", "no_compression" -> ColumnSchema.CompressionAlgorithm.NO_COMPRESSION; + case "lz4" -> ColumnSchema.CompressionAlgorithm.LZ4; + case "snappy" -> 
ColumnSchema.CompressionAlgorithm.SNAPPY; + case "zlib" -> ColumnSchema.CompressionAlgorithm.ZLIB; + default -> throw new IllegalArgumentException(); + }; } public static String lookupCompressionString(ColumnSchema.CompressionAlgorithm algorithm) { - switch (algorithm) { - case DEFAULT_COMPRESSION: - return "default"; - case NO_COMPRESSION: - return "no"; - case LZ4: - return "lz4"; - case SNAPPY: - return "snappy"; - case ZLIB: - return "zlib"; - default: - return "unknown"; - } + return switch (algorithm) { + case DEFAULT_COMPRESSION -> "default"; + case NO_COMPRESSION -> "no"; + case LZ4 -> "lz4"; + case SNAPPY -> "snappy"; + case ZLIB -> "zlib"; + default -> "unknown"; + }; } public static ColumnSchema.Encoding lookupEncoding(String encoding) { - switch (encoding.toLowerCase(Locale.ENGLISH)) { - case "auto": - case "auto_encoding": - return ColumnSchema.Encoding.AUTO_ENCODING; - case "bitshuffle": - case "bit_shuffle": - return ColumnSchema.Encoding.BIT_SHUFFLE; - case "dictionary": - case "dict_encoding": - return ColumnSchema.Encoding.DICT_ENCODING; - case "plain": - case "plain_encoding": - return ColumnSchema.Encoding.PLAIN_ENCODING; - case "prefix": - case "prefix_encoding": - return ColumnSchema.Encoding.PREFIX_ENCODING; - case "runlength": - case "run_length": - case "run length": - case "rle": - return ColumnSchema.Encoding.RLE; - case "group_varint": - return ColumnSchema.Encoding.GROUP_VARINT; - default: - throw new IllegalArgumentException(); - } + return switch (encoding.toLowerCase(Locale.ENGLISH)) { + case "auto", "auto_encoding" -> ColumnSchema.Encoding.AUTO_ENCODING; + case "bitshuffle", "bit_shuffle" -> ColumnSchema.Encoding.BIT_SHUFFLE; + case "dictionary", "dict_encoding" -> ColumnSchema.Encoding.DICT_ENCODING; + case "plain", "plain_encoding" -> ColumnSchema.Encoding.PLAIN_ENCODING; + case "prefix", "prefix_encoding" -> ColumnSchema.Encoding.PREFIX_ENCODING; + case "runlength", "run_length", "run length", "rle" -> ColumnSchema.Encoding.RLE; 
+ case "group_varint" -> ColumnSchema.Encoding.GROUP_VARINT; + default -> throw new IllegalArgumentException(); + }; } public static String lookupEncodingString(ColumnSchema.Encoding encoding) { - switch (encoding) { - case AUTO_ENCODING: - return "auto"; - case BIT_SHUFFLE: - return "bitshuffle"; - case DICT_ENCODING: - return "dictionary"; - case PLAIN_ENCODING: - return "plain"; - case PREFIX_ENCODING: - return "prefix"; - case RLE: - return "runlength"; - case GROUP_VARINT: - return "group_varint"; - default: - return "unknown"; - } + return switch (encoding) { + case AUTO_ENCODING -> "auto"; + case BIT_SHUFFLE -> "bitshuffle"; + case DICT_ENCODING -> "dictionary"; + case PLAIN_ENCODING -> "plain"; + case PREFIX_ENCODING -> "prefix"; + case RLE -> "runlength"; + case GROUP_VARINT -> "group_varint"; + default -> "unknown"; + }; } } diff --git a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/PartitionDesign.java b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/PartitionDesign.java index eb06436a144..90d556dd2e4 100644 --- a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/PartitionDesign.java +++ b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/PartitionDesign.java @@ -42,7 +42,7 @@ public class PartitionDesign public boolean hasPartitions() { - return hash != null && !hash.isEmpty() && !hash.get(0).getColumns().isEmpty() - || range != null && !range.getColumns().isEmpty(); + return hash != null && !hash.isEmpty() && !hash.get(0).columns().isEmpty() + || range != null && !range.columns().isEmpty(); } } diff --git a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/RangeBoundValueSerializer.java b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/RangeBoundValueSerializer.java index b1c484b3012..f9dcad16dec 100644 --- a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/RangeBoundValueSerializer.java +++ 
b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/RangeBoundValueSerializer.java @@ -36,7 +36,7 @@ public class RangeBoundValueSerializer writeValue(value.getValues().get(0), gen); } else { - gen.writeStartArray(value.getValues().size()); + gen.writeStartArray(); for (Object obj : value.getValues()) { writeValue(obj, gen); } diff --git a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/RangePartition.java b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/RangePartition.java index d8f5919c05d..f1188e09ca3 100644 --- a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/RangePartition.java +++ b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/RangePartition.java @@ -13,30 +13,6 @@ */ package io.trino.plugin.kudu.properties; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.annotation.Nullable; -public class RangePartition -{ - private final RangeBoundValue lower; - private final RangeBoundValue upper; - - @JsonCreator - public RangePartition( - @JsonProperty("lower") RangeBoundValue lower, - @JsonProperty("upper") RangeBoundValue upper) - { - this.lower = lower; - this.upper = upper; - } - - public RangeBoundValue getLower() - { - return lower; - } - - public RangeBoundValue getUpper() - { - return upper; - } -} +public record RangePartition(@Nullable RangeBoundValue lower, @Nullable RangeBoundValue upper) {} diff --git a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/RangePartitionDefinition.java b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/RangePartitionDefinition.java index 1fdf49aabf7..1fed3bc203b 100644 --- a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/RangePartitionDefinition.java +++ b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/properties/RangePartitionDefinition.java @@ -13,19 +13,16 @@ */ package io.trino.plugin.kudu.properties; -import 
java.util.List; +import com.google.common.collect.ImmutableList; -public class RangePartitionDefinition -{ - private List<String> columns; +import java.util.List; - public List<String> getColumns() - { - return columns; - } +import static java.util.Objects.requireNonNull; - public void setColumns(List<String> columns) +public record RangePartitionDefinition(List<String> columns) +{ + public RangePartitionDefinition { - this.columns = columns; + columns = ImmutableList.copyOf(requireNonNull(columns, "columns is null")); } } diff --git a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/schema/NoSchemaEmulation.java b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/schema/NoSchemaEmulation.java index ad3c828445d..e1647995846 100644 --- a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/schema/NoSchemaEmulation.java +++ b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/schema/NoSchemaEmulation.java @@ -23,6 +23,8 @@ import java.util.List; import static io.trino.plugin.kudu.KuduClientSession.DEFAULT_SCHEMA; import static io.trino.spi.StandardErrorCode.GENERIC_USER_ERROR; +import static io.trino.spi.StandardErrorCode.SCHEMA_ALREADY_EXISTS; +import static java.lang.String.format; public class NoSchemaEmulation implements SchemaEmulation @@ -31,7 +33,7 @@ public class NoSchemaEmulation public void createSchema(KuduClientWrapper client, String schemaName) { if (DEFAULT_SCHEMA.equals(schemaName)) { - throw new SchemaAlreadyExistsException(schemaName); + throw new TrinoException(SCHEMA_ALREADY_EXISTS, format("Schema already exists: '%s'", schemaName)); } throw new TrinoException(GENERIC_USER_ERROR, "Creating schema in Kudu connector not allowed if schema emulation is disabled."); } diff --git a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/schema/SchemaAlreadyExistsException.java b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/schema/SchemaAlreadyExistsException.java deleted file mode 100644 index 64c9ff9ac45..00000000000 --- 
a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/schema/SchemaAlreadyExistsException.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.trino.plugin.kudu.schema; - -import io.trino.spi.TrinoException; - -import static io.trino.spi.StandardErrorCode.ALREADY_EXISTS; -import static java.lang.String.format; - -public class SchemaAlreadyExistsException - extends TrinoException -{ - private final String schemaName; - - public SchemaAlreadyExistsException(String schemaName) - { - this(schemaName, format("Schema already exists: '%s'", schemaName)); - } - - public SchemaAlreadyExistsException(String schemaName, String message) - { - super(ALREADY_EXISTS, message); - this.schemaName = schemaName; - } - - public String getSchemaName() - { - return schemaName; - } -} diff --git a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/schema/SchemaEmulationByTableNameConvention.java b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/schema/SchemaEmulationByTableNameConvention.java index 076001d11ee..c66f08b3a4f 100644 --- a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/schema/SchemaEmulationByTableNameConvention.java +++ b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/schema/SchemaEmulationByTableNameConvention.java @@ -39,7 +39,9 @@ import static com.google.common.collect.ImmutableList.toImmutableList; import static io.trino.plugin.kudu.KuduClientSession.DEFAULT_SCHEMA; import static 
io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR; import static io.trino.spi.StandardErrorCode.GENERIC_USER_ERROR; +import static io.trino.spi.StandardErrorCode.SCHEMA_ALREADY_EXISTS; import static io.trino.spi.StandardErrorCode.SCHEMA_NOT_EMPTY; +import static java.lang.String.format; public class SchemaEmulationByTableNameConvention implements SchemaEmulation @@ -58,7 +60,7 @@ public class SchemaEmulationByTableNameConvention public void createSchema(KuduClientWrapper client, String schemaName) { if (DEFAULT_SCHEMA.equals(schemaName)) { - throw new SchemaAlreadyExistsException(schemaName); + throw new TrinoException(SCHEMA_ALREADY_EXISTS, format("Schema already exists: '%s'", schemaName)); } try (KuduOperationApplier operationApplier = KuduOperationApplier.fromKuduClientWrapper(client)) { KuduTable schemasTable = getSchemasTable(client); diff --git a/plugin/trino-kudu/src/main/java/org/apache/kudu/client/KeyEncoderAccessor.java b/plugin/trino-kudu/src/main/java/org/apache/kudu/client/KeyEncoderAccessor.java index 2573db30616..9a111f38334 100644 --- a/plugin/trino-kudu/src/main/java/org/apache/kudu/client/KeyEncoderAccessor.java +++ b/plugin/trino-kudu/src/main/java/org/apache/kudu/client/KeyEncoderAccessor.java @@ -33,11 +33,6 @@ public final class KeyEncoderAccessor return KeyEncoder.decodePrimaryKey(schema, key); } - public static byte[] encodeRangePartitionKey(PartialRow row, PartitionSchema.RangeSchema rangeSchema) - { - return KeyEncoder.encodeRangePartitionKey(row, rangeSchema); - } - public static PartialRow decodeRangePartitionKey(Schema schema, PartitionSchema partitionSchema, byte[] key) { return KeyEncoder.decodeRangePartitionKey(schema, partitionSchema, key); diff --git a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/BaseKuduConnectorSmokeTest.java b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/BaseKuduConnectorSmokeTest.java index 847a818e3ca..999b4d50642 100644 --- 
a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/BaseKuduConnectorSmokeTest.java +++ b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/BaseKuduConnectorSmokeTest.java @@ -20,7 +20,6 @@ import org.junit.jupiter.api.Test; import java.util.Optional; -import static io.trino.plugin.kudu.KuduQueryRunnerFactory.createKuduQueryRunnerTpch; import static io.trino.plugin.kudu.TestKuduConnectorTest.REGION_COLUMNS; import static io.trino.plugin.kudu.TestKuduConnectorTest.createKuduTableForWrites; import static io.trino.plugin.kudu.TestingKuduServer.EARLIEST_TAG; @@ -40,9 +39,10 @@ public abstract class BaseKuduConnectorSmokeTest protected QueryRunner createQueryRunner() throws Exception { - return createKuduQueryRunnerTpch( - closeAfterClass(new TestingKuduServer(getKuduServerVersion())), - getKuduSchemaEmulationPrefix(), REQUIRED_TPCH_TABLES); + return KuduQueryRunnerFactory.builder(closeAfterClass(new TestingKuduServer(getKuduServerVersion()))) + .setKuduSchemaEmulationPrefix(getKuduSchemaEmulationPrefix()) + .setInitialTables(REQUIRED_TPCH_TABLES) + .build(); } @Override @@ -50,16 +50,16 @@ public abstract class BaseKuduConnectorSmokeTest { return switch (connectorBehavior) { case SUPPORTS_ARRAY, - SUPPORTS_COMMENT_ON_COLUMN, - SUPPORTS_COMMENT_ON_TABLE, - SUPPORTS_CREATE_MATERIALIZED_VIEW, - SUPPORTS_CREATE_VIEW, - SUPPORTS_NEGATIVE_DATE, - SUPPORTS_NOT_NULL_CONSTRAINT, - SUPPORTS_RENAME_SCHEMA, - SUPPORTS_ROW_TYPE, - SUPPORTS_TOPN_PUSHDOWN, - SUPPORTS_TRUNCATE -> false; + SUPPORTS_COMMENT_ON_COLUMN, + SUPPORTS_COMMENT_ON_TABLE, + SUPPORTS_CREATE_MATERIALIZED_VIEW, + SUPPORTS_CREATE_VIEW, + SUPPORTS_NEGATIVE_DATE, + SUPPORTS_NOT_NULL_CONSTRAINT, + SUPPORTS_RENAME_SCHEMA, + SUPPORTS_ROW_TYPE, + SUPPORTS_TOPN_PUSHDOWN, + SUPPORTS_TRUNCATE -> false; default -> super.hasBehavior(connectorBehavior); }; } diff --git a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/KuduCreateAndInsertDataSetup.java 
b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/KuduCreateAndInsertDataSetup.java new file mode 100644 index 00000000000..5645162cd9c --- /dev/null +++ b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/KuduCreateAndInsertDataSetup.java @@ -0,0 +1,48 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.kudu; + +import io.trino.testing.datatype.ColumnSetup; +import io.trino.testing.datatype.CreateAndInsertDataSetup; +import io.trino.testing.sql.SqlExecutor; + +import java.util.List; +import java.util.stream.IntStream; + +import static java.lang.String.format; +import static java.util.stream.Collectors.joining; + +public class KuduCreateAndInsertDataSetup + extends CreateAndInsertDataSetup +{ + public KuduCreateAndInsertDataSetup(SqlExecutor sqlExecutor, String tableNamePrefix) + { + super(sqlExecutor, tableNamePrefix); + } + + @Override + protected String tableDefinition(List<ColumnSetup> inputs) + { + return IntStream.range(0, inputs.size()) + .mapToObj(column -> { + ColumnSetup input = inputs.get(column); + if (input.getDeclaredType().isEmpty()) { + return format("%s AS col_%d", input.getInputLiteral(), column); + } + + return format("CAST(%s AS %s) AS col_%d", input.getInputLiteral(), input.getDeclaredType().get(), column); + }) + .collect(joining(",\n", "AS\nSELECT\n", "\nWHERE 'with no' = 'data'")); + } +} diff --git a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/KuduQueryRunnerFactory.java 
b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/KuduQueryRunnerFactory.java index 9494b78bca0..d83ec457ec5 100644 --- a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/KuduQueryRunnerFactory.java +++ b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/KuduQueryRunnerFactory.java @@ -14,157 +14,119 @@ package io.trino.plugin.kudu; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.net.HostAndPort; +import com.google.errorprone.annotations.CanIgnoreReturnValue; import io.airlift.log.Logger; import io.airlift.log.Logging; -import io.trino.Session; +import io.trino.plugin.base.util.Closables; import io.trino.plugin.tpch.TpchPlugin; import io.trino.testing.DistributedQueryRunner; -import io.trino.testing.QueryRunner; import io.trino.tpch.TpchTable; +import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Optional; -import static io.airlift.testing.Closeables.closeAllSuppress; -import static io.trino.Session.SessionBuilder; import static io.trino.plugin.tpch.TpchMetadata.TINY_SCHEMA_NAME; import static io.trino.testing.QueryAssertions.copyTpchTables; import static io.trino.testing.TestingSession.testSessionBuilder; +import static java.util.Objects.requireNonNull; public final class KuduQueryRunnerFactory { private KuduQueryRunnerFactory() {} - public static QueryRunner createKuduQueryRunner(TestingKuduServer kuduServer, Session session) - throws Exception + public static Builder builder(TestingKuduServer kuduServer) { - QueryRunner runner = null; - try { - runner = DistributedQueryRunner.builder(session).build(); - - installKuduConnector(kuduServer.getMasterAddress(), runner, session.getSchema().orElse("kudu_smoke_test"), Optional.of("")); - - return runner; - } - catch (Throwable e) { - closeAllSuppress(e, runner); - throw e; - } + return new Builder(kuduServer); } - public static QueryRunner createKuduQueryRunner(TestingKuduServer kuduServer, String 
kuduSchema) - throws Exception + public static class Builder + extends DistributedQueryRunner.Builder<Builder> { - QueryRunner runner = null; - try { - runner = DistributedQueryRunner.builder(createSession(kuduSchema)).build(); - - installKuduConnector(kuduServer.getMasterAddress(), runner, kuduSchema, Optional.of("")); - - return runner; + private final TestingKuduServer kuduServer; + private final Map<String, String> connectorProperties = new HashMap<>(); + private Optional<String> kuduSchemaEmulationPrefix = Optional.empty(); + private List<TpchTable<?>> initialTables = ImmutableList.of(); + + private Builder(TestingKuduServer kuduServer) + { + super(testSessionBuilder() + .setCatalog("kudu") + .setSchema("default") + .build()); + this.kuduServer = requireNonNull(kuduServer, "kuduServer is null"); } - catch (Throwable e) { - closeAllSuppress(e, runner); - throw e; - } - } - - public static QueryRunner createKuduQueryRunnerTpch(TestingKuduServer kuduServer, Optional<String> kuduSchemaEmulationPrefix, TpchTable<?>... tables) - throws Exception - { - return createKuduQueryRunnerTpch(kuduServer, kuduSchemaEmulationPrefix, ImmutableList.copyOf(tables)); - } - - public static QueryRunner createKuduQueryRunnerTpch(TestingKuduServer kuduServer, Optional<String> kuduSchemaEmulationPrefix, Iterable<TpchTable<?>> tables) - throws Exception - { - return createKuduQueryRunnerTpch(kuduServer, kuduSchemaEmulationPrefix, ImmutableMap.of(), ImmutableMap.of(), tables); - } - - public static QueryRunner createKuduQueryRunnerTpch( - TestingKuduServer kuduServer, - Optional<String> kuduSchemaEmulationPrefix, - Map<String, String> kuduSessionProperties, - Map<String, String> extraProperties, - Iterable<TpchTable<?>> tables) - throws Exception - { - DistributedQueryRunner runner = null; - try { - String kuduSchema = kuduSchemaEmulationPrefix.isPresent() ? 
"tpch" : "default"; - Session session = createSession(kuduSchema, kuduSessionProperties); - runner = DistributedQueryRunner.builder(session) - .setExtraProperties(extraProperties) - .build(); - - runner.installPlugin(new TpchPlugin()); - runner.createCatalog("tpch", "tpch"); - - installKuduConnector(kuduServer.getMasterAddress(), runner, kuduSchema, kuduSchemaEmulationPrefix); - - copyTpchTables(runner, "tpch", TINY_SCHEMA_NAME, session, tables); - return runner; + @CanIgnoreReturnValue + public Builder setKuduSchemaEmulationPrefix(Optional<String> kuduSchemaEmulationPrefix) + { + this.kuduSchemaEmulationPrefix = requireNonNull(kuduSchemaEmulationPrefix, "kuduSchemaEmulationPrefix is null"); + return this; } - catch (Throwable e) { - closeAllSuppress(e, runner); - throw e; - } - } - private static void installKuduConnector(HostAndPort masterAddress, QueryRunner runner, String kuduSchema, Optional<String> kuduSchemaEmulationPrefix) - { - Map<String, String> properties; - if (kuduSchemaEmulationPrefix.isPresent()) { - properties = ImmutableMap.of( - "kudu.schema-emulation.enabled", "true", - "kudu.schema-emulation.prefix", kuduSchemaEmulationPrefix.get(), - "kudu.client.master-addresses", masterAddress.toString()); + @CanIgnoreReturnValue + public Builder addConnectorProperty(String key, String value) + { + this.connectorProperties.put(key, value); + return this; } - else { - properties = ImmutableMap.of( - "kudu.schema-emulation.enabled", "false", - "kudu.client.master-addresses", masterAddress.toString()); - } - - runner.installPlugin(new KuduPlugin()); - runner.createCatalog("kudu", "kudu", properties); - if (kuduSchemaEmulationPrefix.isPresent()) { - runner.execute("DROP SCHEMA IF EXISTS " + kuduSchema); - runner.execute("CREATE SCHEMA " + kuduSchema); + @CanIgnoreReturnValue + public Builder setInitialTables(List<TpchTable<?>> initialTables) + { + this.initialTables = ImmutableList.copyOf(initialTables); + return this; } - } - public static Session 
createSession(String schema, Map<String, String> kuduSessionProperties) - { - SessionBuilder builder = testSessionBuilder() - .setCatalog("kudu") - .setSchema(schema); - kuduSessionProperties.forEach((k, v) -> builder.setCatalogSessionProperty("kudu", k, v)); - return builder.build(); - } - - public static Session createSession(String schema) - { - return testSessionBuilder() - .setCatalog("kudu") - .setSchema(schema) - .build(); + @Override + public DistributedQueryRunner build() + throws Exception + { + String kuduSchema = kuduSchemaEmulationPrefix.isPresent() ? "tpch" : "default"; + amendSession(sessionBuilder -> sessionBuilder.setSchema(kuduSchema)); + DistributedQueryRunner queryRunner = super.build(); + try { + queryRunner.installPlugin(new TpchPlugin()); + queryRunner.createCatalog("tpch", "tpch"); + + if (kuduSchemaEmulationPrefix.isPresent()) { + addConnectorProperty("kudu.schema-emulation.enabled", "true"); + addConnectorProperty("kudu.schema-emulation.prefix", kuduSchemaEmulationPrefix.get()); + addConnectorProperty("kudu.client.master-addresses", kuduServer.getMasterAddress().toString()); + } + else { + addConnectorProperty("kudu.schema-emulation.enabled", "false"); + addConnectorProperty("kudu.client.master-addresses", kuduServer.getMasterAddress().toString()); + } + + queryRunner.installPlugin(new KuduPlugin()); + queryRunner.createCatalog("kudu", "kudu", connectorProperties); + + if (kuduSchemaEmulationPrefix.isPresent()) { + queryRunner.execute("DROP SCHEMA IF EXISTS " + kuduSchema); + queryRunner.execute("CREATE SCHEMA " + kuduSchema); + } + + copyTpchTables(queryRunner, "tpch", TINY_SCHEMA_NAME, queryRunner.getDefaultSession(), initialTables); + return queryRunner; + } + catch (Throwable e) { + Closables.closeAllSuppress(e, queryRunner); + throw e; + } + } } public static void main(String[] args) throws Exception { Logging.initialize(); - DistributedQueryRunner queryRunner = (DistributedQueryRunner) createKuduQueryRunnerTpch( - new 
TestingKuduServer(), - Optional.empty(), - ImmutableMap.of(), - ImmutableMap.of("http-server.http.port", "8080"), - TpchTable.getTables()); + DistributedQueryRunner queryRunner = builder(new TestingKuduServer()) + .addCoordinatorProperty("http-server.http.port", "8080") + .setInitialTables(TpchTable.getTables()) + .build(); + Logger log = Logger.get(KuduQueryRunnerFactory.class); log.info("======== SERVER STARTED ========"); log.info("\n====\n%s\n====", queryRunner.getCoordinator().getBaseUrl()); diff --git a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/KuduTabletWaitStrategy.java b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/KuduTabletWaitStrategy.java new file mode 100644 index 00000000000..2bcf6aec80e --- /dev/null +++ b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/KuduTabletWaitStrategy.java @@ -0,0 +1,73 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.kudu; + +import com.github.dockerjava.api.command.InspectContainerResponse; +import dev.failsafe.Failsafe; +import dev.failsafe.RetryPolicy; +import org.testcontainers.containers.ContainerLaunchException; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.AbstractWaitStrategy; + +import java.util.Optional; + +import static java.util.Objects.requireNonNull; + +public final class KuduTabletWaitStrategy + extends AbstractWaitStrategy +{ + private final GenericContainer<?> master; + + public KuduTabletWaitStrategy(GenericContainer<?> master) + { + this.master = requireNonNull(master, "master is null"); + } + + @Override + protected void waitUntilReady() + { + Failsafe.with(RetryPolicy.builder() + .withMaxDuration(startupTimeout) + .withMaxAttempts(Integer.MAX_VALUE) // limited by MaxDuration + .abortOn(e -> getExitCode().isPresent()) + .build()) + .run(() -> { + // Note: This condition requires a dependency on org.rnorth.duct-tape:duct-tape + if (!getRateLimiter().getWhenReady(() -> master.getLogs().contains("Registered new tserver with Master"))) { + // We say "timed out" immediately. Failsafe will propagate this only when timeout reached. 
+ throw new ContainerLaunchException("Timed out waiting for container to register tserver"); + } + }); + } + + private Optional<Long> getExitCode() + { + if (waitStrategyTarget.getContainerId() == null) { + // Not yet started + return Optional.empty(); + } + + InspectContainerResponse currentContainerInfo = waitStrategyTarget.getCurrentContainerInfo(); + if (currentContainerInfo.getState().getStartedAt() == null) { + // not yet running + return Optional.empty(); + } + // currentContainerInfo.getState().getExitCode() is present (0) even in "running" state + if (Boolean.TRUE.equals(currentContainerInfo.getState().getRunning())) { + // running + return Optional.empty(); + } + return Optional.ofNullable(currentContainerInfo.getState().getExitCodeLong()); + } +} diff --git a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduClientConfig.java b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduClientConfig.java index 78580ad2e1d..ec75f4a77af 100644 --- a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduClientConfig.java +++ b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduClientConfig.java @@ -13,6 +13,7 @@ */ package io.trino.plugin.kudu; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import io.airlift.units.Duration; import org.junit.jupiter.api.Test; @@ -31,7 +32,7 @@ public class TestKuduClientConfig public void testDefaults() { assertRecordedDefaults(recordDefaults(KuduClientConfig.class) - .setMasterAddresses("") + .setMasterAddresses(ImmutableList.of()) .setDefaultAdminOperationTimeout(new Duration(30, SECONDS)) .setDefaultOperationTimeout(new Duration(30, SECONDS)) .setDisableStatistics(false) @@ -45,7 +46,7 @@ public class TestKuduClientConfig public void testExplicitPropertyMappingsWithCredentialsKey() { Map<String, String> properties = ImmutableMap.<String, String>builder() - .put("kudu.client.master-addresses", "localhost") + .put("kudu.client.master-addresses", 
"localhost,localhost2") .put("kudu.client.default-admin-operation-timeout", "1m") .put("kudu.client.default-operation-timeout", "5m") .put("kudu.client.disable-statistics", "true") @@ -56,7 +57,7 @@ public class TestKuduClientConfig .buildOrThrow(); KuduClientConfig expected = new KuduClientConfig() - .setMasterAddresses("localhost") + .setMasterAddresses(ImmutableList.of("localhost", "localhost2")) .setDefaultAdminOperationTimeout(new Duration(1, MINUTES)) .setDefaultOperationTimeout(new Duration(5, MINUTES)) .setDisableStatistics(true) diff --git a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduConnectorTest.java b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduConnectorTest.java index 48363c5d2f4..ca85a1a2390 100644 --- a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduConnectorTest.java +++ b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduConnectorTest.java @@ -29,7 +29,6 @@ import java.util.function.Consumer; import java.util.regex.Matcher; import java.util.regex.Pattern; -import static io.trino.plugin.kudu.KuduQueryRunnerFactory.createKuduQueryRunnerTpch; import static io.trino.spi.type.VarcharType.VARCHAR; import static io.trino.testing.MaterializedResult.resultBuilder; import static io.trino.testing.TestingNames.randomNameSuffix; @@ -51,10 +50,9 @@ public class TestKuduConnectorTest protected QueryRunner createQueryRunner() throws Exception { - return createKuduQueryRunnerTpch( - closeAfterClass(new TestingKuduServer()), - Optional.empty(), - REQUIRED_TPCH_TABLES); + return KuduQueryRunnerFactory.builder(closeAfterClass(new TestingKuduServer())) + .setInitialTables(REQUIRED_TPCH_TABLES) + .build(); } @Override @@ -62,18 +60,18 @@ public class TestKuduConnectorTest { return switch (connectorBehavior) { case SUPPORTS_ARRAY, - SUPPORTS_COMMENT_ON_COLUMN, - SUPPORTS_COMMENT_ON_TABLE, - SUPPORTS_CREATE_MATERIALIZED_VIEW, - SUPPORTS_CREATE_TABLE_WITH_COLUMN_COMMENT, - SUPPORTS_CREATE_VIEW, - 
SUPPORTS_NEGATIVE_DATE, - SUPPORTS_NOT_NULL_CONSTRAINT, - SUPPORTS_RENAME_SCHEMA, - SUPPORTS_ROW_TYPE, - SUPPORTS_SET_COLUMN_TYPE, - SUPPORTS_TOPN_PUSHDOWN, - SUPPORTS_TRUNCATE -> false; + SUPPORTS_COMMENT_ON_COLUMN, + SUPPORTS_COMMENT_ON_TABLE, + SUPPORTS_CREATE_MATERIALIZED_VIEW, + SUPPORTS_CREATE_TABLE_WITH_COLUMN_COMMENT, + SUPPORTS_CREATE_VIEW, + SUPPORTS_NEGATIVE_DATE, + SUPPORTS_NOT_NULL_CONSTRAINT, + SUPPORTS_RENAME_SCHEMA, + SUPPORTS_ROW_TYPE, + SUPPORTS_SET_COLUMN_TYPE, + SUPPORTS_TOPN_PUSHDOWN, + SUPPORTS_TRUNCATE -> false; default -> super.hasBehavior(connectorBehavior); }; } @@ -175,7 +173,7 @@ public class TestKuduConnectorTest .row("custkey", "bigint", extra, "") .row("orderstatus", "varchar", extra, "") .row("totalprice", "double", extra, "") - .row("orderdate", "varchar", extra, "") + .row("orderdate", "date", extra, "") .row("orderpriority", "varchar", extra, "") .row("clerk", "varchar", extra, "") .row("shippriority", "integer", extra, "") @@ -200,7 +198,7 @@ public class TestKuduConnectorTest " custkey bigint COMMENT '' WITH ( nullable = true ),\n" + " orderstatus varchar COMMENT '' WITH ( nullable = true ),\n" + " totalprice double COMMENT '' WITH ( nullable = true ),\n" + - " orderdate varchar COMMENT '' WITH ( nullable = true ),\n" + + " orderdate date COMMENT '' WITH ( nullable = true ),\n" + " orderpriority varchar COMMENT '' WITH ( nullable = true ),\n" + " clerk varchar COMMENT '' WITH ( nullable = true ),\n" + " shippriority integer COMMENT '' WITH ( nullable = true ),\n" + @@ -537,6 +535,28 @@ public class TestKuduConnectorTest } } + @Test + public void testAddColumnWithDecimal() + { + String tableName = "test_add_column_with_decimal" + randomNameSuffix(); + + assertUpdate("CREATE TABLE " + tableName + "(" + + "id INT WITH (primary_key=true), " + + "a_varchar VARCHAR" + + ") WITH (" + + " partition_by_hash_columns = ARRAY['id'], " + + " partition_by_hash_buckets = 2" + + ")"); + + assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN 
b_decimal decimal(14,5)"); + assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN c_decimal decimal(35,5)"); + + assertThat(getColumnType(tableName, "b_decimal")).isEqualTo("decimal(14,5)"); + assertThat(getColumnType(tableName, "c_decimal")).isEqualTo("decimal(35,5)"); + + assertUpdate("DROP TABLE " + tableName); + } + @Test public void testInsertIntoTableHavingRowUuid() { @@ -603,8 +623,6 @@ public class TestKuduConnectorTest public void testInsertNegativeDate() { // TODO Remove this overriding test once kudu connector can create tables with default partitions - // TODO Update this test once kudu connector supports DATE type: https://github.com/trinodb/trino/issues/11009 - // DATE type is not supported by Kudu connector try (TestTable table = new TestTable(getQueryRunner()::execute, "insert_date", "(dt DATE WITH (primary_key=true)) " + "WITH (partition_by_hash_columns = ARRAY['dt'], partition_by_hash_buckets = 2)")) { @@ -615,7 +633,7 @@ public class TestKuduConnectorTest @Override protected String errorMessageForInsertNegativeDate(String date) { - return "Insert query has mismatched column types: Table: \\[varchar\\], Query: \\[date\\]"; + return "Date value <-719893>} is out of range '0001-01-01':'9999-12-31'"; } @Test @@ -738,22 +756,19 @@ public class TestKuduConnectorTest // Map date column type to varchar String tableName = "negative_date_" + randomNameSuffix(); - try { - assertUpdate(format("CREATE TABLE %s AS SELECT DATE '-0001-01-01' AS dt", tableName), 1); - assertQuery("SELECT * FROM " + tableName, "VALUES '-0001-01-01'"); - assertQuery(format("SELECT * FROM %s WHERE dt = '-0001-01-01'", tableName), "VALUES '-0001-01-01'"); - } - finally { - assertUpdate("DROP TABLE IF EXISTS " + tableName); - } + abort("TODO: implement the test for Kudu"); } @Test @Override + @SuppressWarnings("deprecation") public void testDateYearOfEraPredicate() { - assertThatThrownBy(super::testDateYearOfEraPredicate) - .hasStackTraceContaining("Cannot apply operator: varchar = 
date"); + // Override because the connector throws an exception instead of an empty result when the value is out of supported range + assertQuery("SELECT orderdate FROM orders WHERE orderdate = DATE '1997-09-14'", "VALUES DATE '1997-09-14'"); + // TODO Replace failure with a TrinoException + assertThatThrownBy(() -> query("SELECT * FROM orders WHERE orderdate = DATE '-1996-09-14'")) + .hasMessageContaining("integer value out of range for Type: date column: -1448295"); } @Test @@ -1013,13 +1028,17 @@ public class TestKuduConnectorTest return Optional.of(dataMappingTestSetup.asUnsupported()); } - if (typeName.equals("date") // date gets stored as varchar - || typeName.equals("varbinary") // TODO (https://github.com/trinodb/trino/issues/3416) + if (typeName.equals("varbinary") // TODO (https://github.com/trinodb/trino/issues/3416) || (typeName.startsWith("char") && dataMappingTestSetup.getSampleValueLiteral().contains(" "))) { // TODO: https://github.com/trinodb/trino/issues/3597 // TODO this should either work or fail cleanly return Optional.empty(); } + if (typeName.equals("date") && dataMappingTestSetup.getSampleValueLiteral().equals("DATE '1582-10-05'")) { + // Kudu connector returns +10 days during julian->gregorian switch. The test case exists in TestKuduTypeMapping.testDate(). 
+ return Optional.empty(); + } + return Optional.of(dataMappingTestSetup); } @@ -1067,6 +1086,12 @@ public class TestKuduConnectorTest assertThat(e).hasMessageContaining("invalid column name: identifier"); } + @Override + protected String errorMessageForCreateTableAsSelectNegativeDate(String date) + { + return ".*Date value <-719893>} is out of range '0001-01-01':'9999-12-31'.*"; + } + private void assertTableProperty(String tableProperties, String key, String regexValue) { assertThat(Pattern.compile(key + "\\s*=\\s*" + regexValue + ",?\\s+").matcher(tableProperties).find()) diff --git a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationDecimalColumns.java b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationDecimalColumns.java index c2420d76be8..87c98224ab1 100644 --- a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationDecimalColumns.java +++ b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationDecimalColumns.java @@ -22,7 +22,6 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; -import static io.trino.plugin.kudu.KuduQueryRunnerFactory.createKuduQueryRunner; import static java.lang.String.format; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.offset; @@ -49,7 +48,7 @@ public class TestKuduIntegrationDecimalColumns protected QueryRunner createQueryRunner() throws Exception { - return createKuduQueryRunner(closeAfterClass(new TestingKuduServer()), "decimal"); + return KuduQueryRunnerFactory.builder(closeAfterClass(new TestingKuduServer())).build(); } @AfterAll diff --git a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationDynamicFilter.java b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationDynamicFilter.java index a6b0c6be87c..480c80645c1 100644 --- 
a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationDynamicFilter.java +++ b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationDynamicFilter.java @@ -13,7 +13,6 @@ */ package io.trino.plugin.kudu; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.primitives.Ints; import io.opentelemetry.api.trace.Span; @@ -33,7 +32,6 @@ import io.trino.testing.AbstractTestQueryFramework; import io.trino.testing.DistributedQueryRunner; import io.trino.testing.MaterializedResultWithQueryId; import io.trino.testing.QueryRunner; -import io.trino.tpch.TpchTable; import io.trino.transaction.TransactionId; import io.trino.transaction.TransactionManager; import org.intellij.lang.annotations.Language; @@ -45,15 +43,17 @@ import java.util.List; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; import static com.google.common.collect.ImmutableList.toImmutableList; import static io.trino.SystemSessionProperties.JOIN_DISTRIBUTION_TYPE; import static io.trino.SystemSessionProperties.JOIN_REORDERING_STRATEGY; -import static io.trino.plugin.kudu.KuduQueryRunnerFactory.createKuduQueryRunnerTpch; import static io.trino.spi.connector.Constraint.alwaysTrue; import static io.trino.sql.planner.OptimizerConfig.JoinDistributionType.BROADCAST; import static io.trino.sql.planner.OptimizerConfig.JoinReorderingStrategy.NONE; +import static io.trino.tpch.TpchTable.LINE_ITEM; +import static io.trino.tpch.TpchTable.ORDERS; +import static io.trino.tpch.TpchTable.PART; +import static java.util.Objects.requireNonNull; import static org.assertj.core.api.Assertions.assertThat; public class TestKuduIntegrationDynamicFilter @@ -63,14 +63,13 @@ public class TestKuduIntegrationDynamicFilter protected QueryRunner createQueryRunner() throws Exception { - return createKuduQueryRunnerTpch( - closeAfterClass(new 
TestingKuduServer()), - Optional.of(""), - ImmutableMap.of("dynamic_filtering_wait_timeout", "1h"), - ImmutableMap.of( - "dynamic-filtering.small.max-distinct-values-per-driver", "100", - "dynamic-filtering.small.range-row-limit-per-driver", "100"), - TpchTable.getTables()); + return KuduQueryRunnerFactory.builder(closeAfterClass(new TestingKuduServer())) + .setKuduSchemaEmulationPrefix(Optional.of("")) + .addConnectorProperty("kudu.dynamic-filtering.wait-timeout", "1h") + .addExtraProperty("dynamic-filtering.small.max-distinct-values-per-driver", "100") + .addExtraProperty("dynamic-filtering.small.range-row-limit-per-driver", "100") + .setInitialTables(List.of(LINE_ITEM, ORDERS, PART)) + .build(); } @Test @@ -88,19 +87,32 @@ public class TestKuduIntegrationDynamicFilter QualifiedObjectName tableName = new QualifiedObjectName("kudu", "tpch", "orders"); Optional<TableHandle> tableHandle = runner.getMetadata().getTableHandle(session, tableName); assertThat(tableHandle.isPresent()).isTrue(); - SplitSource splitSource = runner.getSplitManager() - .getSplits(session, Span.getInvalid(), tableHandle.get(), new IncompleteDynamicFilter(), alwaysTrue()); - List<Split> splits = new ArrayList<>(); - while (!splitSource.isFinished()) { - splits.addAll(splitSource.getNextBatch(1000).get().getSplits()); + CompletableFuture<Void> dynamicFilterBlocked = new CompletableFuture<>(); + try { + SplitSource splitSource = runner.getSplitManager() + .getSplits(session, Span.getInvalid(), tableHandle.get(), new BlockedDynamicFilter(dynamicFilterBlocked), alwaysTrue()); + List<Split> splits = new ArrayList<>(); + while (!splitSource.isFinished()) { + splits.addAll(splitSource.getNextBatch(1000).get().getSplits()); + } + splitSource.close(); + assertThat(splits.isEmpty()).isFalse(); + } + finally { + dynamicFilterBlocked.complete(null); } - splitSource.close(); - assertThat(splits.isEmpty()).isFalse(); } - private static class IncompleteDynamicFilter + private static class 
BlockedDynamicFilter implements DynamicFilter { + private final CompletableFuture<?> isBlocked; + + public BlockedDynamicFilter(CompletableFuture<?> isBlocked) + { + this.isBlocked = requireNonNull(isBlocked, "isBlocked is null"); + } + @Override public Set<ColumnHandle> getColumnsCovered() { @@ -110,14 +122,7 @@ public class TestKuduIntegrationDynamicFilter @Override public CompletableFuture<?> isBlocked() { - return CompletableFuture.runAsync(() -> { - try { - TimeUnit.HOURS.sleep(1); - } - catch (InterruptedException e) { - throw new IllegalStateException(e); - } - }); + return isBlocked; } @Override diff --git a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationHashPartitioning.java b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationHashPartitioning.java index 3c0cb1b917a..fde6eeffd8a 100644 --- a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationHashPartitioning.java +++ b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationHashPartitioning.java @@ -19,7 +19,6 @@ import io.trino.testing.QueryRunner; import org.intellij.lang.annotations.Language; import org.junit.jupiter.api.Test; -import static io.trino.plugin.kudu.KuduQueryRunnerFactory.createKuduQueryRunner; import static org.assertj.core.api.Assertions.assertThat; public class TestKuduIntegrationHashPartitioning @@ -29,7 +28,7 @@ public class TestKuduIntegrationHashPartitioning protected QueryRunner createQueryRunner() throws Exception { - return createKuduQueryRunner(closeAfterClass(new TestingKuduServer()), "hash"); + return KuduQueryRunnerFactory.builder(closeAfterClass(new TestingKuduServer())).build(); } @Test diff --git a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationIntegerColumns.java b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationIntegerColumns.java index c20bddf6564..d87be9f3e72 100644 --- 
a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationIntegerColumns.java +++ b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationIntegerColumns.java @@ -18,7 +18,6 @@ import io.trino.testing.MaterializedResult; import io.trino.testing.QueryRunner; import org.junit.jupiter.api.Test; -import static io.trino.plugin.kudu.KuduQueryRunnerFactory.createKuduQueryRunner; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Fail.fail; @@ -36,7 +35,7 @@ public class TestKuduIntegrationIntegerColumns protected QueryRunner createQueryRunner() throws Exception { - return createKuduQueryRunner(closeAfterClass(new TestingKuduServer()), "test_integer"); + return KuduQueryRunnerFactory.builder(closeAfterClass(new TestingKuduServer())).build(); } @Test diff --git a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationRangePartitioning.java b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationRangePartitioning.java index 0e2975b1d0e..6ac167647df 100644 --- a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationRangePartitioning.java +++ b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationRangePartitioning.java @@ -18,7 +18,6 @@ import io.trino.testing.MaterializedResult; import io.trino.testing.QueryRunner; import org.junit.jupiter.api.Test; -import static io.trino.plugin.kudu.KuduQueryRunnerFactory.createKuduQueryRunner; import static java.lang.String.join; import static org.assertj.core.api.Assertions.assertThat; @@ -84,7 +83,7 @@ public class TestKuduIntegrationRangePartitioning protected QueryRunner createQueryRunner() throws Exception { - return createKuduQueryRunner(closeAfterClass(new TestingKuduServer()), "range_partitioning"); + return KuduQueryRunnerFactory.builder(closeAfterClass(new TestingKuduServer())).build(); } @Test diff --git a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduTypeMapping.java 
b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduTypeMapping.java new file mode 100644 index 00000000000..57060be2b20 --- /dev/null +++ b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduTypeMapping.java @@ -0,0 +1,347 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.kudu; + +import io.trino.Session; +import io.trino.spi.type.TimeZoneKey; +import io.trino.testing.AbstractTestQueryFramework; +import io.trino.testing.QueryRunner; +import io.trino.testing.TestingSession; +import io.trino.testing.datatype.CreateAsSelectDataSetup; +import io.trino.testing.datatype.DataSetup; +import io.trino.testing.datatype.SqlDataTypeTest; +import io.trino.testing.sql.TrinoSqlExecutor; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.function.Function; + +import static com.google.common.base.Preconditions.checkState; +import static com.google.common.base.Verify.verify; +import static io.trino.spi.type.BigintType.BIGINT; +import static io.trino.spi.type.BooleanType.BOOLEAN; +import static io.trino.spi.type.DateType.DATE; +import static io.trino.spi.type.DecimalType.createDecimalType; +import static io.trino.spi.type.DoubleType.DOUBLE; +import static io.trino.spi.type.IntegerType.INTEGER; +import static io.trino.spi.type.RealType.REAL; +import static 
io.trino.spi.type.SmallintType.SMALLINT; +import static io.trino.spi.type.TimestampType.createTimestampType; +import static io.trino.spi.type.TinyintType.TINYINT; +import static io.trino.spi.type.VarbinaryType.VARBINARY; +import static io.trino.spi.type.VarcharType.VARCHAR; +import static java.lang.String.format; +import static java.time.ZoneOffset.UTC; + +final class TestKuduTypeMapping + extends AbstractTestQueryFramework +{ + private final ZoneId jvmZone = ZoneId.systemDefault(); + private final LocalDateTime timeGapInJvmZone1 = LocalDateTime.of(1970, 1, 1, 0, 13, 42); + private final LocalDateTime timeGapInJvmZone2 = LocalDateTime.of(2018, 4, 1, 2, 13, 55, 123_000_000); + private final LocalDateTime timeDoubledInJvmZone = LocalDateTime.of(2018, 10, 28, 1, 33, 17, 456_000_000); + + // no DST in 1970, but has DST in later years (e.g. 2018) + private final ZoneId vilnius = ZoneId.of("Europe/Vilnius"); + private final LocalDateTime timeGapInVilnius = LocalDateTime.of(2018, 3, 25, 3, 17, 17); + private final LocalDateTime timeDoubledInVilnius = LocalDateTime.of(2018, 10, 28, 3, 33, 33, 333_000_000); + + // minutes offset change since 1970-01-01, no DST + private final ZoneId kathmandu = ZoneId.of("Asia/Kathmandu"); + private final LocalDateTime timeGapInKathmandu = LocalDateTime.of(1986, 1, 1, 0, 13, 7); + + @BeforeAll + public void setUp() + { + checkState(jvmZone.getId().equals("America/Bahia_Banderas"), "This test assumes certain JVM time zone"); + LocalDate dateOfLocalTimeChangeForwardAtMidnightInJvmZone = LocalDate.of(1970, 1, 1); + checkIsGap(jvmZone, dateOfLocalTimeChangeForwardAtMidnightInJvmZone.atStartOfDay()); + checkIsGap(jvmZone, timeGapInJvmZone1); + checkIsGap(jvmZone, timeGapInJvmZone2); + checkIsDoubled(jvmZone, timeDoubledInJvmZone); + + LocalDate dateOfLocalTimeChangeForwardAtMidnightInSomeZone = LocalDate.of(1983, 4, 1); + checkIsGap(vilnius, dateOfLocalTimeChangeForwardAtMidnightInSomeZone.atStartOfDay()); + LocalDate 
dateOfLocalTimeChangeBackwardAtMidnightInSomeZone = LocalDate.of(1983, 10, 1); + checkIsDoubled(vilnius, dateOfLocalTimeChangeBackwardAtMidnightInSomeZone.atStartOfDay().minusMinutes(1)); + checkIsGap(vilnius, timeGapInVilnius); + checkIsDoubled(vilnius, timeDoubledInVilnius); + + checkIsGap(kathmandu, timeGapInKathmandu); + } + + private static void checkIsGap(ZoneId zone, LocalDateTime dateTime) + { + verify(isGap(zone, dateTime), "Expected %s to be a gap in %s", dateTime, zone); + } + + private static boolean isGap(ZoneId zone, LocalDateTime dateTime) + { + return zone.getRules().getValidOffsets(dateTime).isEmpty(); + } + + private static void checkIsDoubled(ZoneId zone, LocalDateTime dateTime) + { + verify(zone.getRules().getValidOffsets(dateTime).size() == 2, "Expected %s to be doubled in %s", dateTime, zone); + } + + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + return KuduQueryRunnerFactory.builder(closeAfterClass(new TestingKuduServer())).build(); + } + + @Test + void testBoolean() + { + SqlDataTypeTest.create() + .addRoundTrip("boolean", "NULL", BOOLEAN, "CAST(NULL AS BOOLEAN)") + .addRoundTrip("boolean", "true", BOOLEAN) + .addRoundTrip("boolean", "false", BOOLEAN) + .execute(getQueryRunner(), trinoCreateAsSelect("test_boolean")) + .execute(getQueryRunner(), trinoCreateAndInsert("test_boolean")); + } + + @Test + void testTinyint() + { + SqlDataTypeTest.create() + .addRoundTrip("tinyint", "NULL", TINYINT, "CAST(NULL AS TINYINT)") + .addRoundTrip("tinyint", "-128", TINYINT, "TINYINT '-128'") + .addRoundTrip("tinyint", "5", TINYINT, "TINYINT '5'") + .addRoundTrip("tinyint", "127", TINYINT, "TINYINT '127'") + .execute(getQueryRunner(), trinoCreateAsSelect("test_tinyint")) + .execute(getQueryRunner(), trinoCreateAndInsert("test_tinyint")); + } + + @Test + void testSmallint() + { + SqlDataTypeTest.create() + .addRoundTrip("smallint", "NULL", SMALLINT, "CAST(NULL AS SMALLINT)") + .addRoundTrip("smallint", "-32768", SMALLINT, 
"SMALLINT '-32768'") + .addRoundTrip("smallint", "32456", SMALLINT, "SMALLINT '32456'") + .addRoundTrip("smallint", "32767", SMALLINT, "SMALLINT '32767'") + .execute(getQueryRunner(), trinoCreateAsSelect("test_smallint")) + .execute(getQueryRunner(), trinoCreateAndInsert("test_smallint")); + } + + @Test + void testInt() + { + SqlDataTypeTest.create() + .addRoundTrip("int", "NULL", INTEGER, "CAST(NULL AS INTEGER)") + .addRoundTrip("int", "-2147483648", INTEGER, "-2147483648") + .addRoundTrip("int", "1234567890", INTEGER, "1234567890") + .addRoundTrip("int", "2147483647", INTEGER, "2147483647") + .execute(getQueryRunner(), trinoCreateAsSelect("test_int")) + .execute(getQueryRunner(), trinoCreateAndInsert("test_int")); + } + + @Test + void testBigint() + { + SqlDataTypeTest.create() + .addRoundTrip("bigint", "NULL", BIGINT, "CAST(NULL AS BIGINT)") + .addRoundTrip("bigint", "-9223372036854775808", BIGINT, "-9223372036854775808") + .addRoundTrip("bigint", "123456789012", BIGINT, "123456789012") + .addRoundTrip("bigint", "9223372036854775807", BIGINT, "9223372036854775807") + .execute(getQueryRunner(), trinoCreateAsSelect("test_bigint")) + .execute(getQueryRunner(), trinoCreateAndInsert("test_bigint")); + } + + @Test + void testReal() + { + SqlDataTypeTest.create() + .addRoundTrip("real", "NULL", REAL, "CAST(NULL AS REAL)") + .addRoundTrip("real", "12.5", REAL, "REAL '12.5'") + .addRoundTrip("real", "nan()", REAL, "CAST(nan() AS REAL)") + .addRoundTrip("real", "-infinity()", REAL, "CAST(-infinity() AS REAL)") + .addRoundTrip("real", "+infinity()", REAL, "CAST(+infinity() AS REAL)") + .execute(getQueryRunner(), trinoCreateAsSelect("test_real")) + .execute(getQueryRunner(), trinoCreateAndInsert("test_real")); + } + + @Test + void testDouble() + { + SqlDataTypeTest.create() + .addRoundTrip("double", "NULL", DOUBLE, "CAST(NULL AS DOUBLE)") + .addRoundTrip("double", "3.1415926835", DOUBLE, "DOUBLE '3.1415926835'") + .addRoundTrip("double", "1.79769E308", DOUBLE, "DOUBLE 
'1.79769E308'") + .addRoundTrip("double", "2.225E-307", DOUBLE, "DOUBLE '2.225E-307'") + .execute(getQueryRunner(), trinoCreateAsSelect("trino_test_double")) + .execute(getQueryRunner(), trinoCreateAndInsert("trino_test_double")); + } + + @Test + void testDecimal() + { + SqlDataTypeTest.create() + .addRoundTrip("decimal(3, 0)", "CAST(NULL AS decimal(3, 0))", createDecimalType(3, 0), "CAST(NULL AS decimal(3, 0))") + .addRoundTrip("decimal(3, 0)", "CAST('193' AS decimal(3, 0))", createDecimalType(3, 0), "CAST('193' AS decimal(3, 0))") + .addRoundTrip("decimal(3, 0)", "CAST('19' AS decimal(3, 0))", createDecimalType(3, 0), "CAST('19' AS decimal(3, 0))") + .addRoundTrip("decimal(3, 0)", "CAST('-193' AS decimal(3, 0))", createDecimalType(3, 0), "CAST('-193' AS decimal(3, 0))") + .addRoundTrip("decimal(3, 1)", "CAST('10.0' AS decimal(3, 1))", createDecimalType(3, 1), "CAST('10.0' AS decimal(3, 1))") + .addRoundTrip("decimal(3, 1)", "CAST('10.1' AS decimal(3, 1))", createDecimalType(3, 1), "CAST('10.1' AS decimal(3, 1))") + .addRoundTrip("decimal(3, 1)", "CAST('-10.1' AS decimal(3, 1))", createDecimalType(3, 1), "CAST('-10.1' AS decimal(3, 1))") + .addRoundTrip("decimal(4, 2)", "CAST('2' AS decimal(4, 2))", createDecimalType(4, 2), "CAST('2' AS decimal(4, 2))") + .addRoundTrip("decimal(4, 2)", "CAST('2.3' AS decimal(4, 2))", createDecimalType(4, 2), "CAST('2.3' AS decimal(4, 2))") + .addRoundTrip("decimal(24, 2)", "CAST('2' AS decimal(24, 2))", createDecimalType(24, 2), "CAST('2' AS decimal(24, 2))") + .addRoundTrip("decimal(24, 2)", "CAST('2.3' AS decimal(24, 2))", createDecimalType(24, 2), "CAST('2.3' AS decimal(24, 2))") + .addRoundTrip("decimal(24, 2)", "CAST('123456789.3' AS decimal(24, 2))", createDecimalType(24, 2), "CAST('123456789.3' AS decimal(24, 2))") + .addRoundTrip("decimal(24, 4)", "CAST('12345678901234567890.31' AS decimal(24, 4))", createDecimalType(24, 4), "CAST('12345678901234567890.31' AS decimal(24, 4))") + .addRoundTrip("decimal(30, 5)", 
"CAST('3141592653589793238462643.38327' AS decimal(30, 5))", createDecimalType(30, 5), "CAST('3141592653589793238462643.38327' AS decimal(30, 5))") + .addRoundTrip("decimal(30, 5)", "CAST('-3141592653589793238462643.38327' AS decimal(30, 5))", createDecimalType(30, 5), "CAST('-3141592653589793238462643.38327' AS decimal(30, 5))") + .addRoundTrip("decimal(38, 0)", "CAST(NULL AS decimal(38, 0))", createDecimalType(38, 0), "CAST(NULL AS decimal(38, 0))") + .addRoundTrip("decimal(38, 0)", "CAST('27182818284590452353602874713526624977' AS decimal(38, 0))", createDecimalType(38, 0), "CAST('27182818284590452353602874713526624977' AS decimal(38, 0))") + .addRoundTrip("decimal(38, 0)", "CAST('-27182818284590452353602874713526624977' AS decimal(38, 0))", createDecimalType(38, 0), "CAST('-27182818284590452353602874713526624977' AS decimal(38, 0))") + .addRoundTrip("decimal(38, 38)", "CAST('0.27182818284590452353602874713526624977' AS decimal(38, 38))", createDecimalType(38, 38), "CAST('0.27182818284590452353602874713526624977' AS decimal(38, 38))") + .execute(getQueryRunner(), trinoCreateAsSelect("trino_test_decimal")) + .execute(getQueryRunner(), trinoCreateAndInsert("trino_test_decimal")); + } + + @Test + void testVarchar() + { + SqlDataTypeTest.create() + .addRoundTrip("varchar", "NULL", VARCHAR, "CAST(NULL AS varchar)") + .addRoundTrip("varchar", "'text_a'", VARCHAR, "CAST('text_a' AS varchar)") + .addRoundTrip("varchar", "'text_b'", VARCHAR, "CAST('text_b' AS varchar)") + .addRoundTrip("varchar", "'攻殻機動隊'", VARCHAR, "CAST('攻殻機動隊' AS varchar)") + .addRoundTrip("varchar", "'攻殻機動隊'", VARCHAR, "CAST('攻殻機動隊' AS varchar)") + .addRoundTrip("varchar", "'😂'", VARCHAR, "CAST('😂' AS varchar)") + .addRoundTrip("varchar", "'Ну, погоди!'", VARCHAR, "CAST('Ну, погоди!' 
AS varchar)") + .execute(getQueryRunner(), trinoCreateAndInsert("test_varchar")) + .execute(getQueryRunner(), trinoCreateAsSelect("test_varchar")); + } + + @Test + void testVarbinary() + { + SqlDataTypeTest.create() + .addRoundTrip("varbinary", "NULL", VARBINARY, "CAST(NULL AS varbinary)") + .addRoundTrip("varbinary", "X''", VARBINARY, "X''") + .addRoundTrip("varbinary", "X'68656C6C6F'", VARBINARY, "to_utf8('hello')") + .addRoundTrip("varbinary", "X'5069C4996B6E6120C582C4856B61207720E69DB1E4BAACE983BD'", VARBINARY, "to_utf8('Piękna łąka w 東京都')") + .addRoundTrip("varbinary", "X'4261672066756C6C206F6620F09F92B0'", VARBINARY, "to_utf8('Bag full of 💰')") + .addRoundTrip("varbinary", "X'0001020304050607080DF9367AA7000000'", VARBINARY, "X'0001020304050607080DF9367AA7000000'") // non-text + .addRoundTrip("varbinary", "X'000000000000'", VARBINARY, "X'000000000000'") + .execute(getQueryRunner(), trinoCreateAsSelect("test_varbinary")) + .execute(getQueryRunner(), trinoCreateAndInsert("test_varbinary")); + } + + @Test + void testDate() + { + testDate(UTC); + testDate(jvmZone); + // using two non-JVM zones + testDate(vilnius); + testDate(kathmandu); + testDate(TestingSession.DEFAULT_TIME_ZONE_KEY.getZoneId()); + } + + private void testDate(ZoneId sessionZone) + { + Session session = Session.builder(getSession()) + .setTimeZoneKey(TimeZoneKey.getTimeZoneKey(sessionZone.getId())) + .build(); + + dateTest(inputLiteral -> format("DATE %s", inputLiteral)) + .execute(getQueryRunner(), session, trinoCreateAsSelect(session, "test_date")) + .execute(getQueryRunner(), session, trinoCreateAsSelect("test_date")) + .execute(getQueryRunner(), session, trinoCreateAndInsert(session, "test_date")) + .execute(getQueryRunner(), session, trinoCreateAndInsert("test_date")); + } + + private static SqlDataTypeTest dateTest(Function<String, String> inputLiteralFactory) + { + return SqlDataTypeTest.create() + .addRoundTrip("date", "NULL", DATE, "CAST(NULL AS DATE)") + .addRoundTrip("date", 
inputLiteralFactory.apply("'0001-01-01'"), DATE, "DATE '0001-01-01'") // min value in Kudu + .addRoundTrip("date", inputLiteralFactory.apply("'1582-10-04'"), DATE, "DATE '1582-10-04'") // before julian->gregorian switch + .addRoundTrip("date", inputLiteralFactory.apply("'1582-10-05'"), DATE, "DATE '1582-10-15'") // begin julian->gregorian switch + .addRoundTrip("date", inputLiteralFactory.apply("'1582-10-14'"), DATE, "DATE '1582-10-24'") // end julian->gregorian switch + .addRoundTrip("date", inputLiteralFactory.apply("'1952-04-03'"), DATE, "DATE '1952-04-03'") // before epoch + .addRoundTrip("date", inputLiteralFactory.apply("'1970-01-01'"), DATE, "DATE '1970-01-01'") + .addRoundTrip("date", inputLiteralFactory.apply("'1970-02-03'"), DATE, "DATE '1970-02-03'") + .addRoundTrip("date", inputLiteralFactory.apply("'1983-04-01'"), DATE, "DATE '1983-04-01'") + .addRoundTrip("date", inputLiteralFactory.apply("'1983-10-01'"), DATE, "DATE '1983-10-01'") + .addRoundTrip("date", inputLiteralFactory.apply("'2017-07-01'"), DATE, "DATE '2017-07-01'") // summer on northern hemisphere (possible DST) + .addRoundTrip("date", inputLiteralFactory.apply("'2017-01-01'"), DATE, "DATE '2017-01-01'") // winter on northern hemisphere (possible DST on southern hemisphere) + .addRoundTrip("date", inputLiteralFactory.apply("'9999-12-31'"), DATE, "DATE '9999-12-31'"); // max value in Kudu + } + + @Test + void testTimestamp() + { + testTimestamp(UTC); + testTimestamp(jvmZone); + // using two non-JVM zones + testTimestamp(vilnius); + testTimestamp(kathmandu); + testTimestamp(TestingSession.DEFAULT_TIME_ZONE_KEY.getZoneId()); + } + + private void testTimestamp(ZoneId sessionZone) + { + Session session = Session.builder(getSession()) + .setTimeZoneKey(TimeZoneKey.getTimeZoneKey(sessionZone.getId())) + .build(); + + SqlDataTypeTest.create() + .addRoundTrip("timestamp(3)", "TIMESTAMP '1958-01-01 13:18:03.123'", createTimestampType(3), "TIMESTAMP '1958-01-01 13:18:03.123'") +
.addRoundTrip("timestamp(3)", "TIMESTAMP '2019-03-18 10:01:17.987'", createTimestampType(3), "TIMESTAMP '2019-03-18 10:01:17.987'") + .addRoundTrip("timestamp(3)", "TIMESTAMP '2018-10-28 01:33:17.456'", createTimestampType(3), "TIMESTAMP '2018-10-28 01:33:17.456'") + .addRoundTrip("timestamp(3)", "TIMESTAMP '2018-10-28 03:33:33.333'", createTimestampType(3), "TIMESTAMP '2018-10-28 03:33:33.333'") + + // epoch also is a gap in JVM zone + .addRoundTrip("timestamp(3)", "TIMESTAMP '1970-01-01 00:00:00.000'", createTimestampType(3), "TIMESTAMP '1970-01-01 00:00:00.000'") + + .addRoundTrip("timestamp(3)", "TIMESTAMP '1970-01-01 00:13:42.000'", createTimestampType(3), "TIMESTAMP '1970-01-01 00:13:42.000'") + .addRoundTrip("timestamp(3)", "TIMESTAMP '2018-04-01 02:13:55.123'", createTimestampType(3), "TIMESTAMP '2018-04-01 02:13:55.123'") + .addRoundTrip("timestamp(3)", "TIMESTAMP '2018-03-25 03:17:17.000'", createTimestampType(3), "TIMESTAMP '2018-03-25 03:17:17.000'") + .addRoundTrip("timestamp(3)", "TIMESTAMP '1986-01-01 00:13:07.000'", createTimestampType(3), "TIMESTAMP '1986-01-01 00:13:07.000'") + .execute(getQueryRunner(), session, trinoCreateAsSelect(session, "test_timestamp")) + .execute(getQueryRunner(), session, trinoCreateAsSelect("test_timestamp")) + .execute(getQueryRunner(), session, trinoCreateAndInsert(session, "test_timestamp")) + .execute(getQueryRunner(), session, trinoCreateAndInsert("test_timestamp")); + } + + private DataSetup trinoCreateAsSelect(String tableNamePrefix) + { + return trinoCreateAsSelect(getSession(), tableNamePrefix); + } + + private DataSetup trinoCreateAsSelect(Session session, String tableNamePrefix) + { + return new CreateAsSelectDataSetup(new TrinoSqlExecutor(getQueryRunner(), session), tableNamePrefix); + } + + private DataSetup trinoCreateAndInsert(String tableNamePrefix) + { + return trinoCreateAndInsert(getSession(), tableNamePrefix); + } + + private DataSetup trinoCreateAndInsert(Session session, String tableNamePrefix) + { 
+ return new KuduCreateAndInsertDataSetup(new TrinoSqlExecutor(getQueryRunner(), session), tableNamePrefix); + } +} diff --git a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestingKuduServer.java b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestingKuduServer.java index 0cee1c087bf..c14584fdbc0 100644 --- a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestingKuduServer.java +++ b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestingKuduServer.java @@ -32,7 +32,7 @@ public class TestingKuduServer { private static final String KUDU_IMAGE = "apache/kudu"; public static final String EARLIEST_TAG = "1.13.0"; - public static final String LATEST_TAG = "1.15.0"; + public static final String LATEST_TAG = "1.17"; private static final Integer KUDU_MASTER_PORT = 7051; private static final Integer KUDU_TSERVER_PORT = 7050; @@ -61,14 +61,11 @@ public class TestingKuduServer public TestingKuduServer(String kuduVersion) { network = Network.newNetwork(); - - String hostIP = getHostIPAddress(); - String masterContainerAlias = "kudu-master"; this.master = new GenericContainer<>(format("%s:%s", KUDU_IMAGE, kuduVersion)) .withExposedPorts(KUDU_MASTER_PORT) .withCommand("master") - .withEnv("MASTER_ARGS", "--default_num_replicas=1") + .withEnv("MASTER_ARGS", "--default_num_replicas=1 --unlock_unsafe_flags --use_hybrid_clock=false") .withNetwork(network) .withNetworkAliases(masterContainerAlias); @@ -78,14 +75,16 @@ public class TestingKuduServer toxiProxy.start(); String instanceName = "kudu-tserver"; + @SuppressWarnings("deprecation") ToxiproxyContainer.ContainerProxy proxy = toxiProxy.getProxy(instanceName, KUDU_TSERVER_PORT); tabletServer = new GenericContainer<>(format("%s:%s", KUDU_IMAGE, kuduVersion)) .withExposedPorts(KUDU_TSERVER_PORT) .withCommand("tserver") .withEnv("KUDU_MASTERS", format("%s:%s", masterContainerAlias, KUDU_MASTER_PORT)) - .withEnv("TSERVER_ARGS", format("--fs_wal_dir=/var/lib/kudu/tserver --logtostderr --use_hybrid_clock=false 
--rpc_bind_addresses=%s:%s --rpc_advertised_addresses=%s:%s", instanceName, KUDU_TSERVER_PORT, hostIP, proxy.getProxyPort())) + .withEnv("TSERVER_ARGS", format("--fs_wal_dir=/var/lib/kudu/tserver --logtostderr --use_hybrid_clock=false --unlock_unsafe_flags --rpc_bind_addresses=%s:%s --rpc_advertised_addresses=%s:%s", instanceName, KUDU_TSERVER_PORT, TOXIPROXY_NETWORK_ALIAS, proxy.getOriginalProxyPort())) .withNetwork(network) .withNetworkAliases(instanceName) + .waitingFor(new KuduTabletWaitStrategy(master)) .dependsOn(master); master.start(); diff --git a/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java b/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java index dd00a554460..52de3e7ecce 100644 --- a/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java +++ b/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java @@ -677,13 +677,13 @@ public abstract class BaseConnectorTest @Test public void testSelectVersionOfNonExistentTable() { + String tableName = "foo_" + randomNameSuffix(); String catalog = getSession().getCatalog().orElseThrow(); String schema = getSession().getSchema().orElseThrow(); - String tableName = "foo_" + randomNameSuffix(); assertThatThrownBy(() -> query("SELECT * FROM " + tableName + " FOR TIMESTAMP AS OF TIMESTAMP '2021-03-01 00:00:01'")) - .hasMessage(format("line 1:15: Table '%s.%s.%s' does not exist", catalog, schema, tableName)); + .hasMessageMatching("line 1:15: Table '%s.%s.%s' does not exist|This connector does not support versioned tables".formatted(catalog, schema, tableName)); assertThatThrownBy(() -> query("SELECT * FROM " + tableName + " FOR VERSION AS OF 'version1'")) - .hasMessage(format("line 1:15: Table '%s.%s.%s' does not exist", catalog, schema, tableName)); + .hasMessageMatching("line 1:15: Table '%s.%s.%s' does not exist|This connector does not support versioned tables".formatted(catalog, schema, tableName)); } /** 
--------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org