This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git

commit 6ff53e7c4d40f3986983439f35083407693026b5
Author: Joao Boto <b...@boto.pro>
AuthorDate: Wed Mar 8 17:11:35 2023 +0100

    [BAHIR-308] Bump flink version to 1.15.3
---
 flink-connector-kudu/pom.xml                       | 16 ++-------
 .../kudu/connector/ColumnSchemasFactory.java       |  1 -
 .../kudu/connector/CreateTableOptionsFactory.java  |  1 -
 .../connectors/kudu/connector/KuduFilterInfo.java  |  1 -
 .../connectors/kudu/connector/KuduTableInfo.java   |  3 +-
 .../convertor/RowResultRowDataConvertor.java       |  6 +---
 .../kudu/connector/failure/KuduFailureHandler.java |  1 -
 .../kudu/connector/reader/KuduReader.java          | 10 ++----
 .../kudu/connector/reader/KuduReaderConfig.java    |  3 +-
 .../writer/AbstractSingleOperationMapper.java      |  1 -
 .../kudu/connector/writer/KuduOperationMapper.java |  1 -
 .../kudu/connector/writer/KuduWriter.java          |  9 +----
 .../kudu/connector/writer/KuduWriterConfig.java    |  3 +-
 .../kudu/connector/writer/PojoOperationMapper.java |  6 +---
 .../connectors/kudu/format/KuduOutputFormat.java   |  1 -
 .../flink/connectors/kudu/streaming/KuduSink.java  |  1 -
 .../kudu/table/AbstractReadOnlyCatalog.java        | 22 ++----------
 .../flink/connectors/kudu/table/KuduCatalog.java   | 35 ++++---------------
 .../connectors/kudu/table/KuduTableFactory.java    | 28 +++------------
 .../connectors/kudu/table/KuduTableSource.java     | 13 ++-----
 .../kudu/table/UpsertOperationMapper.java          |  1 -
 .../kudu/table/dynamic/KuduDynamicTableSource.java | 40 +++++++++++++++-------
 .../table/dynamic/catalog/KuduDynamicCatalog.java  | 32 +++--------------
 .../kudu/table/utils/KuduTableUtils.java           | 12 ++-----
 .../connectors/kudu/table/utils/KuduTypeUtils.java | 14 +-------
 .../connectors/kudu/connector/KuduTestBase.java    | 19 ++++------
 .../kudu/format/KuduOutputFormatTest.java          |  3 +-
 .../connectors/kudu/streaming/KuduSinkTest.java    |  1 -
 .../connectors/kudu/table/KuduCatalogTest.java     |  1 -
 .../kudu/table/KuduTableFactoryTest.java           | 11 ++----
 .../kudu/table/KuduTableSourceITCase.java          |  4 +--
 .../connectors/kudu/table/KuduTableSourceTest.java | 18 ++--------
 .../connectors/kudu/table/KuduTableTestUtils.java  |  4 +--
 .../kudu/writer/AbstractOperationTest.java         |  8 +----
 .../kudu/writer/PojoOperationMapperTest.java       |  3 +-
 .../kudu/writer/RowOperationMapperTest.java        |  1 -
 .../kudu/writer/TupleOpertaionMapperTest.java      |  1 -
 37 files changed, 78 insertions(+), 257 deletions(-)

diff --git a/flink-connector-kudu/pom.xml b/flink-connector-kudu/pom.xml
index 20b16b4..134d6f7 100644
--- a/flink-connector-kudu/pom.xml
+++ b/flink-connector-kudu/pom.xml
@@ -36,13 +36,6 @@
 
   <dependencyManagement>
     <dependencies>
-      <dependency>
-        <groupId>org.apache.kudu</groupId>
-        <artifactId>kudu-binary</artifactId>
-        <version>${kudu.version}</version>
-        <classifier>${os.detected.classifier}</classifier>
-        <scope>test</scope>
-      </dependency>
       <dependency>
         <groupId>org.apache.kudu</groupId>
         <artifactId>kudu-client</artifactId>
@@ -85,11 +78,11 @@
   <dependencies>
     <dependency>
       <groupId>org.apache.flink</groupId>
-      <artifactId>flink-clients_${scala.binary.version}</artifactId>
+      <artifactId>flink-clients</artifactId>
     </dependency>
     <dependency>
       <groupId>org.apache.flink</groupId>
-      <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+      <artifactId>flink-streaming-java</artifactId>
     </dependency>
     <dependency>
       <groupId>org.apache.flink</groupId>
@@ -99,11 +92,6 @@
       <groupId>org.apache.flink</groupId>
       <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
     </dependency>
-    <dependency>
-      <groupId>org.apache.kudu</groupId>
-      <artifactId>kudu-binary</artifactId>
-      <classifier>${os.detected.classifier}</classifier>
-    </dependency>
     <dependency>
       <groupId>org.apache.kudu</groupId>
       <artifactId>kudu-client</artifactId>
diff --git 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/ColumnSchemasFactory.java
 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/ColumnSchemasFactory.java
index b178308..4997938 100644
--- 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/ColumnSchemasFactory.java
+++ 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/ColumnSchemasFactory.java
@@ -18,7 +18,6 @@
 package org.apache.flink.connectors.kudu.connector;
 
 import org.apache.flink.annotation.PublicEvolving;
-
 import org.apache.kudu.ColumnSchema;
 
 import java.io.Serializable;
diff --git 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/CreateTableOptionsFactory.java
 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/CreateTableOptionsFactory.java
index 4a475e9..fd9bfa4 100644
--- 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/CreateTableOptionsFactory.java
+++ 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/CreateTableOptionsFactory.java
@@ -18,7 +18,6 @@
 package org.apache.flink.connectors.kudu.connector;
 
 import org.apache.flink.annotation.PublicEvolving;
-
 import org.apache.kudu.client.CreateTableOptions;
 
 import java.io.Serializable;
diff --git 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java
 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java
index e7a8d16..94e2e26 100644
--- 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java
+++ 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java
@@ -18,7 +18,6 @@ package org.apache.flink.connectors.kudu.connector;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.table.data.binary.BinaryStringData;
-
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.Schema;
 import org.apache.kudu.client.KuduPredicate;
diff --git 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduTableInfo.java
 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduTableInfo.java
index baae8a0..655a914 100644
--- 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduTableInfo.java
+++ 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduTableInfo.java
@@ -16,9 +16,8 @@
  */
 package org.apache.flink.connectors.kudu.connector;
 
-import org.apache.flink.annotation.PublicEvolving;
-
 import org.apache.commons.lang3.Validate;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.kudu.Schema;
 import org.apache.kudu.client.CreateTableOptions;
 
diff --git 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/convertor/RowResultRowDataConvertor.java
 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/convertor/RowResultRowDataConvertor.java
index b7dc702..5196d7f 100644
--- 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/convertor/RowResultRowDataConvertor.java
+++ 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/convertor/RowResultRowDataConvertor.java
@@ -17,11 +17,7 @@
 
 package org.apache.flink.connectors.kudu.connector.convertor;
 
-import org.apache.flink.table.data.DecimalData;
-import org.apache.flink.table.data.GenericRowData;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.StringData;
-import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.data.*;
 import org.apache.kudu.Schema;
 import org.apache.kudu.Type;
 import org.apache.kudu.client.RowResult;
diff --git 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/failure/KuduFailureHandler.java
 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/failure/KuduFailureHandler.java
index 3c8954f..c67c6ed 100644
--- 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/failure/KuduFailureHandler.java
+++ 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/failure/KuduFailureHandler.java
@@ -17,7 +17,6 @@
 package org.apache.flink.connectors.kudu.connector.failure;
 
 import org.apache.flink.annotation.PublicEvolving;
-
 import org.apache.kudu.client.RowError;
 
 import java.io.IOException;
diff --git 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReader.java
 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReader.java
index 6816fc3..72b734c 100644
--- 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReader.java
+++ 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReader.java
@@ -16,18 +16,12 @@
  */
 package org.apache.flink.connectors.kudu.connector.reader;
 
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.connectors.kudu.connector.KuduFilterInfo;
 import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
-
-import org.apache.commons.collections.CollectionUtils;
 import org.apache.flink.connectors.kudu.connector.convertor.RowResultConvertor;
-import org.apache.kudu.client.KuduClient;
-import org.apache.kudu.client.KuduException;
-import org.apache.kudu.client.KuduScanToken;
-import org.apache.kudu.client.KuduSession;
-import org.apache.kudu.client.KuduTable;
-import org.apache.kudu.client.LocatedTablet;
+import org.apache.kudu.client.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReaderConfig.java
 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReaderConfig.java
index 468cb1e..4727488 100644
--- 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReaderConfig.java
+++ 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReaderConfig.java
@@ -16,9 +16,8 @@
  */
 package org.apache.flink.connectors.kudu.connector.reader;
 
-import org.apache.flink.annotation.PublicEvolving;
-
 import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.flink.annotation.PublicEvolving;
 
 import java.io.Serializable;
 
diff --git 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/AbstractSingleOperationMapper.java
 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/AbstractSingleOperationMapper.java
index d9f8219..794d56b 100644
--- 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/AbstractSingleOperationMapper.java
+++ 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/AbstractSingleOperationMapper.java
@@ -17,7 +17,6 @@
 package org.apache.flink.connectors.kudu.connector.writer;
 
 import org.apache.flink.annotation.PublicEvolving;
-
 import org.apache.kudu.client.KuduTable;
 import org.apache.kudu.client.Operation;
 import org.apache.kudu.client.PartialRow;
diff --git 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduOperationMapper.java
 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduOperationMapper.java
index 4878ab3..886a3ea 100644
--- 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduOperationMapper.java
+++ 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduOperationMapper.java
@@ -17,7 +17,6 @@
 package org.apache.flink.connectors.kudu.connector.writer;
 
 import org.apache.flink.annotation.PublicEvolving;
-
 import org.apache.kudu.client.KuduTable;
 import org.apache.kudu.client.Operation;
 
diff --git 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java
 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java
index 59ad196..2171a43 100644
--- 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java
+++ 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java
@@ -21,14 +21,7 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
 import 
org.apache.flink.connectors.kudu.connector.failure.DefaultKuduFailureHandler;
 import org.apache.flink.connectors.kudu.connector.failure.KuduFailureHandler;
-
-import org.apache.kudu.client.DeleteTableResponse;
-import org.apache.kudu.client.KuduClient;
-import org.apache.kudu.client.KuduSession;
-import org.apache.kudu.client.KuduTable;
-import org.apache.kudu.client.Operation;
-import org.apache.kudu.client.OperationResponse;
-import org.apache.kudu.client.RowError;
+import org.apache.kudu.client.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConfig.java
 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConfig.java
index 6c6d216..9b63494 100644
--- 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConfig.java
+++ 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriterConfig.java
@@ -16,9 +16,8 @@
  */
 package org.apache.flink.connectors.kudu.connector.writer;
 
-import org.apache.flink.annotation.PublicEvolving;
-
 import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.kudu.client.AsyncKuduClient;
 
 import java.io.Serializable;
diff --git 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/PojoOperationMapper.java
 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/PojoOperationMapper.java
index db44eec..253146c 100644
--- 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/PojoOperationMapper.java
+++ 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/PojoOperationMapper.java
@@ -19,11 +19,7 @@ package org.apache.flink.connectors.kudu.connector.writer;
 import org.apache.flink.annotation.PublicEvolving;
 
 import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 @PublicEvolving
 public class PojoOperationMapper<T> extends AbstractSingleOperationMapper<T> {
diff --git 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/format/KuduOutputFormat.java
 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/format/KuduOutputFormat.java
index 900515d..bc1b031 100644
--- 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/format/KuduOutputFormat.java
+++ 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/format/KuduOutputFormat.java
@@ -28,7 +28,6 @@ import 
org.apache.flink.connectors.kudu.connector.writer.KuduWriterConfig;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/streaming/KuduSink.java
 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/streaming/KuduSink.java
index a671408..a2e3e47 100644
--- 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/streaming/KuduSink.java
+++ 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/streaming/KuduSink.java
@@ -28,7 +28,6 @@ import 
org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/AbstractReadOnlyCatalog.java
 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/AbstractReadOnlyCatalog.java
index 2e1c63e..ffe02a8 100644
--- 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/AbstractReadOnlyCatalog.java
+++ 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/AbstractReadOnlyCatalog.java
@@ -19,26 +19,8 @@
 package org.apache.flink.connectors.kudu.table;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.catalog.AbstractCatalog;
-import org.apache.flink.table.catalog.CatalogBaseTable;
-import org.apache.flink.table.catalog.CatalogDatabase;
-import org.apache.flink.table.catalog.CatalogFunction;
-import org.apache.flink.table.catalog.CatalogPartition;
-import org.apache.flink.table.catalog.CatalogPartitionSpec;
-import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.catalog.exceptions.CatalogException;
-import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
-import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
-import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
-import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
-import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
-import 
org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
-import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
-import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
-import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
-import org.apache.flink.table.catalog.exceptions.TableNotExistException;
-import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
-import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
+import org.apache.flink.table.catalog.*;
+import org.apache.flink.table.catalog.exceptions.*;
 import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
 import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
 
diff --git 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalog.java
 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalog.java
index 734e219..20ec341 100644
--- 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalog.java
+++ 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalog.java
@@ -21,29 +21,15 @@ package org.apache.flink.connectors.kudu.table;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
 import org.apache.flink.connectors.kudu.table.utils.KuduTableUtils;
-import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
 import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.catalog.CatalogBaseTable;
-import org.apache.flink.table.catalog.CatalogDatabase;
-import org.apache.flink.table.catalog.CatalogDatabaseImpl;
-import org.apache.flink.table.catalog.CatalogFunction;
-import org.apache.flink.table.catalog.CatalogPartitionSpec;
-import org.apache.flink.table.catalog.CatalogTable;
-import org.apache.flink.table.catalog.CatalogTableImpl;
-import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.catalog.exceptions.CatalogException;
-import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
-import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
-import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
-import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.*;
+import org.apache.flink.table.catalog.exceptions.*;
 import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
 import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.factories.TableFactory;
 import org.apache.flink.util.StringUtils;
-
-import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
-
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.client.AlterTableOptions;
 import org.apache.kudu.client.KuduClient;
@@ -53,19 +39,10 @@ import 
org.apache.kudu.shaded.com.google.common.collect.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
+import java.util.*;
 import java.util.stream.Collectors;
 
-import static 
org.apache.flink.connectors.kudu.table.KuduTableFactory.KUDU_HASH_COLS;
-import static 
org.apache.flink.connectors.kudu.table.KuduTableFactory.KUDU_PRIMARY_KEY_COLS;
-import static 
org.apache.flink.connectors.kudu.table.KuduTableFactory.KUDU_REPLICAS;
+import static org.apache.flink.connectors.kudu.table.KuduTableFactory.*;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -92,7 +69,7 @@ public class KuduCatalog extends AbstractReadOnlyCatalog {
      * @param kuduMasters Connection address to Kudu
      */
     public KuduCatalog(String catalogName, String kuduMasters) {
-        super(catalogName, EnvironmentSettings.DEFAULT_BUILTIN_DATABASE);
+        super(catalogName, "default_database");
         this.kuduMasters = kuduMasters;
         this.kuduClient = createClient();
     }
diff --git 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java
 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java
index 9112b0a..c46ac85 100644
--- 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java
+++ 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java
@@ -30,32 +30,12 @@ import org.apache.flink.table.factories.TableSinkFactory;
 import org.apache.flink.table.factories.TableSourceFactory;
 import org.apache.flink.types.Row;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
+import java.util.*;
 
 import static 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
-import static org.apache.flink.table.descriptors.DescriptorProperties.EXPR;
-import static 
org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK;
-import static 
org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_ROWTIME;
-import static 
org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE;
-import static 
org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_EXPR;
-import static 
org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_CLASS;
-import static 
org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_FROM;
-import static 
org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_SERIALIZED;
-import static 
org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_TYPE;
-import static 
org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_CLASS;
-import static 
org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_DELAY;
-import static 
org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_SERIALIZED;
-import static 
org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_TYPE;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA_DATA_TYPE;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA_FROM;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA_NAME;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA_PROCTIME;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA_TYPE;
+import static org.apache.flink.table.descriptors.DescriptorProperties.*;
+import static org.apache.flink.table.descriptors.Rowtime.*;
+import static org.apache.flink.table.descriptors.Schema.*;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 public class KuduTableFactory implements TableSourceFactory<Row>, 
TableSinkFactory<Tuple2<Boolean, Row>> {
diff --git 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSource.java
 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSource.java
index ad98e86..74daa89 100644
--- 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSource.java
+++ 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSource.java
@@ -29,11 +29,7 @@ import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.sources.FilterableTableSource;
-import org.apache.flink.table.sources.LimitableTableSource;
-import org.apache.flink.table.sources.ProjectableTableSource;
-import org.apache.flink.table.sources.StreamTableSource;
-import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.table.sources.*;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.utils.TypeConversions;
@@ -42,12 +38,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Optional;
+import java.util.*;
 
 import static 
org.apache.flink.connectors.kudu.table.utils.KuduTableUtils.toKuduFilterInfo;
 
diff --git 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/UpsertOperationMapper.java
 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/UpsertOperationMapper.java
index 847dad4..31c8d79 100644
--- 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/UpsertOperationMapper.java
+++ 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/UpsertOperationMapper.java
@@ -20,7 +20,6 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.java.tuple.Tuple2;
 import 
org.apache.flink.connectors.kudu.connector.writer.AbstractSingleOperationMapper;
 import org.apache.flink.types.Row;
-
 import org.apache.kudu.client.KuduTable;
 import org.apache.kudu.client.Operation;
 
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/dynamic/KuduDynamicTableSource.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/dynamic/KuduDynamicTableSource.java
index 2022cd7..cde6a13 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/dynamic/KuduDynamicTableSource.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/dynamic/KuduDynamicTableSource.java
@@ -27,26 +27,24 @@ import 
org.apache.flink.connectors.kudu.table.function.lookup.KuduRowDataLookupF
 import org.apache.flink.connectors.kudu.table.utils.KuduTableUtils;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.connector.ChangelogMode;
-import org.apache.flink.table.connector.source.DynamicTableSource;
-import org.apache.flink.table.connector.source.InputFormatProvider;
-import org.apache.flink.table.connector.source.LookupTableSource;
-import org.apache.flink.table.connector.source.ScanTableSource;
-import org.apache.flink.table.connector.source.TableFunctionProvider;
+import org.apache.flink.table.connector.source.*;
 import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
 import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
 import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
 import org.apache.flink.table.expressions.ResolvedExpression;
-import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.FieldsDataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.kudu.shaded.com.google.common.collect.Lists;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-import java.util.Optional;
+import java.util.*;
+
+import static org.apache.flink.calcite.shaded.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.flink.table.utils.TableSchemaUtils.containsPhysicalColumnsOnly;
 
 /**
  * A {@link DynamicTableSource} for Kudu.
@@ -138,12 +136,28 @@ public class KuduDynamicTableSource implements ScanTableSource, SupportsProjecti
     }
 
     @Override
-    public void applyProjection(int[][] projectedFields) {
+    public void applyProjection(int[][] projectedFields, DataType producedDataType) {
         // parser projectFields
-        this.physicalSchema = TableSchemaUtils.projectSchema(this.physicalSchema, projectedFields);
+        this.physicalSchema = projectSchema(this.physicalSchema, projectedFields);
         this.projectedFields = physicalSchema.getFieldNames();
     }
 
+    private TableSchema projectSchema(TableSchema tableSchema, int[][] projectedFields) {
+        checkArgument(
+                containsPhysicalColumnsOnly(tableSchema),
+                "Projection is only supported for physical columns.");
+        TableSchema.Builder builder = TableSchema.builder();
+
+        FieldsDataType fields =
+                (FieldsDataType)
+                        DataTypeUtils.projectRow(tableSchema.toRowDataType(), projectedFields);
+        RowType topFields = (RowType) fields.getLogicalType();
+        for (int i = 0; i < topFields.getFieldCount(); i++) {
+            builder.field(topFields.getFieldNames().get(i), fields.getChildren().get(i));
+        }
+        return builder.build();
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) return true;
diff --git 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/dynamic/catalog/KuduDynamicCatalog.java
 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/dynamic/catalog/KuduDynamicCatalog.java
index b531835..3dff23c 100644
--- 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/dynamic/catalog/KuduDynamicCatalog.java
+++ 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/dynamic/catalog/KuduDynamicCatalog.java
@@ -22,21 +22,9 @@ import 
org.apache.flink.connectors.kudu.connector.KuduTableInfo;
 import org.apache.flink.connectors.kudu.table.AbstractReadOnlyCatalog;
 import 
org.apache.flink.connectors.kudu.table.dynamic.KuduDynamicTableSourceSinkFactory;
 import org.apache.flink.connectors.kudu.table.utils.KuduTableUtils;
-import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.catalog.CatalogBaseTable;
-import org.apache.flink.table.catalog.CatalogDatabase;
-import org.apache.flink.table.catalog.CatalogDatabaseImpl;
-import org.apache.flink.table.catalog.CatalogFunction;
-import org.apache.flink.table.catalog.CatalogPartitionSpec;
-import org.apache.flink.table.catalog.CatalogTable;
-import org.apache.flink.table.catalog.CatalogTableImpl;
-import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.catalog.exceptions.CatalogException;
-import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
-import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
-import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
-import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.*;
+import org.apache.flink.table.catalog.exceptions.*;
 import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
 import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
 import org.apache.flink.table.expressions.Expression;
@@ -52,20 +40,10 @@ import 
org.apache.kudu.shaded.com.google.common.collect.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
+import java.util.*;
 import java.util.stream.Collectors;
 
-import static 
org.apache.flink.connectors.kudu.table.dynamic.KuduDynamicTableSourceSinkFactory.KUDU_HASH_COLS;
-import static 
org.apache.flink.connectors.kudu.table.dynamic.KuduDynamicTableSourceSinkFactory.KUDU_HASH_PARTITION_NUMS;
-import static 
org.apache.flink.connectors.kudu.table.dynamic.KuduDynamicTableSourceSinkFactory.KUDU_PRIMARY_KEY_COLS;
-import static 
org.apache.flink.connectors.kudu.table.dynamic.KuduDynamicTableSourceSinkFactory.KUDU_REPLICAS;
+import static 
org.apache.flink.connectors.kudu.table.dynamic.KuduDynamicTableSourceSinkFactory.*;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -86,7 +64,7 @@ public class KuduDynamicCatalog extends 
AbstractReadOnlyCatalog {
      * @param kuduMasters Connection address to Kudu
      */
     public KuduDynamicCatalog(String catalogName, String kuduMasters) {
-        super(catalogName, EnvironmentSettings.DEFAULT_BUILTIN_DATABASE);
+        super(catalogName, "default_database");
         this.kuduMasters = kuduMasters;
         this.kuduClient = createClient();
     }
diff --git 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/utils/KuduTableUtils.java
 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/utils/KuduTableUtils.java
index 1d5be62..f5d7ca7 100644
--- 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/utils/KuduTableUtils.java
+++ 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/utils/KuduTableUtils.java
@@ -42,18 +42,10 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
+import java.util.*;
 import java.util.stream.Collectors;
 
-import static 
org.apache.flink.connectors.kudu.table.dynamic.KuduDynamicTableSourceSinkFactory.KUDU_HASH_COLS;
-import static 
org.apache.flink.connectors.kudu.table.dynamic.KuduDynamicTableSourceSinkFactory.KUDU_HASH_PARTITION_NUMS;
-import static 
org.apache.flink.connectors.kudu.table.dynamic.KuduDynamicTableSourceSinkFactory.KUDU_PRIMARY_KEY_COLS;
-import static 
org.apache.flink.connectors.kudu.table.dynamic.KuduDynamicTableSourceSinkFactory.KUDU_REPLICAS;
+import static 
org.apache.flink.connectors.kudu.table.dynamic.KuduDynamicTableSourceSinkFactory.*;
 
 
 public class KuduTableUtils {
diff --git 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/utils/KuduTypeUtils.java
 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/utils/KuduTypeUtils.java
index c445465..15f7be7 100644
--- 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/utils/KuduTypeUtils.java
+++ 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/utils/KuduTypeUtils.java
@@ -20,20 +20,8 @@ package org.apache.flink.connectors.kudu.table.utils;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.types.AtomicDataType;
 import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.BigIntType;
-import org.apache.flink.table.types.logical.BooleanType;
-import org.apache.flink.table.types.logical.DecimalType;
-import org.apache.flink.table.types.logical.DoubleType;
-import org.apache.flink.table.types.logical.FloatType;
-import org.apache.flink.table.types.logical.IntType;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.SmallIntType;
-import org.apache.flink.table.types.logical.TimestampType;
-import org.apache.flink.table.types.logical.TinyIntType;
-import org.apache.flink.table.types.logical.VarBinaryType;
-import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.*;
 import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor;
-
 import org.apache.kudu.ColumnTypeAttributes;
 import org.apache.kudu.Type;
 
diff --git 
a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduTestBase.java
 
b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduTestBase.java
index bcc9b2d..ac4db4c 100644
--- 
a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduTestBase.java
+++ 
b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduTestBase.java
@@ -36,11 +36,7 @@ import org.apache.flink.types.Row;
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.Schema;
 import org.apache.kudu.Type;
-import org.apache.kudu.client.CreateTableOptions;
-import org.apache.kudu.client.KuduClient;
-import org.apache.kudu.client.KuduScanner;
-import org.apache.kudu.client.KuduTable;
-import org.apache.kudu.client.RowResult;
+import org.apache.kudu.client.*;
 import org.apache.kudu.shaded.com.google.common.collect.Lists;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
@@ -76,7 +72,7 @@ public class KuduTestBase {
     public static String[] columns = new String[]{"id", "title", "author", 
"price", "quantity"};
     private static GenericContainer<?> master;
     private static List<GenericContainer<?>> tServers;
-    private static String masterAddress;
+    private static HostAndPort masterAddress;
     private static KuduClient kuduClient;
 
     @BeforeAll
@@ -90,7 +86,7 @@ public class KuduTestBase {
                 .withNetwork(network)
                 .withNetworkAliases("kudu-master");
         master.start();
-        masterAddress = HostAndPort.fromParts(master.getHost(), master.getMappedPort(KUDU_MASTER_PORT)).toString();
+        masterAddress = HostAndPort.fromParts(master.getHost(), master.getMappedPort(KUDU_MASTER_PORT));
 
         for (int instance = 1; instance <= NUMBER_OF_REPLICA; instance++) {
             String instanceName = "kudu-tserver-" + instance;
@@ -98,8 +94,8 @@ public class KuduTestBase {
                     .withExposedPorts(KUDU_TSERVER_PORT)
                     .withCommand("tserver")
                     .withEnv("KUDU_MASTERS", "kudu-master:" + KUDU_MASTER_PORT)
-                    .withEnv("TSERVER_ARGS", "--fs_wal_dir=/var/lib/kudu/tserver --use_hybrid_clock=false " +
-                            "--rpc_advertised_addresses=" + instanceName)
+                    .withEnv("TSERVER_ARGS", "--fs_wal_dir=/var/lib/kudu/tserver --logtostderr "
+                            +" --use_hybrid_clock=false --rpc_advertised_addresses=" + instanceName)
                     .withNetwork(network)
                     .withNetworkAliases(instanceName)
                     .dependsOn(master);
@@ -108,8 +104,7 @@ public class KuduTestBase {
         }
         tServers = tServersBuilder.build();
 
-        System.out.println(HostAndPort.fromParts(master.getHost(), master.getMappedPort(8051)).toString());
-        kuduClient = new KuduClient.KuduClientBuilder(masterAddress).build();
+        kuduClient = new KuduClient.KuduClientBuilder(masterAddress.toString()).build();
     }
 
     @AfterAll
@@ -239,7 +234,7 @@ public class KuduTestBase {
     }
 
     public String getMasterAddress() {
-        return masterAddress;
+        return masterAddress.toString();
     }
 
     public KuduClient getClient() {
diff --git 
a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/format/KuduOutputFormatTest.java
 
b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/format/KuduOutputFormatTest.java
index dc8f777..f53b7c5 100644
--- 
a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/format/KuduOutputFormatTest.java
+++ 
b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/format/KuduOutputFormatTest.java
@@ -16,13 +16,12 @@
  */
 package org.apache.flink.connectors.kudu.format;
 
-import org.apache.flink.connectors.kudu.connector.KuduTestBase;
 import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.connectors.kudu.connector.KuduTestBase;
 import 
org.apache.flink.connectors.kudu.connector.writer.AbstractSingleOperationMapper;
 import org.apache.flink.connectors.kudu.connector.writer.KuduWriterConfig;
 import org.apache.flink.connectors.kudu.connector.writer.RowOperationMapper;
 import org.apache.flink.types.Row;
-
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
diff --git 
a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/streaming/KuduSinkTest.java
 
b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/streaming/KuduSinkTest.java
index 6791765..1764608 100644
--- 
a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/streaming/KuduSinkTest.java
+++ 
b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/streaming/KuduSinkTest.java
@@ -24,7 +24,6 @@ import 
org.apache.flink.connectors.kudu.connector.writer.KuduWriterConfig;
 import org.apache.flink.connectors.kudu.connector.writer.RowOperationMapper;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.types.Row;
-
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.Type;
 import org.apache.kudu.client.CreateTableOptions;
diff --git 
a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduCatalogTest.java
 
b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduCatalogTest.java
index 1927631..d403d9c 100644
--- 
a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduCatalogTest.java
+++ 
b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduCatalogTest.java
@@ -33,7 +33,6 @@ import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectPath;
-
 import org.apache.flink.types.Row;
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.Schema;
diff --git 
a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableFactoryTest.java
 
b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableFactoryTest.java
index f6482da..0375c68 100644
--- 
a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableFactoryTest.java
+++ 
b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableFactoryTest.java
@@ -40,18 +40,11 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
+import static org.junit.jupiter.api.Assertions.*;
 
 public class KuduTableFactoryTest extends KuduTestBase {
     private StreamTableEnvironment tableEnv;
diff --git 
a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceITCase.java
 
b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceITCase.java
index d3d4a63..4a53198 100644
--- 
a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceITCase.java
+++ 
b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceITCase.java
@@ -53,7 +53,7 @@ public class KuduTableSourceITCase extends KuduTestBase {
         it.forEachRemaining(results::add);
         assertEquals(5, results.size());
         assertEquals("1001,Java for dummies,Tan Ah Teck,11.11,11", 
results.get(0).toString());
-        tableEnv.sqlUpdate("DROP TABLE books");
+        tableEnv.executeSql("DROP TABLE books");
     }
 
 
@@ -66,6 +66,6 @@ public class KuduTableSourceITCase extends KuduTestBase {
         it.forEachRemaining(results::add);
         assertEquals(1, results.size());
         assertEquals("More Java for more dummies", results.get(0).toString());
-        tableEnv.sqlUpdate("DROP TABLE books");
+        tableEnv.executeSql("DROP TABLE books");
     }
 }
diff --git 
a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceTest.java
 
b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceTest.java
index 43734e4..d4101aa 100644
--- 
a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceTest.java
+++ 
b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceTest.java
@@ -19,15 +19,10 @@ package org.apache.flink.connectors.kudu.table;
 import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
 import org.apache.flink.connectors.kudu.connector.KuduTestBase;
 import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
-import org.apache.flink.table.expressions.CallExpression;
-import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.expressions.FieldReferenceExpression;
-import org.apache.flink.table.expressions.ResolvedExpression;
-import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.expressions.*;
 import org.apache.flink.table.functions.ScalarFunction;
 import org.apache.flink.table.functions.ScalarFunctionDefinition;
 import org.apache.flink.table.types.DataType;
@@ -43,14 +38,7 @@ import java.util.List;
 import static java.util.Collections.singletonList;
 import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.AND;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.EQUALS;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
 
 /**
  * Unit Tests for {@link KuduTableSource}.
@@ -68,7 +56,7 @@ public class KuduTableSourceTest extends KuduTestBase {
         KuduTableInfo tableInfo = booksTableInfo("books", true);
         setUpDatabase(tableInfo);
         catalog = new KuduCatalog(getMasterAddress());
-        ObjectPath op = new 
ObjectPath(EnvironmentSettings.DEFAULT_BUILTIN_DATABASE, "books");
+        ObjectPath op = new ObjectPath("default_database", "books");
         try {
             kuduTableSource = 
catalog.getKuduTableFactory().createTableSource(op, catalog.getTable(op));
         } catch (TableNotExistException e) {
diff --git 
a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableTestUtils.java
 
b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableTestUtils.java
index 4eae7bf..affdd04 100644
--- 
a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableTestUtils.java
+++ 
b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableTestUtils.java
@@ -26,14 +26,14 @@ import static 
org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXE
 public class KuduTableTestUtils {
 
     public static StreamTableEnvironment 
createTableEnvWithBlinkPlannerStreamingMode(StreamExecutionEnvironment env) {
-        EnvironmentSettings settings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
+        EnvironmentSettings settings = 
EnvironmentSettings.newInstance().inStreamingMode().build();
         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, 
settings);
         
tableEnv.getConfig().getConfiguration().setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM.key(),
 1);
         return tableEnv;
     }
 
     public static TableEnvironment createTableEnvWithBlinkPlannerBatchMode() {
-        EnvironmentSettings settings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
+        EnvironmentSettings settings = 
EnvironmentSettings.newInstance().inBatchMode().build();
         TableEnvironment tableEnv = TableEnvironment.create(settings);
         
tableEnv.getConfig().getConfiguration().setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM.key(),
 1);
         return tableEnv;
diff --git 
a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/AbstractOperationTest.java
 
b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/AbstractOperationTest.java
index f37b40d..12d82a9 100644
--- 
a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/AbstractOperationTest.java
+++ 
b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/AbstractOperationTest.java
@@ -17,14 +17,8 @@
 package org.apache.flink.connectors.kudu.writer;
 
 import org.apache.flink.connectors.kudu.connector.KuduTestBase;
-
 import org.apache.kudu.Schema;
-import org.apache.kudu.client.Delete;
-import org.apache.kudu.client.Insert;
-import org.apache.kudu.client.KuduTable;
-import org.apache.kudu.client.PartialRow;
-import org.apache.kudu.client.Update;
-import org.apache.kudu.client.Upsert;
+import org.apache.kudu.client.*;
 import org.junit.jupiter.api.BeforeEach;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
diff --git 
a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/PojoOperationMapperTest.java
 
b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/PojoOperationMapperTest.java
index 45e0b1b..f03c469 100644
--- 
a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/PojoOperationMapperTest.java
+++ 
b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/PojoOperationMapperTest.java
@@ -17,11 +17,10 @@
 
 package org.apache.flink.connectors.kudu.writer;
 
-import org.apache.flink.connectors.kudu.connector.KuduTestBase.BookInfo;
 import org.apache.flink.connectors.kudu.connector.KuduTestBase;
+import org.apache.flink.connectors.kudu.connector.KuduTestBase.BookInfo;
 import 
org.apache.flink.connectors.kudu.connector.writer.AbstractSingleOperationMapper;
 import org.apache.flink.connectors.kudu.connector.writer.PojoOperationMapper;
-
 import org.apache.kudu.client.Operation;
 import org.apache.kudu.client.PartialRow;
 import org.junit.jupiter.api.Test;
diff --git 
a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/RowOperationMapperTest.java
 
b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/RowOperationMapperTest.java
index e737063..3aeb673 100644
--- 
a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/RowOperationMapperTest.java
+++ 
b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/RowOperationMapperTest.java
@@ -20,7 +20,6 @@ import 
org.apache.flink.connectors.kudu.connector.KuduTestBase;
 import 
org.apache.flink.connectors.kudu.connector.writer.AbstractSingleOperationMapper;
 import org.apache.flink.connectors.kudu.connector.writer.RowOperationMapper;
 import org.apache.flink.types.Row;
-
 import org.apache.kudu.client.Operation;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
diff --git 
a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/TupleOpertaionMapperTest.java
 
b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/TupleOpertaionMapperTest.java
index 308a011..a52b7c1 100644
--- 
a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/TupleOpertaionMapperTest.java
+++ 
b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/writer/TupleOpertaionMapperTest.java
@@ -20,7 +20,6 @@ import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.connectors.kudu.connector.KuduTestBase;
 import 
org.apache.flink.connectors.kudu.connector.writer.AbstractSingleOperationMapper;
 import org.apache.flink.connectors.kudu.connector.writer.TupleOperationMapper;
-
 import org.apache.kudu.client.Operation;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;

Reply via email to