This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git
commit 6ff53e7c4d40f3986983439f35083407693026b5 Author: Joao Boto <b...@boto.pro> AuthorDate: Wed Mar 8 17:11:35 2023 +0100 [BAHIR-308] Bump flink version to 1.15.3 --- flink-connector-kudu/pom.xml | 16 ++------- .../kudu/connector/ColumnSchemasFactory.java | 1 - .../kudu/connector/CreateTableOptionsFactory.java | 1 - .../connectors/kudu/connector/KuduFilterInfo.java | 1 - .../connectors/kudu/connector/KuduTableInfo.java | 3 +- .../convertor/RowResultRowDataConvertor.java | 6 +--- .../kudu/connector/failure/KuduFailureHandler.java | 1 - .../kudu/connector/reader/KuduReader.java | 10 ++---- .../kudu/connector/reader/KuduReaderConfig.java | 3 +- .../writer/AbstractSingleOperationMapper.java | 1 - .../kudu/connector/writer/KuduOperationMapper.java | 1 - .../kudu/connector/writer/KuduWriter.java | 9 +---- .../kudu/connector/writer/KuduWriterConfig.java | 3 +- .../kudu/connector/writer/PojoOperationMapper.java | 6 +--- .../connectors/kudu/format/KuduOutputFormat.java | 1 - .../flink/connectors/kudu/streaming/KuduSink.java | 1 - .../kudu/table/AbstractReadOnlyCatalog.java | 22 ++---------- .../flink/connectors/kudu/table/KuduCatalog.java | 35 ++++--------------- .../connectors/kudu/table/KuduTableFactory.java | 28 +++------------ .../connectors/kudu/table/KuduTableSource.java | 13 ++----- .../kudu/table/UpsertOperationMapper.java | 1 - .../kudu/table/dynamic/KuduDynamicTableSource.java | 40 +++++++++++++++------- .../table/dynamic/catalog/KuduDynamicCatalog.java | 32 +++-------------- .../kudu/table/utils/KuduTableUtils.java | 12 ++----- .../connectors/kudu/table/utils/KuduTypeUtils.java | 14 +------- .../connectors/kudu/connector/KuduTestBase.java | 19 ++++------ .../kudu/format/KuduOutputFormatTest.java | 3 +- .../connectors/kudu/streaming/KuduSinkTest.java | 1 - .../connectors/kudu/table/KuduCatalogTest.java | 1 - .../kudu/table/KuduTableFactoryTest.java | 11 ++---- .../kudu/table/KuduTableSourceITCase.java | 4 +-- .../connectors/kudu/table/KuduTableSourceTest.java | 
18 ++-------- .../connectors/kudu/table/KuduTableTestUtils.java | 4 +-- .../kudu/writer/AbstractOperationTest.java | 8 +---- .../kudu/writer/PojoOperationMapperTest.java | 3 +- .../kudu/writer/RowOperationMapperTest.java | 1 - .../kudu/writer/TupleOpertaionMapperTest.java | 1 - 37 files changed, 78 insertions(+), 257 deletions(-) diff --git a/flink-connector-kudu/pom.xml b/flink-connector-kudu/pom.xml index 20b16b4..134d6f7 100644 --- a/flink-connector-kudu/pom.xml +++ b/flink-connector-kudu/pom.xml @@ -36,13 +36,6 @@ <dependencyManagement> <dependencies> - <dependency> - <groupId>org.apache.kudu</groupId> - <artifactId>kudu-binary</artifactId> - <version>${kudu.version}</version> - <classifier>${os.detected.classifier}</classifier> - <scope>test</scope> - </dependency> <dependency> <groupId>org.apache.kudu</groupId> <artifactId>kudu-client</artifactId> @@ -85,11 +78,11 @@ <dependencies> <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-clients_${scala.binary.version}</artifactId> + <artifactId>flink-clients</artifactId> </dependency> <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> + <artifactId>flink-streaming-java</artifactId> </dependency> <dependency> <groupId>org.apache.flink</groupId> @@ -99,11 +92,6 @@ <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_${scala.binary.version}</artifactId> </dependency> - <dependency> - <groupId>org.apache.kudu</groupId> - <artifactId>kudu-binary</artifactId> - <classifier>${os.detected.classifier}</classifier> - </dependency> <dependency> <groupId>org.apache.kudu</groupId> <artifactId>kudu-client</artifactId> diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/ColumnSchemasFactory.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/ColumnSchemasFactory.java index b178308..4997938 100644 --- 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/ColumnSchemasFactory.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/ColumnSchemasFactory.java @@ -18,7 +18,6 @@ package org.apache.flink.connectors.kudu.connector; import org.apache.flink.annotation.PublicEvolving; - import org.apache.kudu.ColumnSchema; import java.io.Serializable; diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/CreateTableOptionsFactory.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/CreateTableOptionsFactory.java index 4a475e9..fd9bfa4 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/CreateTableOptionsFactory.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/CreateTableOptionsFactory.java @@ -18,7 +18,6 @@ package org.apache.flink.connectors.kudu.connector; import org.apache.flink.annotation.PublicEvolving; - import org.apache.kudu.client.CreateTableOptions; import java.io.Serializable; diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java index e7a8d16..94e2e26 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java @@ -18,7 +18,6 @@ package org.apache.flink.connectors.kudu.connector; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.table.data.binary.BinaryStringData; - import org.apache.kudu.ColumnSchema; import org.apache.kudu.Schema; import org.apache.kudu.client.KuduPredicate; diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduTableInfo.java 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduTableInfo.java index baae8a0..655a914 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduTableInfo.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduTableInfo.java @@ -16,9 +16,8 @@ */ package org.apache.flink.connectors.kudu.connector; -import org.apache.flink.annotation.PublicEvolving; - import org.apache.commons.lang3.Validate; +import org.apache.flink.annotation.PublicEvolving; import org.apache.kudu.Schema; import org.apache.kudu.client.CreateTableOptions; diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/convertor/RowResultRowDataConvertor.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/convertor/RowResultRowDataConvertor.java index b7dc702..5196d7f 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/convertor/RowResultRowDataConvertor.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/convertor/RowResultRowDataConvertor.java @@ -17,11 +17,7 @@ package org.apache.flink.connectors.kudu.connector.convertor; -import org.apache.flink.table.data.DecimalData; -import org.apache.flink.table.data.GenericRowData; -import org.apache.flink.table.data.RowData; -import org.apache.flink.table.data.StringData; -import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.data.*; import org.apache.kudu.Schema; import org.apache.kudu.Type; import org.apache.kudu.client.RowResult; diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/failure/KuduFailureHandler.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/failure/KuduFailureHandler.java index 3c8954f..c67c6ed 100644 --- 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/failure/KuduFailureHandler.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/failure/KuduFailureHandler.java @@ -17,7 +17,6 @@ package org.apache.flink.connectors.kudu.connector.failure; import org.apache.flink.annotation.PublicEvolving; - import org.apache.kudu.client.RowError; import java.io.IOException; diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReader.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReader.java index 6816fc3..72b734c 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReader.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReader.java @@ -16,18 +16,12 @@ */ package org.apache.flink.connectors.kudu.connector.reader; +import org.apache.commons.collections.CollectionUtils; import org.apache.flink.annotation.Internal; import org.apache.flink.connectors.kudu.connector.KuduFilterInfo; import org.apache.flink.connectors.kudu.connector.KuduTableInfo; - -import org.apache.commons.collections.CollectionUtils; import org.apache.flink.connectors.kudu.connector.convertor.RowResultConvertor; -import org.apache.kudu.client.KuduClient; -import org.apache.kudu.client.KuduException; -import org.apache.kudu.client.KuduScanToken; -import org.apache.kudu.client.KuduSession; -import org.apache.kudu.client.KuduTable; -import org.apache.kudu.client.LocatedTablet; +import org.apache.kudu.client.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReaderConfig.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReaderConfig.java index 468cb1e..4727488 100644 --- 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReaderConfig.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReaderConfig.java @@ -16,9 +16,8 @@ */ package org.apache.flink.connectors.kudu.connector.reader; -import org.apache.flink.annotation.PublicEvolving; - import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.flink.annotation.PublicEvolving; import java.io.Serializable; diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/AbstractSingleOperationMapper.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/AbstractSingleOperationMapper.java index d9f8219..794d56b 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/AbstractSingleOperationMapper.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/AbstractSingleOperationMapper.java @@ -17,7 +17,6 @@ package org.apache.flink.connectors.kudu.connector.writer; import org.apache.flink.annotation.PublicEvolving; - import org.apache.kudu.client.KuduTable; import org.apache.kudu.client.Operation; import org.apache.kudu.client.PartialRow; diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduOperationMapper.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduOperationMapper.java index 4878ab3..886a3ea 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduOperationMapper.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduOperationMapper.java @@ -17,7 +17,6 @@ package org.apache.flink.connectors.kudu.connector.writer; import org.apache.flink.annotation.PublicEvolving; - import org.apache.kudu.client.KuduTable; import org.apache.kudu.client.Operation; diff 
--git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java index 59ad196..2171a43 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java @@ -21,14 +21,7 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.connectors.kudu.connector.KuduTableInfo; import org.apache.flink.connectors.kudu.connector.failure.DefaultKuduFailureHandler; import org.apache.flink.connectors.kudu.connector.failure.KuduFailureHandler; - -import org.apache.kudu.client.DeleteTableResponse; -import org.apache.kudu.client.KuduClient; -import org.apache.kudu.client.KuduSession; -import org.apache.kudu.client.KuduTable; -import org.apache.kudu.client.Operation; -import org.apache.kudu.client.OperationResponse; -import org.apache.kudu.client.RowError; +import org.apache.kudu.client.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConfig.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConfig.java index 6c6d216..9b63494 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConfig.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConfig.java @@ -16,9 +16,8 @@ */ package org.apache.flink.connectors.kudu.connector.writer; -import org.apache.flink.annotation.PublicEvolving; - import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.flink.annotation.PublicEvolving; import org.apache.kudu.client.AsyncKuduClient; import java.io.Serializable; diff --git 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/PojoOperationMapper.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/PojoOperationMapper.java index db44eec..253146c 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/PojoOperationMapper.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/PojoOperationMapper.java @@ -19,11 +19,7 @@ package org.apache.flink.connectors.kudu.connector.writer; import org.apache.flink.annotation.PublicEvolving; import java.lang.reflect.Field; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; @PublicEvolving public class PojoOperationMapper<T> extends AbstractSingleOperationMapper<T> { diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/format/KuduOutputFormat.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/format/KuduOutputFormat.java index 900515d..bc1b031 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/format/KuduOutputFormat.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/format/KuduOutputFormat.java @@ -28,7 +28,6 @@ import org.apache.flink.connectors.kudu.connector.writer.KuduWriterConfig; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/streaming/KuduSink.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/streaming/KuduSink.java index a671408..a2e3e47 100644 --- 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/streaming/KuduSink.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/streaming/KuduSink.java @@ -28,7 +28,6 @@ import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/AbstractReadOnlyCatalog.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/AbstractReadOnlyCatalog.java index 2e1c63e..ffe02a8 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/AbstractReadOnlyCatalog.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/AbstractReadOnlyCatalog.java @@ -19,26 +19,8 @@ package org.apache.flink.connectors.kudu.table; import org.apache.flink.annotation.Internal; -import org.apache.flink.table.catalog.AbstractCatalog; -import org.apache.flink.table.catalog.CatalogBaseTable; -import org.apache.flink.table.catalog.CatalogDatabase; -import org.apache.flink.table.catalog.CatalogFunction; -import org.apache.flink.table.catalog.CatalogPartition; -import org.apache.flink.table.catalog.CatalogPartitionSpec; -import org.apache.flink.table.catalog.ObjectPath; -import org.apache.flink.table.catalog.exceptions.CatalogException; -import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; -import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException; -import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; -import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException; -import org.apache.flink.table.catalog.exceptions.FunctionNotExistException; -import 
org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException; -import org.apache.flink.table.catalog.exceptions.PartitionNotExistException; -import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException; -import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; -import org.apache.flink.table.catalog.exceptions.TableNotExistException; -import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException; -import org.apache.flink.table.catalog.exceptions.TablePartitionedException; +import org.apache.flink.table.catalog.*; +import org.apache.flink.table.catalog.exceptions.*; import org.apache.flink.table.catalog.stats.CatalogColumnStatistics; import org.apache.flink.table.catalog.stats.CatalogTableStatistics; diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalog.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalog.java index 734e219..20ec341 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalog.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalog.java @@ -21,29 +21,15 @@ package org.apache.flink.connectors.kudu.table; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.connectors.kudu.connector.KuduTableInfo; import org.apache.flink.connectors.kudu.table.utils.KuduTableUtils; -import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.shaded.guava30.com.google.common.collect.Lists; import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.catalog.CatalogBaseTable; -import org.apache.flink.table.catalog.CatalogDatabase; -import org.apache.flink.table.catalog.CatalogDatabaseImpl; -import org.apache.flink.table.catalog.CatalogFunction; -import org.apache.flink.table.catalog.CatalogPartitionSpec; -import org.apache.flink.table.catalog.CatalogTable; -import 
org.apache.flink.table.catalog.CatalogTableImpl; -import org.apache.flink.table.catalog.ObjectPath; -import org.apache.flink.table.catalog.exceptions.CatalogException; -import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; -import org.apache.flink.table.catalog.exceptions.FunctionNotExistException; -import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; -import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.catalog.*; +import org.apache.flink.table.catalog.exceptions.*; import org.apache.flink.table.catalog.stats.CatalogColumnStatistics; import org.apache.flink.table.catalog.stats.CatalogTableStatistics; import org.apache.flink.table.expressions.Expression; import org.apache.flink.table.factories.TableFactory; import org.apache.flink.util.StringUtils; - -import org.apache.flink.shaded.guava30.com.google.common.collect.Lists; - import org.apache.kudu.ColumnSchema; import org.apache.kudu.client.AlterTableOptions; import org.apache.kudu.client.KuduClient; @@ -53,19 +39,10 @@ import org.apache.kudu.shaded.com.google.common.collect.Sets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; +import java.util.*; import java.util.stream.Collectors; -import static org.apache.flink.connectors.kudu.table.KuduTableFactory.KUDU_HASH_COLS; -import static org.apache.flink.connectors.kudu.table.KuduTableFactory.KUDU_PRIMARY_KEY_COLS; -import static org.apache.flink.connectors.kudu.table.KuduTableFactory.KUDU_REPLICAS; +import static org.apache.flink.connectors.kudu.table.KuduTableFactory.*; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -92,7 +69,7 @@ public class KuduCatalog extends 
AbstractReadOnlyCatalog { * @param kuduMasters Connection address to Kudu */ public KuduCatalog(String catalogName, String kuduMasters) { - super(catalogName, EnvironmentSettings.DEFAULT_BUILTIN_DATABASE); + super(catalogName, "default_database"); this.kuduMasters = kuduMasters; this.kuduClient = createClient(); } diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java index 9112b0a..c46ac85 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java @@ -30,32 +30,12 @@ import org.apache.flink.table.factories.TableSinkFactory; import org.apache.flink.table.factories.TableSourceFactory; import org.apache.flink.types.Row; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; +import java.util.*; import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE; -import static org.apache.flink.table.descriptors.DescriptorProperties.EXPR; -import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK; -import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_ROWTIME; -import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE; -import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_EXPR; -import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_CLASS; -import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_FROM; -import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_SERIALIZED; -import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_TYPE; -import static 
org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_CLASS; -import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_DELAY; -import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_SERIALIZED; -import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_TYPE; -import static org.apache.flink.table.descriptors.Schema.SCHEMA; -import static org.apache.flink.table.descriptors.Schema.SCHEMA_DATA_TYPE; -import static org.apache.flink.table.descriptors.Schema.SCHEMA_FROM; -import static org.apache.flink.table.descriptors.Schema.SCHEMA_NAME; -import static org.apache.flink.table.descriptors.Schema.SCHEMA_PROCTIME; -import static org.apache.flink.table.descriptors.Schema.SCHEMA_TYPE; +import static org.apache.flink.table.descriptors.DescriptorProperties.*; +import static org.apache.flink.table.descriptors.Rowtime.*; +import static org.apache.flink.table.descriptors.Schema.*; import static org.apache.flink.util.Preconditions.checkNotNull; public class KuduTableFactory implements TableSourceFactory<Row>, TableSinkFactory<Tuple2<Boolean, Row>> { diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSource.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSource.java index ad98e86..74daa89 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSource.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSource.java @@ -29,11 +29,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.expressions.Expression; -import org.apache.flink.table.sources.FilterableTableSource; -import org.apache.flink.table.sources.LimitableTableSource; -import org.apache.flink.table.sources.ProjectableTableSource; -import 
org.apache.flink.table.sources.StreamTableSource; -import org.apache.flink.table.sources.TableSource; +import org.apache.flink.table.sources.*; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.utils.TypeConversions; @@ -42,12 +38,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nullable; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.ListIterator; -import java.util.Optional; +import java.util.*; import static org.apache.flink.connectors.kudu.table.utils.KuduTableUtils.toKuduFilterInfo; diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/UpsertOperationMapper.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/UpsertOperationMapper.java index 847dad4..31c8d79 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/UpsertOperationMapper.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/UpsertOperationMapper.java @@ -20,7 +20,6 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.connectors.kudu.connector.writer.AbstractSingleOperationMapper; import org.apache.flink.types.Row; - import org.apache.kudu.client.KuduTable; import org.apache.kudu.client.Operation; diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/dynamic/KuduDynamicTableSource.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/dynamic/KuduDynamicTableSource.java index 2022cd7..cde6a13 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/dynamic/KuduDynamicTableSource.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/dynamic/KuduDynamicTableSource.java @@ -27,26 +27,24 @@ import 
org.apache.flink.connectors.kudu.table.function.lookup.KuduRowDataLookupF import org.apache.flink.connectors.kudu.table.utils.KuduTableUtils; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.connector.ChangelogMode; -import org.apache.flink.table.connector.source.DynamicTableSource; -import org.apache.flink.table.connector.source.InputFormatProvider; -import org.apache.flink.table.connector.source.LookupTableSource; -import org.apache.flink.table.connector.source.ScanTableSource; -import org.apache.flink.table.connector.source.TableFunctionProvider; +import org.apache.flink.table.connector.source.*; import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown; import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown; import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown; import org.apache.flink.table.expressions.ResolvedExpression; -import org.apache.flink.table.utils.TableSchemaUtils; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.FieldsDataType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.utils.DataTypeUtils; import org.apache.flink.util.Preconditions; import org.apache.kudu.shaded.com.google.common.collect.Lists; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Objects; -import java.util.Optional; +import java.util.*; + +import static org.apache.flink.calcite.shaded.com.google.common.base.Preconditions.checkArgument; +import static org.apache.flink.table.utils.TableSchemaUtils.containsPhysicalColumnsOnly; /** * A {@link DynamicTableSource} for Kudu. 
@@ -138,12 +136,28 @@ public class KuduDynamicTableSource implements ScanTableSource, SupportsProjecti } @Override - public void applyProjection(int[][] projectedFields) { + public void applyProjection(int[][] projectedFields, DataType producedDataType) { // parser projectFields - this.physicalSchema = TableSchemaUtils.projectSchema(this.physicalSchema, projectedFields); + this.physicalSchema = projectSchema(this.physicalSchema, projectedFields); this.projectedFields = physicalSchema.getFieldNames(); } + private TableSchema projectSchema(TableSchema tableSchema, int[][] projectedFields) { + checkArgument( + containsPhysicalColumnsOnly(tableSchema), + "Projection is only supported for physical columns."); + TableSchema.Builder builder = TableSchema.builder(); + + FieldsDataType fields = + (FieldsDataType) + DataTypeUtils.projectRow(tableSchema.toRowDataType(), projectedFields); + RowType topFields = (RowType) fields.getLogicalType(); + for (int i = 0; i < topFields.getFieldCount(); i++) { + builder.field(topFields.getFieldNames().get(i), fields.getChildren().get(i)); + } + return builder.build(); + } + @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/dynamic/catalog/KuduDynamicCatalog.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/dynamic/catalog/KuduDynamicCatalog.java index b531835..3dff23c 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/dynamic/catalog/KuduDynamicCatalog.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/dynamic/catalog/KuduDynamicCatalog.java @@ -22,21 +22,9 @@ import org.apache.flink.connectors.kudu.connector.KuduTableInfo; import org.apache.flink.connectors.kudu.table.AbstractReadOnlyCatalog; import org.apache.flink.connectors.kudu.table.dynamic.KuduDynamicTableSourceSinkFactory; import 
org.apache.flink.connectors.kudu.table.utils.KuduTableUtils; -import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.catalog.CatalogBaseTable; -import org.apache.flink.table.catalog.CatalogDatabase; -import org.apache.flink.table.catalog.CatalogDatabaseImpl; -import org.apache.flink.table.catalog.CatalogFunction; -import org.apache.flink.table.catalog.CatalogPartitionSpec; -import org.apache.flink.table.catalog.CatalogTable; -import org.apache.flink.table.catalog.CatalogTableImpl; -import org.apache.flink.table.catalog.ObjectPath; -import org.apache.flink.table.catalog.exceptions.CatalogException; -import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; -import org.apache.flink.table.catalog.exceptions.FunctionNotExistException; -import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; -import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.catalog.*; +import org.apache.flink.table.catalog.exceptions.*; import org.apache.flink.table.catalog.stats.CatalogColumnStatistics; import org.apache.flink.table.catalog.stats.CatalogTableStatistics; import org.apache.flink.table.expressions.Expression; @@ -52,20 +40,10 @@ import org.apache.kudu.shaded.com.google.common.collect.Sets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; +import java.util.*; import java.util.stream.Collectors; -import static org.apache.flink.connectors.kudu.table.dynamic.KuduDynamicTableSourceSinkFactory.KUDU_HASH_COLS; -import static org.apache.flink.connectors.kudu.table.dynamic.KuduDynamicTableSourceSinkFactory.KUDU_HASH_PARTITION_NUMS; -import static 
org.apache.flink.connectors.kudu.table.dynamic.KuduDynamicTableSourceSinkFactory.KUDU_PRIMARY_KEY_COLS; -import static org.apache.flink.connectors.kudu.table.dynamic.KuduDynamicTableSourceSinkFactory.KUDU_REPLICAS; +import static org.apache.flink.connectors.kudu.table.dynamic.KuduDynamicTableSourceSinkFactory.*; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -86,7 +64,7 @@ public class KuduDynamicCatalog extends AbstractReadOnlyCatalog { * @param kuduMasters Connection address to Kudu */ public KuduDynamicCatalog(String catalogName, String kuduMasters) { - super(catalogName, EnvironmentSettings.DEFAULT_BUILTIN_DATABASE); + super(catalogName, "default_database"); this.kuduMasters = kuduMasters; this.kuduClient = createClient(); } diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/utils/KuduTableUtils.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/utils/KuduTableUtils.java index 1d5be62..f5d7ca7 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/utils/KuduTableUtils.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/utils/KuduTableUtils.java @@ -42,18 +42,10 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nullable; import java.sql.Timestamp; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Optional; +import java.util.*; import java.util.stream.Collectors; -import static org.apache.flink.connectors.kudu.table.dynamic.KuduDynamicTableSourceSinkFactory.KUDU_HASH_COLS; -import static org.apache.flink.connectors.kudu.table.dynamic.KuduDynamicTableSourceSinkFactory.KUDU_HASH_PARTITION_NUMS; -import static org.apache.flink.connectors.kudu.table.dynamic.KuduDynamicTableSourceSinkFactory.KUDU_PRIMARY_KEY_COLS; -import static 
org.apache.flink.connectors.kudu.table.dynamic.KuduDynamicTableSourceSinkFactory.KUDU_REPLICAS; +import static org.apache.flink.connectors.kudu.table.dynamic.KuduDynamicTableSourceSinkFactory.*; public class KuduTableUtils { diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/utils/KuduTypeUtils.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/utils/KuduTypeUtils.java index c445465..15f7be7 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/utils/KuduTypeUtils.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/utils/KuduTypeUtils.java @@ -20,20 +20,8 @@ package org.apache.flink.connectors.kudu.table.utils; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.types.AtomicDataType; import org.apache.flink.table.types.DataType; -import org.apache.flink.table.types.logical.BigIntType; -import org.apache.flink.table.types.logical.BooleanType; -import org.apache.flink.table.types.logical.DecimalType; -import org.apache.flink.table.types.logical.DoubleType; -import org.apache.flink.table.types.logical.FloatType; -import org.apache.flink.table.types.logical.IntType; -import org.apache.flink.table.types.logical.LogicalType; -import org.apache.flink.table.types.logical.SmallIntType; -import org.apache.flink.table.types.logical.TimestampType; -import org.apache.flink.table.types.logical.TinyIntType; -import org.apache.flink.table.types.logical.VarBinaryType; -import org.apache.flink.table.types.logical.VarCharType; +import org.apache.flink.table.types.logical.*; import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor; - import org.apache.kudu.ColumnTypeAttributes; import org.apache.kudu.Type; diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduTestBase.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduTestBase.java 
index bcc9b2d..ac4db4c 100644 --- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduTestBase.java +++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduTestBase.java @@ -36,11 +36,7 @@ import org.apache.flink.types.Row; import org.apache.kudu.ColumnSchema; import org.apache.kudu.Schema; import org.apache.kudu.Type; -import org.apache.kudu.client.CreateTableOptions; -import org.apache.kudu.client.KuduClient; -import org.apache.kudu.client.KuduScanner; -import org.apache.kudu.client.KuduTable; -import org.apache.kudu.client.RowResult; +import org.apache.kudu.client.*; import org.apache.kudu.shaded.com.google.common.collect.Lists; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; @@ -76,7 +72,7 @@ public class KuduTestBase { public static String[] columns = new String[]{"id", "title", "author", "price", "quantity"}; private static GenericContainer<?> master; private static List<GenericContainer<?>> tServers; - private static String masterAddress; + private static HostAndPort masterAddress; private static KuduClient kuduClient; @BeforeAll @@ -90,7 +86,7 @@ public class KuduTestBase { .withNetwork(network) .withNetworkAliases("kudu-master"); master.start(); - masterAddress = HostAndPort.fromParts(master.getHost(), master.getMappedPort(KUDU_MASTER_PORT)).toString(); + masterAddress = HostAndPort.fromParts(master.getHost(), master.getMappedPort(KUDU_MASTER_PORT)); for (int instance = 1; instance <= NUMBER_OF_REPLICA; instance++) { String instanceName = "kudu-tserver-" + instance; @@ -98,8 +94,8 @@ public class KuduTestBase { .withExposedPorts(KUDU_TSERVER_PORT) .withCommand("tserver") .withEnv("KUDU_MASTERS", "kudu-master:" + KUDU_MASTER_PORT) - .withEnv("TSERVER_ARGS", "--fs_wal_dir=/var/lib/kudu/tserver --use_hybrid_clock=false " + - "--rpc_advertised_addresses=" + instanceName) + .withEnv("TSERVER_ARGS", "--fs_wal_dir=/var/lib/kudu/tserver --logtostderr " + +" 
--use_hybrid_clock=false --rpc_advertised_addresses=" + instanceName) .withNetwork(network) .withNetworkAliases(instanceName) .dependsOn(master); @@ -108,8 +104,7 @@ public class KuduTestBase { } tServers = tServersBuilder.build(); - System.out.println(HostAndPort.fromParts(master.getHost(), master.getMappedPort(8051)).toString()); - kuduClient = new KuduClient.KuduClientBuilder(masterAddress).build(); + kuduClient = new KuduClient.KuduClientBuilder(masterAddress.toString()).build(); } @AfterAll @@ -239,7 +234,7 @@ public class KuduTestBase { } public String getMasterAddress() { - return masterAddress; + return masterAddress.toString(); } public KuduClient getClient() { diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/format/KuduOutputFormatTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/format/KuduOutputFormatTest.java index dc8f777..f53b7c5 100644 --- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/format/KuduOutputFormatTest.java +++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/format/KuduOutputFormatTest.java @@ -16,13 +16,12 @@ */ package org.apache.flink.connectors.kudu.format; -import org.apache.flink.connectors.kudu.connector.KuduTestBase; import org.apache.flink.connectors.kudu.connector.KuduTableInfo; +import org.apache.flink.connectors.kudu.connector.KuduTestBase; import org.apache.flink.connectors.kudu.connector.writer.AbstractSingleOperationMapper; import org.apache.flink.connectors.kudu.connector.writer.KuduWriterConfig; import org.apache.flink.connectors.kudu.connector.writer.RowOperationMapper; import org.apache.flink.types.Row; - import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/streaming/KuduSinkTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/streaming/KuduSinkTest.java index 6791765..1764608 100644 
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/streaming/KuduSinkTest.java +++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/streaming/KuduSinkTest.java @@ -24,7 +24,6 @@ import org.apache.flink.connectors.kudu.connector.writer.KuduWriterConfig; import org.apache.flink.connectors.kudu.connector.writer.RowOperationMapper; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.types.Row; - import org.apache.kudu.ColumnSchema; import org.apache.kudu.Type; import org.apache.kudu.client.CreateTableOptions; diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduCatalogTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduCatalogTest.java index 1927631..d403d9c 100644 --- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduCatalogTest.java +++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduCatalogTest.java @@ -33,7 +33,6 @@ import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.ObjectPath; - import org.apache.flink.types.Row; import org.apache.kudu.ColumnSchema; import org.apache.kudu.Schema; diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableFactoryTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableFactoryTest.java index f6482da..0375c68 100644 --- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableFactoryTest.java +++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableFactoryTest.java @@ -40,18 +40,11 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.sql.Timestamp; -import java.util.ArrayList; -import 
java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.fail; +import static org.junit.jupiter.api.Assertions.*; public class KuduTableFactoryTest extends KuduTestBase { private StreamTableEnvironment tableEnv; diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceITCase.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceITCase.java index d3d4a63..4a53198 100644 --- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceITCase.java +++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceITCase.java @@ -53,7 +53,7 @@ public class KuduTableSourceITCase extends KuduTestBase { it.forEachRemaining(results::add); assertEquals(5, results.size()); assertEquals("1001,Java for dummies,Tan Ah Teck,11.11,11", results.get(0).toString()); - tableEnv.sqlUpdate("DROP TABLE books"); + tableEnv.executeSql("DROP TABLE books"); } @@ -66,6 +66,6 @@ public class KuduTableSourceITCase extends KuduTestBase { it.forEachRemaining(results::add); assertEquals(1, results.size()); assertEquals("More Java for more dummies", results.get(0).toString()); - tableEnv.sqlUpdate("DROP TABLE books"); + tableEnv.executeSql("DROP TABLE books"); } } diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceTest.java index 43734e4..d4101aa 100644 --- 
a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceTest.java +++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceTest.java @@ -19,15 +19,10 @@ package org.apache.flink.connectors.kudu.table; import org.apache.flink.connectors.kudu.connector.KuduTableInfo; import org.apache.flink.connectors.kudu.connector.KuduTestBase; import org.apache.flink.table.api.DataTypes; -import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.catalog.exceptions.TableNotExistException; -import org.apache.flink.table.expressions.CallExpression; -import org.apache.flink.table.expressions.Expression; -import org.apache.flink.table.expressions.FieldReferenceExpression; -import org.apache.flink.table.expressions.ResolvedExpression; -import org.apache.flink.table.expressions.ValueLiteralExpression; +import org.apache.flink.table.expressions.*; import org.apache.flink.table.functions.ScalarFunction; import org.apache.flink.table.functions.ScalarFunctionDefinition; import org.apache.flink.table.types.DataType; @@ -43,14 +38,7 @@ import java.util.List; import static java.util.Collections.singletonList; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.AND; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.EQUALS; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNotSame; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.junit.Assert.*; /** * Unit Tests for {@link KuduTableSource}. 
@@ -68,7 +56,7 @@ public class KuduTableSourceTest extends KuduTestBase { KuduTableInfo tableInfo = booksTableInfo("books", true); setUpDatabase(tableInfo); catalog = new KuduCatalog(getMasterAddress()); - ObjectPath op = new ObjectPath(EnvironmentSettings.DEFAULT_BUILTIN_DATABASE, "books"); + ObjectPath op = new ObjectPath("default_database", "books"); try { kuduTableSource = catalog.getKuduTableFactory().createTableSource(op, catalog.getTable(op)); } catch (TableNotExistException e) { diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableTestUtils.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableTestUtils.java index 4eae7bf..affdd04 100644 --- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableTestUtils.java +++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableTestUtils.java @@ -26,14 +26,14 @@ import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXE public class KuduTableTestUtils { public static StreamTableEnvironment createTableEnvWithBlinkPlannerStreamingMode(StreamExecutionEnvironment env) { - EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); + EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings); tableEnv.getConfig().getConfiguration().setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM.key(), 1); return tableEnv; } public static TableEnvironment createTableEnvWithBlinkPlannerBatchMode() { - EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); + EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build(); TableEnvironment tableEnv = TableEnvironment.create(settings); 
tableEnv.getConfig().getConfiguration().setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM.key(), 1); return tableEnv; diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/AbstractOperationTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/AbstractOperationTest.java index f37b40d..12d82a9 100644 --- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/AbstractOperationTest.java +++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/AbstractOperationTest.java @@ -17,14 +17,8 @@ package org.apache.flink.connectors.kudu.writer; import org.apache.flink.connectors.kudu.connector.KuduTestBase; - import org.apache.kudu.Schema; -import org.apache.kudu.client.Delete; -import org.apache.kudu.client.Insert; -import org.apache.kudu.client.KuduTable; -import org.apache.kudu.client.PartialRow; -import org.apache.kudu.client.Update; -import org.apache.kudu.client.Upsert; +import org.apache.kudu.client.*; import org.junit.jupiter.api.BeforeEach; import org.mockito.Mock; import org.mockito.MockitoAnnotations; diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/PojoOperationMapperTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/PojoOperationMapperTest.java index 45e0b1b..f03c469 100644 --- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/PojoOperationMapperTest.java +++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/PojoOperationMapperTest.java @@ -17,11 +17,10 @@ package org.apache.flink.connectors.kudu.writer; -import org.apache.flink.connectors.kudu.connector.KuduTestBase.BookInfo; import org.apache.flink.connectors.kudu.connector.KuduTestBase; +import org.apache.flink.connectors.kudu.connector.KuduTestBase.BookInfo; import org.apache.flink.connectors.kudu.connector.writer.AbstractSingleOperationMapper; import 
org.apache.flink.connectors.kudu.connector.writer.PojoOperationMapper; - import org.apache.kudu.client.Operation; import org.apache.kudu.client.PartialRow; import org.junit.jupiter.api.Test; diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/RowOperationMapperTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/RowOperationMapperTest.java index e737063..3aeb673 100644 --- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/RowOperationMapperTest.java +++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/RowOperationMapperTest.java @@ -20,7 +20,6 @@ import org.apache.flink.connectors.kudu.connector.KuduTestBase; import org.apache.flink.connectors.kudu.connector.writer.AbstractSingleOperationMapper; import org.apache.flink.connectors.kudu.connector.writer.RowOperationMapper; import org.apache.flink.types.Row; - import org.apache.kudu.client.Operation; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/TupleOpertaionMapperTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/TupleOpertaionMapperTest.java index 308a011..a52b7c1 100644 --- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/TupleOpertaionMapperTest.java +++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/TupleOpertaionMapperTest.java @@ -20,7 +20,6 @@ import org.apache.flink.api.java.tuple.Tuple5; import org.apache.flink.connectors.kudu.connector.KuduTestBase; import org.apache.flink.connectors.kudu.connector.writer.AbstractSingleOperationMapper; import org.apache.flink.connectors.kudu.connector.writer.TupleOperationMapper; - import org.apache.kudu.client.Operation; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test;