This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 012e611  [FLINK-11516][table-common] Port and move catalog transitive classes to flink-table-common
012e611 is described below

commit 012e6118ad3de2bfaadd46c76f25bf9d4ad9a9d1
Author: Dian Fu <fudian...@alibaba-inc.com>
AuthorDate: Fri Feb 1 23:23:06 2019 +0800

    [FLINK-11516][table-common] Port and move catalog transitive classes to flink-table-common
    
    This closes #7642.
---
 .../ElasticsearchUpsertTableSinkFactoryBase.java   |   4 +-
 .../kafka/KafkaTableSourceSinkFactoryBase.java     |   2 +-
 .../gateway/utils/TestTableSinkFactoryBase.java    |   2 +-
 .../gateway/utils/TestTableSourceFactoryBase.java  |   2 +-
 .../apache/flink/table/descriptors/Metadata.java   |  78 ++++++++
 .../flink/table/descriptors/MetadataValidator.java |  41 ++++
 .../apache/flink/table/descriptors/Statistics.java | 168 +++++++++++++++++
 .../table/descriptors/StatisticsValidator.java     | 126 +++++++++++++
 .../StreamTableDescriptorValidator.java            |  65 +++++++
 .../table/descriptors/StreamableDescriptor.java    |  73 ++++++++
 .../apache/flink/table/plan/stats/ColumnStats.java | 124 ++++++++++++
 .../apache/flink/table/plan/stats/TableStats.java} |  46 ++++-
 .../descriptors/DescriptorPropertiesTest.java      | 201 ++++++++++++++++++++
 .../flink/table/descriptors/MetadataTest.java      |  69 +++++++
 .../flink/table/descriptors/StatisticsTest.java    | 102 ++++++++++
 .../flink/table/catalog/ExternalCatalogTable.scala |   4 +-
 .../flink/table/catalog/ExternalTableUtil.scala    |   1 -
 .../apache/flink/table/descriptors/Metadata.scala  |  88 ---------
 .../table/descriptors/MetadataValidator.scala      |  43 -----
 .../flink/table/descriptors/Statistics.scala       | 166 -----------------
 .../table/descriptors/StatisticsValidator.scala    | 120 ------------
 .../StreamTableDescriptorValidator.scala           |  59 ------
 .../table/descriptors/StreamableDescriptor.scala   |  67 -------
 .../flink/table/plan/schema/DataSetTable.scala     |   2 +-
 .../flink/table/plan/stats/ColumnStats.scala       |  54 ------
 .../flink/table/plan/stats/FlinkStatistic.scala    |   4 +-
 .../descriptors/DescriptorPropertiesTest.scala     | 207 ---------------------
 .../flink/table/descriptors/MetadataTest.scala     |  59 ------
 .../flink/table/descriptors/StatisticsTest.scala   |  89 ---------
 29 files changed, 1093 insertions(+), 973 deletions(-)

diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkFactoryBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkFactoryBase.java
index 63e9b34..f52de79 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkFactoryBase.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkFactoryBase.java
@@ -113,7 +113,7 @@ public abstract class ElasticsearchUpsertTableSinkFactoryBase implements StreamT
                final List<String> properties = new ArrayList<>();
 
                // streaming properties
-               properties.add(UPDATE_MODE());
+               properties.add(UPDATE_MODE);
 
                // Elasticsearch
                properties.add(CONNECTOR_HOSTS + ".#." + CONNECTOR_HOSTS_HOSTNAME);
@@ -150,7 +150,7 @@ public abstract class ElasticsearchUpsertTableSinkFactoryBase implements StreamT
                final DescriptorProperties descriptorProperties = getValidatedProperties(properties);
 
                return createElasticsearchUpsertTableSink(
-                       descriptorProperties.isValue(UPDATE_MODE(), UPDATE_MODE_VALUE_APPEND()),
+                       descriptorProperties.isValue(UPDATE_MODE, UPDATE_MODE_VALUE_APPEND),
                        descriptorProperties.getTableSchema(SCHEMA()),
                        getHosts(descriptorProperties),
                        descriptorProperties.getString(CONNECTOR_INDEX),
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java
index 542dbd9..146eba5 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java
@@ -92,7 +92,7 @@ public abstract class KafkaTableSourceSinkFactoryBase implements
        @Override
        public Map<String, String> requiredContext() {
                Map<String, String> context = new HashMap<>();
-               context.put(UPDATE_MODE(), UPDATE_MODE_VALUE_APPEND()); // append mode
+               context.put(UPDATE_MODE, UPDATE_MODE_VALUE_APPEND); // append mode
                context.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_KAFKA); // kafka
                context.put(CONNECTOR_VERSION, kafkaVersion()); // version
                context.put(CONNECTOR_PROPERTY_VERSION, "1"); // backwards compatibility
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSinkFactoryBase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSinkFactoryBase.java
index d7d4f9f..c3c4152 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSinkFactoryBase.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSinkFactoryBase.java
@@ -61,7 +61,7 @@ public abstract class TestTableSinkFactoryBase implements StreamTableSinkFactory
        @Override
        public Map<String, String> requiredContext() {
                final Map<String, String> context = new HashMap<>();
-               context.put(UPDATE_MODE(), UPDATE_MODE_VALUE_APPEND());
+               context.put(UPDATE_MODE, UPDATE_MODE_VALUE_APPEND);
                context.put(CONNECTOR_TYPE, type);
                return context;
        }
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactoryBase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactoryBase.java
index 59091d8..12eee4f 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactoryBase.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactoryBase.java
@@ -64,7 +64,7 @@ public abstract class TestTableSourceFactoryBase implements StreamTableSourceFac
        @Override
        public Map<String, String> requiredContext() {
                final Map<String, String> context = new HashMap<>();
-               context.put(UPDATE_MODE(), UPDATE_MODE_VALUE_APPEND());
+               context.put(UPDATE_MODE, UPDATE_MODE_VALUE_APPEND);
                context.put(CONNECTOR_TYPE, type);
                return context;
        }
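
The four call-site fixes above are mechanical fallout of the port: UPDATE_MODE and UPDATE_MODE_VALUE_APPEND were previously vals on a Scala object, which Java can only reach through generated accessor methods (hence the parentheses). After the move they are plain static fields on the new Java StreamTableDescriptorValidator, so the call sites become, as a minimal sketch:

    import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE;
    import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE_VALUE_APPEND;

    Map<String, String> context = new HashMap<>();
    context.put(UPDATE_MODE, UPDATE_MODE_VALUE_APPEND); // plain field access, no ()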
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/Metadata.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/Metadata.java
new file mode 100644
index 0000000..5e74bec
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/Metadata.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Map;
+
+import static org.apache.flink.table.descriptors.MetadataValidator.METADATA_COMMENT;
+import static org.apache.flink.table.descriptors.MetadataValidator.METADATA_CREATION_TIME;
+import static org.apache.flink.table.descriptors.MetadataValidator.METADATA_LAST_ACCESS_TIME;
+
+/**
+ * Metadata descriptor for adding additional, useful information.
+ */
+@PublicEvolving
+public class Metadata implements Descriptor {
+
+       private final DescriptorProperties internalProperties = new DescriptorProperties(true);
+
+       public Metadata() {}
+
+       /**
+        * Sets a comment.
+        *
+        * @param comment the description
+        */
+       public Metadata comment(String comment) {
+               internalProperties.putString(METADATA_COMMENT, comment);
+               return this;
+       }
+
+       /**
+        * Sets a creation time.
+        *
+        * @param time UTC milliseconds timestamp
+        */
+       public Metadata creationTime(long time) {
+               internalProperties.putLong(METADATA_CREATION_TIME, time);
+               return this;
+       }
+
+       /**
+        * Sets a last access time.
+        *
+        * @param time UTC milliseconds timestamp
+        */
+       public Metadata lastAccessTime(long time) {
+               internalProperties.putLong(METADATA_LAST_ACCESS_TIME, time);
+               return this;
+       }
+
+       /**
+        * Converts this descriptor into a set of properties.
+        */
+       @Override
+       public final Map<String, String> toProperties() {
+               final DescriptorProperties properties = new DescriptorProperties();
+               properties.putProperties(internalProperties);
+               return properties.asMap();
+       }
+}
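
A minimal usage sketch of the ported descriptor; the property keys come from MetadataValidator below, and the values mirror those exercised by MetadataTest further down:

    Metadata metadata = new Metadata()
        .comment("Some additional comment")
        .creationTime(123L)         // UTC milliseconds
        .lastAccessTime(12020202L); // UTC milliseconds

    // expected flattened result:
    //   metadata.comment          -> Some additional comment
    //   metadata.creation-time    -> 123
    //   metadata.last-access-time -> 12020202
    Map<String, String> properties = metadata.toProperties();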
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/MetadataValidator.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/MetadataValidator.java
new file mode 100644
index 0000000..e09c8ec
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/MetadataValidator.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Validator for {@link Metadata}.
+ */
+@Internal
+public class MetadataValidator implements DescriptorValidator {
+
+       public static final String METADATA_PROPERTY_VERSION = "metadata.property-version";
+       public static final String METADATA_COMMENT = "metadata.comment";
+       public static final String METADATA_CREATION_TIME = "metadata.creation-time";
+       public static final String METADATA_LAST_ACCESS_TIME = "metadata.last-access-time";
+
+       @Override
+       public void validate(DescriptorProperties properties) {
+               properties.validateInt(METADATA_PROPERTY_VERSION, true, 0, Integer.MAX_VALUE);
+               properties.validateString(METADATA_COMMENT, true);
+               properties.validateLong(METADATA_CREATION_TIME, true);
+               properties.validateLong(METADATA_LAST_ACCESS_TIME, true);
+       }
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/Statistics.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/Statistics.java
new file mode 100644
index 0000000..f8d1c86
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/Statistics.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.plan.stats.ColumnStats;
+import org.apache.flink.table.plan.stats.TableStats;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.descriptors.StatisticsValidator.AVG_LENGTH;
+import static org.apache.flink.table.descriptors.StatisticsValidator.DISTINCT_COUNT;
+import static org.apache.flink.table.descriptors.StatisticsValidator.MAX_LENGTH;
+import static org.apache.flink.table.descriptors.StatisticsValidator.MAX_VALUE;
+import static org.apache.flink.table.descriptors.StatisticsValidator.MIN_VALUE;
+import static org.apache.flink.table.descriptors.StatisticsValidator.NAME;
+import static org.apache.flink.table.descriptors.StatisticsValidator.NULL_COUNT;
+import static org.apache.flink.table.descriptors.StatisticsValidator.STATISTICS_COLUMNS;
+import static org.apache.flink.table.descriptors.StatisticsValidator.STATISTICS_PROPERTY_VERSION;
+import static org.apache.flink.table.descriptors.StatisticsValidator.STATISTICS_ROW_COUNT;
+import static org.apache.flink.table.descriptors.StatisticsValidator.normalizeColumnStats;
+
+/**
+ * Statistics descriptor for describing table stats.
+ */
+@PublicEvolving
+public class Statistics implements Descriptor {
+
+       private final DescriptorProperties internalProperties = new DescriptorProperties(true);
+       private LinkedHashMap<String, Map<String, String>> columnStats = new LinkedHashMap<>();
+
+       /**
+        * Sets the statistics from a {@link TableStats} instance.
+        *
+        * <p>This method overwrites all existing statistics.
+        *
+        * @param tableStats the table statistics
+        */
+       public Statistics tableStats(TableStats tableStats) {
+               rowCount(tableStats.getRowCount());
+               columnStats.clear();
+               tableStats.getColumnStats().forEach(this::columnStats);
+               return this;
+       }
+
+       /**
+        * Sets statistics for the overall row count. Required.
+        *
+        * @param rowCount the expected number of rows
+        */
+       public Statistics rowCount(long rowCount) {
+               internalProperties.putLong(STATISTICS_ROW_COUNT, rowCount);
+               return this;
+       }
+
+       /**
+        * Sets statistics for a column. Overwrites all existing statistics for this column.
+        *
+        * @param columnName  the column name
+        * @param columnStats expected statistics for the column
+        */
+       public Statistics columnStats(String columnName, ColumnStats columnStats) {
+               Map<String, String> map = normalizeColumnStats(columnStats);
+               this.columnStats.put(columnName, map);
+               return this;
+       }
+
+       /**
+        * Sets the number of distinct values statistic for the given column.
+        */
+       public Statistics columnDistinctCount(String columnName, Long ndv) {
+               this.columnStats
+                       .computeIfAbsent(columnName, column -> new HashMap<>())
+                       .put(DISTINCT_COUNT, String.valueOf(ndv));
+               return this;
+       }
+
+       /**
+        * Sets the number of null values statistic for the given column.
+        */
+       public Statistics columnNullCount(String columnName, Long nullCount) {
+               this.columnStats
+                       .computeIfAbsent(columnName, column -> new HashMap<>())
+                       .put(NULL_COUNT, String.valueOf(nullCount));
+               return this;
+       }
+
+       /**
+        * Sets the average length statistic for the given column.
+        */
+       public Statistics columnAvgLength(String columnName, Double avgLen) {
+               this.columnStats
+                       .computeIfAbsent(columnName, column -> new HashMap<>())
+                       .put(AVG_LENGTH, String.valueOf(avgLen));
+               return this;
+       }
+
+       /**
+        * Sets the maximum length statistic for the given column.
+        */
+       public Statistics columnMaxLength(String columnName, Integer maxLen) {
+               this.columnStats
+                       .computeIfAbsent(columnName, column -> new HashMap<>())
+                       .put(MAX_LENGTH, String.valueOf(maxLen));
+               return this;
+       }
+
+       /**
+        * Sets the maximum value statistic for the given column.
+        */
+       public Statistics columnMaxValue(String columnName, Number max) {
+               this.columnStats
+                       .computeIfAbsent(columnName, column -> new HashMap<>())
+                       .put(MAX_VALUE, String.valueOf(max));
+               return this;
+       }
+
+       /**
+        * Sets the minimum value statistic for the given column.
+        */
+       public Statistics columnMinValue(String columnName, Number min) {
+               this.columnStats
+                       .computeIfAbsent(columnName, column -> new HashMap<>())
+                       .put(MIN_VALUE, String.valueOf(min));
+               return this;
+       }
+
+       /**
+        * Converts this descriptor into a set of properties.
+        */
+       @Override
+       public final Map<String, String> toProperties() {
+               final DescriptorProperties properties = new DescriptorProperties();
+               properties.putProperties(internalProperties);
+
+               properties.putInt(STATISTICS_PROPERTY_VERSION, 1);
+
+               List<Map<String, String>> namedStats = new ArrayList<>();
+               for (Map.Entry<String, Map<String, String>> entry : columnStats.entrySet()) {
+                       Map<String, String> columnStat = entry.getValue();
+                       columnStat.put(NAME, entry.getKey());
+                       namedStats.add(columnStat);
+               }
+
+               properties.putIndexedVariableProperties(STATISTICS_COLUMNS, namedStats);
+               return properties.asMap();
+       }
+}
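
A short sketch of how the fluent setters flatten into indexed properties; the key layout follows toProperties() above and is exercised by StatisticsTest further down (column name "a" is just an example):

    Map<String, String> props = new Statistics()
        .rowCount(1000L)
        .columnDistinctCount("a", 42L)
        .columnNullCount("a", 3L)
        .toProperties();

    // props now contains, among others:
    //   statistics.property-version         -> 1
    //   statistics.row-count                -> 1000
    //   statistics.columns.0.name           -> a
    //   statistics.columns.0.distinct-count -> 42
    //   statistics.columns.0.null-count     -> 3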
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/StatisticsValidator.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/StatisticsValidator.java
new file mode 100644
index 0000000..e8797e8
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/StatisticsValidator.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.plan.stats.ColumnStats;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Validator for {@link Statistics}.
+ */
+@Internal
+public class StatisticsValidator implements DescriptorValidator {
+
+       public static final String STATISTICS_PROPERTY_VERSION = "statistics.property-version";
+       public static final String STATISTICS_ROW_COUNT = "statistics.row-count";
+       public static final String STATISTICS_COLUMNS = "statistics.columns";
+
+       // per column properties
+       public static final String NAME = "name";
+       public static final String DISTINCT_COUNT = "distinct-count";
+       public static final String NULL_COUNT = "null-count";
+       public static final String AVG_LENGTH = "avg-length";
+       public static final String MAX_LENGTH = "max-length";
+       public static final String MAX_VALUE = "max-value";
+       public static final String MIN_VALUE = "min-value";
+
+       @Override
+       public void validate(DescriptorProperties properties) {
+               properties.validateInt(STATISTICS_PROPERTY_VERSION, true, 0, Integer.MAX_VALUE);
+               properties.validateLong(STATISTICS_ROW_COUNT, true, 0);
+               validateColumnStats(properties, STATISTICS_COLUMNS);
+       }
+
+       // utilities
+
+       public static Map<String, String> normalizeColumnStats(ColumnStats columnStats) {
+               Map<String, String> stats = new HashMap<>();
+               if (columnStats.getNdv() != null) {
+                       stats.put(DISTINCT_COUNT, String.valueOf(columnStats.getNdv()));
+               }
+               if (columnStats.getNullCount() != null) {
+                       stats.put(NULL_COUNT, String.valueOf(columnStats.getNullCount()));
+               }
+               if (columnStats.getAvgLen() != null) {
+                       stats.put(AVG_LENGTH, String.valueOf(columnStats.getAvgLen()));
+               }
+               if (columnStats.getMaxLen() != null) {
+                       stats.put(MAX_LENGTH, String.valueOf(columnStats.getMaxLen()));
+               }
+               if (columnStats.getMaxValue() != null) {
+                       stats.put(MAX_VALUE, String.valueOf(columnStats.getMaxValue()));
+               }
+               if (columnStats.getMinValue() != null) {
+                       stats.put(MIN_VALUE, String.valueOf(columnStats.getMinValue()));
+               }
+               return stats;
+       }
+
+       public static void validateColumnStats(DescriptorProperties properties, String key) {
+
+               // filter for number of columns
+               int columnCount = properties.getIndexedProperty(key, NAME).size();
+
+               for (int i = 0; i < columnCount; i++) {
+                       final String keyPrefix = key + "." + i + ".";
+                       properties.validateString(keyPrefix + NAME, false, 1);
+                       properties.validateLong(keyPrefix + DISTINCT_COUNT, true, 0L);
+                       properties.validateLong(keyPrefix + NULL_COUNT, true, 0L);
+                       properties.validateDouble(keyPrefix + AVG_LENGTH, true, 0.0);
+                       properties.validateInt(keyPrefix + MAX_LENGTH, true, 0);
+                       properties.validateDouble(keyPrefix + MIN_VALUE, true);
+                       Optional<Double> min = properties.getOptionalDouble(keyPrefix + MIN_VALUE);
+                       if (min.isPresent()) {
+                               properties.validateDouble(keyPrefix + MAX_VALUE, true, min.get());
+                       } else {
+                               properties.validateDouble(keyPrefix + MAX_VALUE, true);
+                       }
+               }
+       }
+
+       public static Map<String, ColumnStats> readColumnStats(DescriptorProperties properties, String key) {
+
+               // filter for number of columns
+               int columnCount = properties.getIndexedProperty(key, NAME).size();
+
+               Map<String, ColumnStats> stats = new HashMap<>();
+               for (int i = 0; i < columnCount; i++) {
+                       final String keyPrefix = key + "." + i + ".";
+                       final String propertyKey = keyPrefix + NAME;
+                       String name = properties.getString(propertyKey);
+
+                       ColumnStats columnStats = new ColumnStats(
+                               properties.getOptionalLong(keyPrefix + DISTINCT_COUNT).orElse(null),
+                               properties.getOptionalLong(keyPrefix + NULL_COUNT).orElse(null),
+                               properties.getOptionalDouble(keyPrefix + AVG_LENGTH).orElse(null),
+                               properties.getOptionalInt(keyPrefix + MAX_LENGTH).orElse(null),
+                               properties.getOptionalDouble(keyPrefix + MAX_VALUE).orElse(null),
+                               properties.getOptionalDouble(keyPrefix + MIN_VALUE).orElse(null)
+                       );
+
+                       stats.put(name, columnStats);
+               }
+
+               return stats;
+       }
+}
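
The two utilities above are inverses up to types, which the following sketch illustrates (column name invented; note that readColumnStats() reads min/max via getOptionalDouble(), so those Number values come back as Double):

    ColumnStats original = new ColumnStats(42L, 3L, 8.5, 16, 100, 1);

    Map<String, String> flat = normalizeColumnStats(original);
    flat.put(NAME, "a"); // normalizeColumnStats() does not set the column name

    DescriptorProperties properties = new DescriptorProperties();
    properties.putIndexedVariableProperties(STATISTICS_COLUMNS, Collections.singletonList(flat));

    Map<String, ColumnStats> restored = readColumnStats(properties, STATISTICS_COLUMNS);
    // restored.get("a") holds the same statistics, with max/min as Double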
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/StreamTableDescriptorValidator.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/StreamTableDescriptorValidator.java
new file mode 100644
index 0000000..53063ed
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/StreamTableDescriptorValidator.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.annotation.Internal;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Validator for {@code StreamTableDescriptor}.
+ */
+@Internal
+public class StreamTableDescriptorValidator implements DescriptorValidator {
+
+       public static final String UPDATE_MODE = "update-mode";
+       public static final String UPDATE_MODE_VALUE_APPEND = "append";
+       public static final String UPDATE_MODE_VALUE_RETRACT = "retract";
+       public static final String UPDATE_MODE_VALUE_UPSERT = "upsert";
+
+       private final boolean supportsAppend;
+       private final boolean supportsRetract;
+       private final boolean supportsUpsert;
+
+       public StreamTableDescriptorValidator(boolean supportsAppend, boolean supportsRetract, boolean supportsUpsert) {
+               this.supportsAppend = supportsAppend;
+               this.supportsRetract = supportsRetract;
+               this.supportsUpsert = supportsUpsert;
+       }
+
+       @Override
+       public void validate(DescriptorProperties properties) {
+               List<String> modeList = new ArrayList<>();
+               if (supportsAppend) {
+                       modeList.add(UPDATE_MODE_VALUE_APPEND);
+               }
+               if (supportsRetract) {
+                       modeList.add(UPDATE_MODE_VALUE_RETRACT);
+               }
+               if (supportsUpsert) {
+                       modeList.add(UPDATE_MODE_VALUE_UPSERT);
+               }
+               properties.validateEnumValues(
+                       UPDATE_MODE,
+                       false,
+                       modeList
+               );
+       }
+}
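
A sketch of the validator in action, for a connector that supports append and upsert but not retract (assuming validateEnumValues() rejects values outside the given list with a ValidationException):

    DescriptorProperties properties = new DescriptorProperties(true);
    properties.putString(UPDATE_MODE, UPDATE_MODE_VALUE_RETRACT);

    new StreamTableDescriptorValidator(true, false, true)
        .validate(properties); // fails: "retract" is not among [append, upsert]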
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/StreamableDescriptor.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/StreamableDescriptor.java
new file mode 100644
index 0000000..0430e8c
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/StreamableDescriptor.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * A trait for descriptors that allow to convert between a dynamic table and an external connector.
+ */
+@PublicEvolving
+public interface StreamableDescriptor<D extends StreamableDescriptor<D>> extends Descriptor {
+
+       /**
+        * Declares how to perform the conversion between a dynamic table and an external connector.
+        *
+        * <p>In append mode, a dynamic table and an external connector only exchange INSERT messages.
+        *
+        * @see #inRetractMode()
+        * @see #inUpsertMode()
+        */
+       D inAppendMode();
+
+       /**
+        * Declares how to perform the conversion between a dynamic table and an external connector.
+        *
+        * <p>In retract mode, a dynamic table and an external connector exchange ADD and RETRACT messages.
+        *
+        * <p>An INSERT change is encoded as an ADD message, a DELETE change as a RETRACT message, and an
+        * UPDATE change as a RETRACT message for the updated (previous) row and an ADD message for
+        * the updating (new) row.
+        *
+        * <p>In this mode, a key must not be defined as opposed to upsert mode. However, every update
+        * consists of two messages which is less efficient.
+        *
+        * @see #inAppendMode()
+        * @see #inUpsertMode()
+        */
+       D inRetractMode();
+
+       /**
+        * Declares how to perform the conversion between a dynamic table and an external connector.
+        *
+        * <p>In upsert mode, a dynamic table and an external connector exchange UPSERT and DELETE messages.
+        *
+        * <p>This mode requires a (possibly composite) unique key by which updates can be propagated. The
+        * external connector needs to be aware of the unique key attribute in order to apply messages
+        * correctly. INSERT and UPDATE changes are encoded as UPSERT messages. DELETE changes as
+        * DELETE messages.
+        *
+        * <p>The main difference to a retract stream is that UPDATE changes are encoded with a single
+        * message and are therefore more efficient.
+        *
+        * @see #inAppendMode()
+        * @see #inRetractMode()
+        */
+       D inUpsertMode();
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/plan/stats/ColumnStats.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/plan/stats/ColumnStats.java
new file mode 100644
index 0000000..7426629
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/plan/stats/ColumnStats.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.stats;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Column statistics.
+ */
+@PublicEvolving
+public final class ColumnStats {
+
+       /**
+        * number of distinct values.
+        */
+       private final Long ndv;
+
+       /**
+        * number of nulls.
+        */
+       private final Long nullCount;
+
+       /**
+        * average length of column values.
+        */
+       private final Double avgLen;
+
+       /**
+        * max length of column values.
+        */
+       private final Integer maxLen;
+
+       /**
+        * max value of column values.
+        */
+       private final Number max;
+
+       /**
+        * min value of column values.
+        */
+       private final Number min;
+
+       public ColumnStats(
+               Long ndv,
+               Long nullCount,
+               Double avgLen,
+               Integer maxLen,
+               Number max,
+               Number min) {
+               this.ndv = ndv;
+               this.nullCount = nullCount;
+               this.avgLen = avgLen;
+               this.maxLen = maxLen;
+               this.max = max;
+               this.min = min;
+       }
+
+       public Long getNdv() {
+               return ndv;
+       }
+
+       public Long getNullCount() {
+               return nullCount;
+       }
+
+       public Double getAvgLen() {
+               return avgLen;
+       }
+
+       public Integer getMaxLen() {
+               return maxLen;
+       }
+
+       public Number getMaxValue() {
+               return max;
+       }
+
+       public Number getMinValue() {
+               return min;
+       }
+
+       public String toString() {
+               List<String> columnStats = new ArrayList<>();
+               if (ndv != null) {
+                       columnStats.add("ndv=" + ndv);
+               }
+               if (nullCount != null) {
+                       columnStats.add("nullCount=" + nullCount);
+               }
+               if (avgLen != null) {
+                       columnStats.add("avgLen=" + avgLen);
+               }
+               if (maxLen != null) {
+                       columnStats.add("maxLen=" + maxLen);
+               }
+               if (max != null) {
+                       columnStats.add("max=" + max);
+               }
+               if (min != null) {
+                       columnStats.add("min=" + min);
+               }
+               String columnStatsStr = String.join(", ", columnStats);
+               return "ColumnStats(" + columnStatsStr + ")";
+       }
+}
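
Unset statistics stay null and are skipped by toString(), so partially known columns print compactly. A quick sketch:

    ColumnStats stats = new ColumnStats(42L, null, 8.5, null, null, null);
    System.out.println(stats); // ColumnStats(ndv=42, avgLen=8.5)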
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/stats/TableStats.scala b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/plan/stats/TableStats.java
similarity index 52%
rename from flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/stats/TableStats.scala
rename to flink-table/flink-table-common/src/main/java/org/apache/flink/table/plan/stats/TableStats.java
index 64eee95..f58e2d0 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/stats/TableStats.scala
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/plan/stats/TableStats.java
@@ -16,15 +16,43 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.plan.stats
+package org.apache.flink.table.plan.stats;
 
-import java.lang.Long
-import java.util.{Map, HashMap}
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.HashMap;
+import java.util.Map;
 
 /**
-  * Table statistics
-  *
-  * @param rowCount cardinality of table
-  * @param colStats statistics of table columns
-  */
-case class TableStats(rowCount: Long, colStats: Map[String, ColumnStats] = new HashMap())
+ * Table statistics.
+ */
+@PublicEvolving
+public final class TableStats {
+
+       /**
+        * cardinality of table.
+        */
+       private final long rowCount;
+
+       /**
+        * colStats statistics of table columns.
+        */
+       private final Map<String, ColumnStats> colStats;
+
+       public TableStats(long rowCount) {
+               this(rowCount, new HashMap<>());
+       }
+
+       public TableStats(long rowCount, Map<String, ColumnStats> colStats) {
+               this.rowCount = rowCount;
+               this.colStats = colStats;
+       }
+
+       public long getRowCount() {
+               return rowCount;
+       }
+
+       public Map<String, ColumnStats> getColumnStats() {
+               return colStats;
+       }
+}
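
The Scala case class' default argument for colStats is now covered by the single-argument constructor. A minimal sketch of the Java replacement:

    Map<String, ColumnStats> columnStats = new HashMap<>();
    columnStats.put("a", new ColumnStats(42L, 3L, 8.5, 16, 100, 1));

    TableStats withColumns = new TableStats(1000L, columnStats);
    TableStats rowCountOnly = new TableStats(1000L); // colStats defaults to an empty map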
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/descriptors/DescriptorPropertiesTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/descriptors/DescriptorPropertiesTest.java
new file mode 100644
index 0000000..4859fc5
--- /dev/null
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/descriptors/DescriptorPropertiesTest.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.table.api.ValidationException;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link DescriptorProperties}.
+ */
+public class DescriptorPropertiesTest {
+
+       private static final String ARRAY_KEY = "my-array";
+       private static final String FIXED_INDEXED_PROPERTY_KEY = "my-fixed-indexed-property";
+       private static final String PROPERTY_1_KEY = "property-1";
+       private static final String PROPERTY_2_KEY = "property-2";
+
+       @Test
+       public void testEquals() {
+               DescriptorProperties properties1 = new DescriptorProperties();
+               properties1.putString("hello1", "12");
+               properties1.putString("hello2", "13");
+               properties1.putString("hello3", "14");
+
+               DescriptorProperties properties2 = new DescriptorProperties();
+               properties2.putString("hello1", "12");
+               properties2.putString("hello2", "13");
+               properties2.putString("hello3", "14");
+
+               DescriptorProperties properties3 = new DescriptorProperties();
+               properties3.putString("hello1", "12");
+               properties3.putString("hello3", "14");
+               properties3.putString("hello2", "13");
+
+               assertEquals(properties1, properties2);
+
+               assertEquals(properties1, properties3);
+       }
+
+       @Test
+       public void testMissingArray() {
+               DescriptorProperties properties = new DescriptorProperties();
+
+               testArrayValidation(properties, 0, Integer.MAX_VALUE);
+       }
+
+       @Test
+       public void testArrayValues() {
+               DescriptorProperties properties = new DescriptorProperties();
+
+               properties.putString(ARRAY_KEY + ".0", "12");
+               properties.putString(ARRAY_KEY + ".1", "42");
+               properties.putString(ARRAY_KEY + ".2", "66");
+
+               testArrayValidation(properties, 1, Integer.MAX_VALUE);
+
+               assertEquals(
+                       Arrays.asList(12, 42, 66),
+                       properties.getArray(ARRAY_KEY, properties::getInt));
+       }
+
+       @Test
+       public void testArraySingleValue() {
+               DescriptorProperties properties = new DescriptorProperties();
+               properties.putString(ARRAY_KEY, "12");
+
+               testArrayValidation(properties, 1, Integer.MAX_VALUE);
+
+               assertEquals(
+                       Collections.singletonList(12),
+                       properties.getArray(ARRAY_KEY, properties::getInt));
+       }
+
+       @Test(expected = ValidationException.class)
+       public void testArrayInvalidValues() {
+               DescriptorProperties properties = new DescriptorProperties();
+               properties.putString(ARRAY_KEY + ".0", "12");
+               properties.putString(ARRAY_KEY + ".1", "66");
+               properties.putString(ARRAY_KEY + ".2", "INVALID");
+
+               testArrayValidation(properties, 1, Integer.MAX_VALUE);
+       }
+
+       @Test(expected = ValidationException.class)
+       public void testArrayInvalidSingleValue() {
+               DescriptorProperties properties = new DescriptorProperties();
+               properties.putString(ARRAY_KEY, "INVALID");
+
+               testArrayValidation(properties, 1, Integer.MAX_VALUE);
+       }
+
+       @Test(expected = ValidationException.class)
+       public void testInvalidMissingArray() {
+               DescriptorProperties properties = new DescriptorProperties();
+
+               testArrayValidation(properties, 1, Integer.MAX_VALUE);
+       }
+
+       @Test(expected = ValidationException.class)
+       public void testInvalidFixedIndexedProperties() {
+               DescriptorProperties property = new DescriptorProperties();
+               List<List<String>> list = new ArrayList<>();
+               list.add(Arrays.asList("1", "string"));
+               list.add(Arrays.asList("INVALID", "string"));
+               property.putIndexedFixedProperties(
+                       FIXED_INDEXED_PROPERTY_KEY,
+                       Arrays.asList(PROPERTY_1_KEY, PROPERTY_2_KEY),
+                       list);
+               testFixedIndexedPropertiesValidation(property);
+       }
+
+       @Test
+       public void testRemoveKeys() {
+               DescriptorProperties properties = new DescriptorProperties();
+               properties.putString("hello1", "12");
+               properties.putString("hello2", "13");
+               properties.putString("hello3", "14");
+
+               DescriptorProperties actual = properties.withoutKeys(Arrays.asList("hello1", "hello3"));
+
+               DescriptorProperties expected = new DescriptorProperties();
+               expected.putString("hello2", "13");
+
+               assertEquals(expected, actual);
+       }
+
+       @Test
+       public void testPrefixedMap() {
+               DescriptorProperties properties = new DescriptorProperties();
+               properties.putString("hello1", "12");
+               properties.putString("hello2", "13");
+               properties.putString("hello3", "14");
+
+               Map<String, String> actual = properties.asPrefixedMap("prefix.");
+
+               DescriptorProperties expected = new DescriptorProperties();
+               expected.putString("prefix.hello1", "12");
+               expected.putString("prefix.hello2", "13");
+               expected.putString("prefix.hello3", "14");
+
+               assertEquals(expected.asMap(), actual);
+       }
+
+       private void testArrayValidation(
+               DescriptorProperties properties,
+               int minLength,
+               int maxLength) {
+               Consumer<String> validator = key -> properties.validateInt(key, false);
+
+               properties.validateArray(
+                       ARRAY_KEY,
+                       validator,
+                       minLength,
+                       maxLength);
+       }
+
+       private void testFixedIndexedPropertiesValidation(DescriptorProperties properties) {
+
+               Map<String, Consumer<String>> validatorMap = new HashMap<>();
+
+               // PROPERTY_1 should be Int
+               Consumer<String> validator1 = key -> properties.validateInt(key, false);
+               validatorMap.put(PROPERTY_1_KEY, validator1);
+               // PROPERTY_2 should be String
+               Consumer<String> validator2 = key -> properties.validateString(key, false);
+               validatorMap.put(PROPERTY_2_KEY, validator2);
+
+               properties.validateFixedIndexedProperties(
+                       FIXED_INDEXED_PROPERTY_KEY,
+                       false,
+                       validatorMap
+               );
+       }
+}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/descriptors/MetadataTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/descriptors/MetadataTest.java
new file mode 100644
index 0000000..f3674c5
--- /dev/null
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/descriptors/MetadataTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.table.api.ValidationException;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Tests for {@link Metadata}.
+ */
+public class MetadataTest extends DescriptorTestBase {
+
+       @Test(expected = ValidationException.class)
+       public void testInvalidCreationTime() {
+               addPropertyAndVerify(descriptors().get(0), "metadata.creation-time", "dfghj");
+       }
+
+       // ----------------------------------------------------------------------------------------------
+
+       @Override
+       public List<Descriptor> descriptors() {
+               Metadata desc = new Metadata()
+                       .comment("Some additional comment")
+                       .creationTime(123L)
+                       .lastAccessTime(12020202L);
+
+               return Arrays.asList(desc);
+       }
+
+       @Override
+       public DescriptorValidator validator() {
+               return new MetadataValidator();
+       }
+
+       @Override
+       public List<Map<String, String>> properties() {
+               Map props = new HashMap<String, String>() {
+                       {
+                               put("metadata.comment", "Some additional comment");
+                               put("metadata.creation-time", "123");
+                               put("metadata.last-access-time", "12020202");
+                       }
+               };
+
+               return Arrays.asList(props);
+       }
+}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/descriptors/StatisticsTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/descriptors/StatisticsTest.java
new file mode 100644
index 0000000..6c5c04f
--- /dev/null
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/descriptors/StatisticsTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.plan.stats.ColumnStats;
+import org.apache.flink.table.plan.stats.TableStats;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Tests for {@link Statistics}.
+ */
+public class StatisticsTest extends DescriptorTestBase {
+
+       @Test(expected = ValidationException.class)
+       public void testInvalidRowCount() {
+               addPropertyAndVerify(descriptors().get(0), "statistics.row-count", "abx");
+       }
+
+       @Test(expected = ValidationException.class)
+       public void testMissingName() {
+               removePropertyAndVerify(descriptors().get(0), "statistics.columns.0.name");
+       }
+
+       // ----------------------------------------------------------------------------------------------
+
+       @Override
+       public List<Descriptor> descriptors() {
+               Statistics desc1 = new Statistics()
+                       .rowCount(1000L)
+                       .columnStats("a", new ColumnStats(1L, 2L, 3.0, 4, 6, 5))
+                       .columnAvgLength("b", 42.0)
+                       .columnNullCount("a", 300L);
+
+               Map<String, ColumnStats> map = new HashMap<String, ColumnStats>();
+               map.put("a", new ColumnStats(null, 2L, 3.0, null, 6, 5));
+               Statistics desc2 = new Statistics()
+                       .tableStats(new TableStats(32L, map));
+
+               return Arrays.asList(desc1, desc2);
+       }
+
+       @Override
+       public DescriptorValidator validator() {
+               return new StatisticsValidator();
+       }
+
+       @Override
+       public List<Map<String, String>> properties() {
+               Map<String, String> props1 = new HashMap<String, String>() {
+                       {
+                               put("statistics.property-version", "1");
+                               put("statistics.row-count", "1000");
+                               put("statistics.columns.0.name", "a");
+                               put("statistics.columns.0.distinct-count", "1");
+                               put("statistics.columns.0.null-count", "300");
+                               put("statistics.columns.0.avg-length", "3.0");
+                               put("statistics.columns.0.max-length", "4");
+                               put("statistics.columns.0.max-value", "6");
+                               put("statistics.columns.0.min-value", "5");
+                               put("statistics.columns.1.name", "b");
+                               put("statistics.columns.1.avg-length", "42.0");
+                       }
+               };
+
+               Map<String, String> props2 = new HashMap<String, String>() {
+                       {
+                               put("statistics.property-version", "1");
+                               put("statistics.row-count", "32");
+                               put("statistics.columns.0.name", "a");
+                               put("statistics.columns.0.null-count", "2");
+                               put("statistics.columns.0.avg-length", "3.0");
+                               put("statistics.columns.0.max-value", "6");
+                               put("statistics.columns.0.min-value", "5");
+                       }
+               };
+
+               return Arrays.asList(props1, props2);
+       }
+}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
index 3aa474c..7b5c343 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
@@ -27,8 +27,6 @@ import org.apache.flink.table.factories.TableFactory
 import org.apache.flink.table.plan.stats.TableStats
 import org.apache.flink.table.util.JavaScalaConversionUtil.toScala
 
-import scala.collection.JavaConverters._
-
 /**
  * Defines a table in an [[ExternalCatalog]]. External catalog tables describe table sources
   * and/or sinks for both batch and stream environments.
@@ -69,7 +67,7 @@ class ExternalCatalogTable(
     rowCount match {
       case Some(cnt) =>
         val columnStats = readColumnStats(normalizedProps, STATISTICS_COLUMNS)
-        Some(TableStats(cnt, columnStats.asJava))
+        Some(new TableStats(cnt, columnStats))
       case None =>
         None
     }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/catalog/ExternalTableUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/catalog/ExternalTableUtil.scala
index ec57c5e..6687a5e 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/catalog/ExternalTableUtil.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/catalog/ExternalTableUtil.scala
@@ -26,7 +26,6 @@ import org.apache.flink.table.sinks.{BatchTableSink, StreamTableSink}
 import org.apache.flink.table.sources.{BatchTableSource, StreamTableSource}
 import org.apache.flink.table.util.Logging
 
-
 /**
   * The utility class is used to convert [[ExternalCatalogTable]] to [[TableSourceSinkTable]].
   *
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/Metadata.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/Metadata.scala
deleted file mode 100644
index e5dfb79..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/Metadata.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.descriptors
-
-import java.util
-
-import org.apache.flink.table.descriptors.MetadataValidator.{METADATA_COMMENT, METADATA_CREATION_TIME, METADATA_LAST_ACCESS_TIME}
-
-/**
-  * Metadata descriptor for adding additional, useful information.
-  */
-class Metadata extends Descriptor {
-
-  protected var comment: Option[String] = None
-  protected var creationTime: Option[Long] = None
-  protected var lastAccessTime: Option[Long] = None
-
-  /**
-    * Sets a comment.
-    *
-    * @param comment the description
-    */
-  def comment(comment: String): Metadata = {
-    this.comment = Some(comment)
-    this
-  }
-
-  /**
-    * Sets a creation time.
-    *
-    * @param time UTC milliseconds timestamp
-    */
-  def creationTime(time: Long): Metadata = {
-    this.creationTime = Some(time)
-    this
-  }
-
-  /**
-    * Sets a last access time.
-    *
-    * @param time UTC milliseconds timestamp
-    */
-  def lastAccessTime(time: Long): Metadata = {
-    this.lastAccessTime = Some(time)
-    this
-  }
-
-  /**
-    * Converts this descriptor into a set of properties.
-    */
-  final override def toProperties: util.Map[String, String] = {
-    val properties = new DescriptorProperties()
-    comment.foreach(c => properties.putString(METADATA_COMMENT, c))
-    creationTime.foreach(t => properties.putLong(METADATA_CREATION_TIME, t))
-    lastAccessTime.foreach(t => properties.putLong(METADATA_LAST_ACCESS_TIME, t))
-    properties.asMap()
-  }
-}
-
-/**
-  * Metadata descriptor for adding additional, useful information.
-  */
-object Metadata {
-
-  /**
-    * Metadata descriptor for adding additional, useful information.
-    *
-    * @deprecated Use `new Metadata()`.
-    */
-  @deprecated
-  def apply(): Metadata = new Metadata()
-}
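
The deprecated apply() factory does not carry over to Java. A sketch of the assumed usage of the ported Metadata descriptor, reusing the values from the deleted MetadataTest below:

    import java.util.Map;
    import org.apache.flink.table.descriptors.Metadata;

    public class MetadataDescriptorSketch {
        public static void main(String[] args) {
            // Expected to yield "metadata.comment", "metadata.creation-time"
            // and "metadata.last-access-time" entries.
            Map<String, String> props = new Metadata()
                .comment("Some additional comment")
                .creationTime(123L)
                .lastAccessTime(12020202L)
                .toProperties();
            props.forEach((k, v) -> System.out.println(k + "=" + v));
        }
    }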
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/MetadataValidator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/MetadataValidator.scala
deleted file mode 100644
index 9c30872..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/MetadataValidator.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.descriptors
-
-import org.apache.flink.table.descriptors.MetadataValidator.{METADATA_COMMENT, METADATA_CREATION_TIME, METADATA_LAST_ACCESS_TIME, METADATA_PROPERTY_VERSION}
-
-/**
-  * Validator for [[Metadata]].
-  */
-class MetadataValidator extends DescriptorValidator {
-
-  override def validate(properties: DescriptorProperties): Unit = {
-    properties.validateInt(METADATA_PROPERTY_VERSION, true, 0, Integer.MAX_VALUE)
-    properties.validateString(METADATA_COMMENT, true)
-    properties.validateLong(METADATA_CREATION_TIME, true)
-    properties.validateLong(METADATA_LAST_ACCESS_TIME, true)
-  }
-}
-
-object MetadataValidator {
-
-  val METADATA_PROPERTY_VERSION = "metadata.property-version"
-  val METADATA_COMMENT = "metadata.comment"
-  val METADATA_CREATION_TIME = "metadata.creation-time"
-  val METADATA_LAST_ACCESS_TIME = "metadata.last-access-time"
-
-}
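
Validation is unchanged in spirit: all metadata keys stay optional. A hedged sketch of wiring descriptor and validator together (putProperties is assumed to exist on the ported DescriptorProperties):

    import org.apache.flink.table.descriptors.DescriptorProperties;
    import org.apache.flink.table.descriptors.Metadata;
    import org.apache.flink.table.descriptors.MetadataValidator;

    public class MetadataValidationSketch {
        public static void main(String[] args) {
            DescriptorProperties properties = new DescriptorProperties();
            properties.putProperties(new Metadata().creationTime(123L).toProperties());
            // Throws ValidationException if e.g. metadata.creation-time is not a long.
            new MetadataValidator().validate(properties);
        }
    }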
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/Statistics.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/Statistics.scala
deleted file mode 100644
index 1707e1b..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/Statistics.scala
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.descriptors
-
-import java.util
-
-import org.apache.flink.table.descriptors.StatisticsValidator._
-import org.apache.flink.table.plan.stats.{ColumnStats, TableStats}
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-
-/**
-  * Statistics descriptor for describing table stats.
-  */
-class Statistics extends Descriptor {
-
-  private var rowCount: Option[Long] = None
-  private val columnStats: mutable.LinkedHashMap[String, mutable.Map[String, String]] =
-    mutable.LinkedHashMap[String, mutable.Map[String, String]]()
-
-  /**
-    * Sets the statistics from a [[TableStats]] instance.
-    *
-    * This method overwrites all existing statistics.
-    *
-    * @param tableStats the table statistics
-    */
-  def tableStats(tableStats: TableStats): Statistics = {
-    rowCount(tableStats.rowCount)
-    columnStats.clear()
-    tableStats.colStats.asScala.foreach { case (col, stats) =>
-        columnStats(col, stats)
-    }
-    this
-  }
-
-  /**
-    * Sets statistics for the overall row count. Required.
-    *
-    * @param rowCount the expected number of rows
-    */
-  def rowCount(rowCount: Long): Statistics = {
-    this.rowCount = Some(rowCount)
-    this
-  }
-
-  /**
-    * Sets statistics for a column. Overwrites all existing statistics for this column.
-    *
-    * @param columnName the column name
-    * @param columnStats expected statistics for the column
-    */
-  def columnStats(columnName: String, columnStats: ColumnStats): Statistics = {
-    val map = mutable.Map(normalizeColumnStats(columnStats).toSeq: _*)
-    this.columnStats.put(columnName, map)
-    this
-  }
-
-  /**
-    * Sets the number of distinct values statistic for the given column.
-    */
-  def columnDistinctCount(columnName: String, ndv: Long): Statistics = {
-    this.columnStats
-      .getOrElseUpdate(columnName, mutable.HashMap())
-      .put(DISTINCT_COUNT, ndv.toString)
-    this
-  }
-
-  /**
-    * Sets the number of null values statistic for the given column.
-    */
-  def columnNullCount(columnName: String, nullCount: Long): Statistics = {
-    this.columnStats
-      .getOrElseUpdate(columnName, mutable.HashMap())
-      .put(NULL_COUNT, nullCount.toString)
-    this
-  }
-
-  /**
-    * Sets the average length statistic for the given column.
-    */
-  def columnAvgLength(columnName: String, avgLen: Double): Statistics = {
-    this.columnStats
-      .getOrElseUpdate(columnName, mutable.HashMap())
-      .put(AVG_LENGTH, avgLen.toString)
-    this
-  }
-
-  /**
-    * Sets the maximum length statistic for the given column.
-    */
-  def columnMaxLength(columnName: String, maxLen: Integer): Statistics = {
-    this.columnStats
-      .getOrElseUpdate(columnName, mutable.HashMap())
-      .put(MAX_LENGTH, maxLen.toString)
-    this
-  }
-
-  /**
-    * Sets the maximum value statistic for the given column.
-    */
-  def columnMaxValue(columnName: String, max: Number): Statistics = {
-    this.columnStats
-      .getOrElseUpdate(columnName, mutable.HashMap())
-      .put(MAX_VALUE, max.toString)
-    this
-  }
-
-  /**
-    * Sets the minimum value statistic for the given column.
-    */
-  def columnMinValue(columnName: String, min: Number): Statistics = {
-    this.columnStats
-      .getOrElseUpdate(columnName, mutable.HashMap())
-      .put(MIN_VALUE, min.toString)
-    this
-  }
-
-  /**
-    * Converts this descriptor into a set of properties.
-    */
-  final override def toProperties: util.Map[String, String] = {
-    val properties = new DescriptorProperties()
-
-    properties.putInt(STATISTICS_PROPERTY_VERSION, 1)
-    rowCount.foreach(rc => properties.putLong(STATISTICS_ROW_COUNT, rc))
-    val namedStats = columnStats.map { case (name, stats) =>
-      // name should not be part of the properties key
-      (stats + (NAME -> name)).toMap.asJava
-    }.toList.asJava
-    properties.putIndexedVariableProperties(STATISTICS_COLUMNS, namedStats)
-
-    properties.asMap()
-  }
-}
-
-/**
-  * Statistics descriptor for describing table stats.
-  */
-object Statistics {
-
-  /**
-    * Statistics descriptor for describing table stats.
-    *
-    * @deprecated Use `new Statistics()`.
-    */
-  @deprecated
-  def apply(): Statistics = new Statistics()
-}
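
Note the two entry points: tableStats(...) wipes and replaces everything, while the column* setters merge per column. A sketch, assuming the ported Java class keeps this contract:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.flink.table.descriptors.Statistics;
    import org.apache.flink.table.plan.stats.ColumnStats;
    import org.apache.flink.table.plan.stats.TableStats;

    public class StatisticsOverwriteSketch {
        public static void main(String[] args) {
            Map<String, ColumnStats> colStats = new HashMap<>();
            colStats.put("a", new ColumnStats(null, 2L, 3.0, null, 6, 5));

            Statistics desc = new Statistics()
                .rowCount(1000L)
                .columnNullCount("b", 300L);   // merged per column

            // Replaces the row count and *all* column stats set above.
            desc.tableStats(new TableStats(32L, colStats));
        }
    }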
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/StatisticsValidator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/StatisticsValidator.scala
deleted file mode 100644
index 518f292..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/StatisticsValidator.scala
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.descriptors
-
-import org.apache.flink.table.api.ValidationException
-import org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, STATISTICS_PROPERTY_VERSION, STATISTICS_ROW_COUNT, validateColumnStats}
-import org.apache.flink.table.plan.stats.ColumnStats
-import org.apache.flink.table.util.JavaScalaConversionUtil.toScala
-
-import scala.collection.mutable
-
-/**
-  * Validator for [[FormatDescriptor]].
-  */
-class StatisticsValidator extends DescriptorValidator {
-
-  override def validate(properties: DescriptorProperties): Unit = {
-    properties.validateInt(STATISTICS_PROPERTY_VERSION, true, 0, Integer.MAX_VALUE)
-    properties.validateLong(STATISTICS_ROW_COUNT, true, 0)
-    validateColumnStats(properties, STATISTICS_COLUMNS)
-  }
-}
-
-object StatisticsValidator {
-
-  val STATISTICS_PROPERTY_VERSION = "statistics.property-version"
-  val STATISTICS_ROW_COUNT = "statistics.row-count"
-  val STATISTICS_COLUMNS = "statistics.columns"
-
-  // per column properties
-
-  val NAME = "name"
-  val DISTINCT_COUNT = "distinct-count"
-  val NULL_COUNT = "null-count"
-  val AVG_LENGTH = "avg-length"
-  val MAX_LENGTH = "max-length"
-  val MAX_VALUE = "max-value"
-  val MIN_VALUE = "min-value"
-
-  // utilities
-
-  def normalizeColumnStats(columnStats: ColumnStats): Map[String, String] = {
-    val stats = mutable.HashMap[String, String]()
-    if (columnStats.ndv != null) {
-      stats += DISTINCT_COUNT -> columnStats.ndv.toString
-    }
-    if (columnStats.nullCount != null) {
-      stats += NULL_COUNT -> columnStats.nullCount.toString
-    }
-    if (columnStats.avgLen != null) {
-      stats += AVG_LENGTH -> columnStats.avgLen.toString
-    }
-    if (columnStats.maxLen != null) {
-      stats += MAX_LENGTH -> columnStats.maxLen.toString
-    }
-    if (columnStats.max != null) {
-      stats += MAX_VALUE -> columnStats.max.toString
-    }
-    if (columnStats.min != null) {
-      stats += MIN_VALUE -> columnStats.min.toString
-    }
-    stats.toMap
-  }
-
-  def validateColumnStats(properties: DescriptorProperties, key: String): Unit = {
-
-    // filter for number of columns
-    val columnCount = properties.getIndexedProperty(key, NAME).size
-
-    for (i <- 0 until columnCount) {
-      properties.validateString(s"$key.$i.$NAME", false, 1)
-      properties.validateLong(s"$key.$i.$DISTINCT_COUNT", true, 0L)
-      properties.validateLong(s"$key.$i.$NULL_COUNT", true, 0L)
-      properties.validateDouble(s"$key.$i.$AVG_LENGTH", true, 0.0)
-      properties.validateInt(s"$key.$i.$MAX_LENGTH", true, 0)
-      properties.validateDouble(s"$key.$i.$MAX_VALUE", true, 0.0)
-      properties.validateDouble(s"$key.$i.$MIN_VALUE", true, 0.0)
-    }
-  }
-
-  def readColumnStats(properties: DescriptorProperties, key: String): Map[String, ColumnStats] = {
-
-    // filter for number of columns
-    val columnCount = properties.getIndexedProperty(key, NAME).size
-
-    val stats = for (i <- 0 until columnCount) yield {
-      val name = toScala(properties.getOptionalString(s"$key.$i.$NAME")).getOrElse(
-        throw new ValidationException(s"Could not find name of property '$key.$i.$NAME'."))
-
-      val stats = ColumnStats(
-        properties.getOptionalLong(s"$key.$i.$DISTINCT_COUNT").orElse(null),
-        properties.getOptionalLong(s"$key.$i.$NULL_COUNT").orElse(null),
-        properties.getOptionalDouble(s"$key.$i.$AVG_LENGTH").orElse(null),
-        properties.getOptionalInt(s"$key.$i.$MAX_LENGTH").orElse(null),
-        properties.getOptionalDouble(s"$key.$i.$MAX_VALUE").orElse(null),
-        properties.getOptionalDouble(s"$key.$i.$MIN_VALUE").orElse(null)
-      )
-
-      name -> stats
-    }
-
-    stats.toMap
-  }
-}
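
The normalize/read pair is what round-trips ColumnStats through the flat property keys. A sketch of the assumed static utilities on the ported Java StatisticsValidator (names and signatures mirror the Scala companion object above):

    import java.util.Map;
    import org.apache.flink.table.descriptors.DescriptorProperties;
    import org.apache.flink.table.descriptors.StatisticsValidator;
    import org.apache.flink.table.plan.stats.ColumnStats;

    public class ColumnStatsRoundTripSketch {
        public static void main(String[] args) {
            // ColumnStats -> flat key/value pairs ("distinct-count" -> "1", ...).
            Map<String, String> normalized =
                StatisticsValidator.normalizeColumnStats(new ColumnStats(1L, 2L, 3.0, 4, 6, 5));
            System.out.println(normalized);

            // And back: indexed properties -> Map<String, ColumnStats>.
            DescriptorProperties properties = new DescriptorProperties();
            properties.putString("statistics.columns.0.name", "a");
            properties.putString("statistics.columns.0.distinct-count", "1");
            Map<String, ColumnStats> read =
                StatisticsValidator.readColumnStats(properties, "statistics.columns");
            System.out.println(read.get("a"));
        }
    }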
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptorValidator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptorValidator.scala
deleted file mode 100644
index 7412e5b..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptorValidator.scala
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.descriptors
-
-import java.util
-
-import org.apache.flink.table.descriptors.StreamTableDescriptorValidator._
-
-/**
-  * Validator for [[StreamTableDescriptor]].
-  */
-class StreamTableDescriptorValidator(
-    supportsAppend: Boolean,
-    supportsRetract: Boolean,
-    supportsUpsert: Boolean)
-  extends DescriptorValidator {
-
-  override def validate(properties: DescriptorProperties): Unit = {
-    val modeList = new util.ArrayList[String]()
-    if (supportsAppend) {
-      modeList.add(UPDATE_MODE_VALUE_APPEND)
-    }
-    if (supportsRetract) {
-      modeList.add(UPDATE_MODE_VALUE_RETRACT)
-    }
-    if (supportsUpsert) {
-      modeList.add(UPDATE_MODE_VALUE_UPSERT)
-    }
-    properties.validateEnumValues(
-      UPDATE_MODE,
-      false,
-      modeList
-    )
-  }
-}
-
-object StreamTableDescriptorValidator {
-
-  val UPDATE_MODE = "update-mode"
-  val UPDATE_MODE_VALUE_APPEND = "append"
-  val UPDATE_MODE_VALUE_RETRACT = "retract"
-  val UPDATE_MODE_VALUE_UPSERT = "upsert"
-}
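
In other words, the factory declares which update modes it supports, and the validator checks the declared "update-mode" value against that list. A sketch, assuming the ported Java validator keeps the same constructor arguments:

    import org.apache.flink.table.descriptors.DescriptorProperties;
    import org.apache.flink.table.descriptors.StreamTableDescriptorValidator;

    public class UpdateModeValidationSketch {
        public static void main(String[] args) {
            DescriptorProperties properties = new DescriptorProperties();
            properties.putString("update-mode", "append");

            // supportsAppend=true, supportsRetract=false, supportsUpsert=false:
            // "append" passes; "retract" or "upsert" would throw.
            new StreamTableDescriptorValidator(true, false, false).validate(properties);
        }
    }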
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/StreamableDescriptor.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/StreamableDescriptor.scala
deleted file mode 100644
index 0d424bd..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/StreamableDescriptor.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.descriptors
-
-/**
-  * A trait for descriptors that allow to convert between a dynamic table and an external connector.
-  */
-trait StreamableDescriptor[D <: StreamableDescriptor[D]] extends TableDescriptor {
-
-  /**
-    * Declares how to perform the conversion between a dynamic table and an external connector.
-    *
-    * In append mode, a dynamic table and an external connector only exchange INSERT messages.
-    *
-    * @see See also [[inRetractMode()]] and [[inUpsertMode()]].
-    */
-  def inAppendMode(): D
-
-  /**
-    * Declares how to perform the conversion between a dynamic table and an external connector.
-    *
-    * In retract mode, a dynamic table and an external connector exchange ADD and RETRACT messages.
-    *
-    * An INSERT change is encoded as an ADD message, a DELETE change as a RETRACT message, and an
-    * UPDATE change as a RETRACT message for the updated (previous) row and an ADD message for
-    * the updating (new) row.
-    *
-    * In this mode, a key must not be defined as opposed to upsert mode. However, every update
-    * consists of two messages which is less efficient.
-    *
-    * @see See also [[inAppendMode()]] and [[inUpsertMode()]].
-    */
-  def inRetractMode(): D
-
-  /**
-    * Declares how to perform the conversion between a dynamic table and an external connector.
-    *
-    * In upsert mode, a dynamic table and an external connector exchange UPSERT and DELETE messages.
-    *
-    * This mode requires a (possibly composite) unique key by which updates can be propagated. The
-    * external connector needs to be aware of the unique key attribute in order to apply messages
-    * correctly. INSERT and UPDATE changes are encoded as UPSERT messages. DELETE changes as
-    * DELETE messages.
-    *
-    * The main difference to a retract stream is that UPDATE changes are encoded with a single
-    * message and are therefore more efficient.
-    *
-    * @see See also [[inAppendMode()]] and [[inRetractMode()]].
-    */
-  def inUpsertMode(): D
-}
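
To summarize the three modes in terms of messages exchanged for a single UPDATE of oldRow to newRow (the connector descriptor below is hypothetical, shown only for illustration):

    // append mode : UPDATEs are not allowed; only INSERT messages flow through.
    // retract mode: RETRACT(oldRow) followed by ADD(newRow)   -> two messages.
    // upsert mode : UPSERT(newRow), keyed by the unique key   -> one message.
    //
    // A StreamableDescriptor implementation picks the mode fluently, e.g.
    // (hypothetical connector descriptor):
    //
    //   someConnectorDescriptor.inUpsertMode();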
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/schema/DataSetTable.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/schema/DataSetTable.scala
index c1515b1..4227498 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/schema/DataSetTable.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/schema/DataSetTable.scala
@@ -25,5 +25,5 @@ class DataSetTable[T](
     val dataSet: DataSet[T],
     override val fieldIndexes: Array[Int],
     override val fieldNames: Array[String],
-    override val statistic: FlinkStatistic = FlinkStatistic.of(TableStats(1000L)))
+    override val statistic: FlinkStatistic = FlinkStatistic.of(new TableStats(1000L)))
   extends InlineTable[T](dataSet.getType, fieldIndexes, fieldNames, statistic)
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/stats/ColumnStats.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/stats/ColumnStats.scala
deleted file mode 100644
index 8cf06b5..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/stats/ColumnStats.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.plan.stats
-
-import java.lang.{Double, Long}
-
-/**
-  * column statistics
-  *
-  * @param ndv       number of distinct values
-  * @param nullCount number of nulls
-  * @param avgLen    average length of column values
-  * @param maxLen    max length of column values
-  * @param max       max value of column values
-  * @param min       min value of column values
-  */
-case class ColumnStats(
-    ndv: Long,
-    nullCount: Long,
-    avgLen: Double,
-    maxLen: Integer,
-    max: Number,
-    min: Number) {
-
-  override def toString: String = {
-    val columnStatsStr = Seq(
-      if (ndv != null) s"ndv=$ndv" else "",
-      if (nullCount != null) s"nullCount=$nullCount" else "",
-      if (avgLen != null) s"avgLen=$avgLen" else "",
-      if (maxLen != null) s"maxLen=$maxLen" else "",
-      if (max != null) s"max=${max}" else "",
-      if (min != null) s"min=${min}" else ""
-    ).filter(_.nonEmpty).mkString(", ")
-
-    s"ColumnStats(${columnStatsStr})"
-  }
-
-}
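
A sketch of the assumed Java replacement, which trades the case class for a plain constructor with the same field order (ndv, null count, avg length, max length, max, min):

    import org.apache.flink.table.plan.stats.ColumnStats;

    public class ColumnStatsSketch {
        public static void main(String[] args) {
            // Any field may be null, i.e. unknown; toString skips unknown fields.
            ColumnStats stats = new ColumnStats(1L, 2L, 3.0, 4, 6, 5);
            System.out.println(stats);
            // Expected along the lines of:
            // ColumnStats(ndv=1, nullCount=2, avgLen=3.0, maxLen=4, max=6, min=5)
        }
    }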
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/stats/FlinkStatistic.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/stats/FlinkStatistic.scala
index da08916..5469f94 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/stats/FlinkStatistic.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/stats/FlinkStatistic.scala
@@ -49,7 +49,7 @@ class FlinkStatistic(tableStats: Option[TableStats]) extends Statistic {
     * @return The stats of the specified column.
     */
   def getColumnStats(columnName: String): ColumnStats = tableStats match {
-    case Some(tStats) => tStats.colStats.get(columnName)
+    case Some(tStats) => tStats.getColumnStats.get(columnName)
     case None => null
   }
 
@@ -59,7 +59,7 @@ class FlinkStatistic(tableStats: Option[TableStats]) extends Statistic {
     * @return The number of rows of the table.
     */
   override def getRowCount: Double = tableStats match {
-    case Some(tStats) => tStats.rowCount.toDouble
+    case Some(tStats) => tStats.getRowCount.toDouble
     case None => null
   }
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/DescriptorPropertiesTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/DescriptorPropertiesTest.scala
deleted file mode 100644
index fe7c75d..0000000
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/DescriptorPropertiesTest.scala
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.descriptors
-
-import java.util
-import java.util.Collections
-import java.util.function.Consumer
-
-import org.apache.flink.table.api.ValidationException
-import org.apache.flink.table.util.JavaScalaConversionUtil.toJava
-import org.junit.Assert.assertEquals
-import org.junit.Test
-
-/**
-  * Tests for [[DescriptorProperties]].
-  */
-class DescriptorPropertiesTest {
-
-  private val ARRAY_KEY = "my-array"
-  private val FIXED_INDEXED_PROPERTY_KEY = "my-fixed-indexed-property"
-  private val PROPERTY_1_KEY = "property-1"
-  private val PROPERTY_2_KEY = "property-2"
-
-  @Test
-  def testEquals(): Unit = {
-    val properties1 = new DescriptorProperties()
-    properties1.putString("hello1", "12")
-    properties1.putString("hello2", "13")
-    properties1.putString("hello3", "14")
-
-    val properties2 = new DescriptorProperties()
-    properties2.putString("hello1", "12")
-    properties2.putString("hello2", "13")
-    properties2.putString("hello3", "14")
-
-    val properties3 = new DescriptorProperties()
-    properties3.putString("hello1", "12")
-    properties3.putString("hello3", "14")
-    properties3.putString("hello2", "13")
-
-    assertEquals(properties1, properties2)
-
-    assertEquals(properties1, properties3)
-  }
-
-  @Test
-  def testMissingArray(): Unit = {
-    val properties = new DescriptorProperties()
-
-    testArrayValidation(properties, 0, Integer.MAX_VALUE)
-  }
-
-  @Test
-  def testArrayValues(): Unit = {
-    val properties = new DescriptorProperties()
-
-    properties.putString(s"$ARRAY_KEY.0", "12")
-    properties.putString(s"$ARRAY_KEY.1", "42")
-    properties.putString(s"$ARRAY_KEY.2", "66")
-
-    testArrayValidation(properties, 1, Integer.MAX_VALUE)
-
-    assertEquals(
-      util.Arrays.asList(12, 42, 66),
-      properties.getArray(ARRAY_KEY, toJava((key: String) => {
-        properties.getInt(key)
-      })))
-  }
-
-  @Test
-  def testArraySingleValue(): Unit = {
-    val properties = new DescriptorProperties()
-    properties.putString(ARRAY_KEY, "12")
-
-    testArrayValidation(properties, 1, Integer.MAX_VALUE)
-
-    assertEquals(
-      Collections.singletonList(12),
-      properties.getArray(ARRAY_KEY, toJava((key: String) => {
-        properties.getInt(key)
-      })))
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testArrayInvalidValues(): Unit = {
-    val properties = new DescriptorProperties()
-    properties.putString(s"$ARRAY_KEY.0", "12")
-    properties.putString(s"$ARRAY_KEY.1", "66")
-    properties.putString(s"$ARRAY_KEY.2", "INVALID")
-
-    testArrayValidation(properties, 1, Integer.MAX_VALUE)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testArrayInvalidSingleValue(): Unit = {
-    val properties = new DescriptorProperties()
-    properties.putString(ARRAY_KEY, "INVALID")
-
-    testArrayValidation(properties, 1, Integer.MAX_VALUE)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testInvalidMissingArray(): Unit = {
-    val properties = new DescriptorProperties()
-
-    testArrayValidation(properties, 1, Integer.MAX_VALUE)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testInvalidFixedIndexedProperties(): Unit = {
-    val property = new DescriptorProperties()
-    val list = new util.ArrayList[util.List[String]]()
-    list.add(util.Arrays.asList("1", "string"))
-    list.add(util.Arrays.asList("INVALID", "string"))
-    property.putIndexedFixedProperties(
-      FIXED_INDEXED_PROPERTY_KEY,
-      util.Arrays.asList(PROPERTY_1_KEY, PROPERTY_2_KEY),
-      list)
-    testFixedIndexedPropertiesValidation(property)
-  }
-
-  @Test
-  def testRemoveKeys(): Unit = {
-    val properties = new DescriptorProperties()
-    properties.putString("hello1", "12")
-    properties.putString("hello2", "13")
-    properties.putString("hello3", "14")
-
-    val actual = properties.withoutKeys(util.Arrays.asList("hello1", "hello3"))
-
-    val expected = new DescriptorProperties()
-    expected.putString("hello2", "13")
-
-    assertEquals(expected, actual)
-  }
-
-  @Test
-  def testPrefixedMap(): Unit = {
-    val properties = new DescriptorProperties()
-    properties.putString("hello1", "12")
-    properties.putString("hello2", "13")
-    properties.putString("hello3", "14")
-
-    val actual = properties.asPrefixedMap("prefix.")
-
-    val expected = new DescriptorProperties()
-    expected.putString("prefix.hello1", "12")
-    expected.putString("prefix.hello2", "13")
-    expected.putString("prefix.hello3", "14")
-
-    assertEquals(expected.asMap, actual)
-  }
-
-  private def testArrayValidation(
-      properties: DescriptorProperties,
-      minLength: Int,
-      maxLength: Int)
-    : Unit = {
-    val validator: String => Unit = (key: String) => {
-      properties.validateInt(key, false)
-    }
-
-    properties.validateArray(
-      ARRAY_KEY,
-      toJava(validator),
-      minLength,
-      maxLength)
-  }
-
-  private def testFixedIndexedPropertiesValidation(properties: DescriptorProperties): Unit = {
-
-    val validatorMap = new util.HashMap[String, Consumer[String]]()
-
-    // PROPERTY_1 should be Int
-    val validator1: String => Unit = (key: String) => {
-      properties.validateInt(key, false)
-    }
-    validatorMap.put(PROPERTY_1_KEY, toJava(validator1))
-    // PROPERTY_2 should be String
-    val validator2: String => Unit = (key: String) => {
-      properties.validateString(key, false)
-    }
-    validatorMap.put(PROPERTY_2_KEY, toJava(validator2))
-
-    properties.validateFixedIndexedProperties(
-      FIXED_INDEXED_PROPERTY_KEY,
-      false,
-      validatorMap
-    )
-  }
-}
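
The array tests above document the two accepted layouts: indexed keys ("my-array.0", "my-array.1", ...) or a single unindexed key for a one-element array. A hedged Java sketch of the same pattern against the ported DescriptorProperties, using only calls exercised by the deleted test:

    import java.util.List;
    import org.apache.flink.table.descriptors.DescriptorProperties;

    public class ArrayPropertiesSketch {
        public static void main(String[] args) {
            DescriptorProperties properties = new DescriptorProperties();
            properties.putString("my-array.0", "12");
            properties.putString("my-array.1", "42");

            // Validate every element as a required int; at least one element.
            properties.validateArray(
                "my-array",
                key -> properties.validateInt(key, false),
                1,
                Integer.MAX_VALUE);

            List<Integer> values = properties.getArray("my-array", properties::getInt);
            System.out.println(values); // [12, 42]
        }
    }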
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/MetadataTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/MetadataTest.scala
deleted file mode 100644
index 64965b0..0000000
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/MetadataTest.scala
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.descriptors
-
-import java.util
-
-import org.apache.flink.table.api.ValidationException
-import org.junit.Test
-
-import scala.collection.JavaConverters._
-
-class MetadataTest extends DescriptorTestBase {
-
-  @Test(expected = classOf[ValidationException])
-  def testInvalidCreationTime(): Unit = {
-    addPropertyAndVerify(descriptors().get(0), "metadata.creation-time", "dfghj")
-  }
-
-  // ----------------------------------------------------------------------------------------------
-
-  override def descriptors(): util.List[Descriptor] = {
-    val desc = Metadata()
-      .comment("Some additional comment")
-      .creationTime(123L)
-      .lastAccessTime(12020202L)
-
-    util.Arrays.asList(desc)
-  }
-
-  override def validator(): DescriptorValidator = {
-    new MetadataValidator()
-  }
-
-  override def properties(): util.List[util.Map[String, String]] = {
-    val props = Map(
-      "metadata.comment" -> "Some additional comment",
-      "metadata.creation-time" -> "123",
-      "metadata.last-access-time" -> "12020202"
-    )
-
-    util.Arrays.asList(props.asJava)
-  }
-}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/StatisticsTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/StatisticsTest.scala
deleted file mode 100644
index 2def0c3..0000000
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/StatisticsTest.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.descriptors
-
-import _root_.java.util
-
-import org.apache.flink.table.api.ValidationException
-import org.apache.flink.table.plan.stats.{ColumnStats, TableStats}
-import org.junit.Test
-
-import scala.collection.JavaConverters._
-
-class StatisticsTest extends DescriptorTestBase {
-
-  @Test(expected = classOf[ValidationException])
-  def testInvalidRowCount(): Unit = {
-    addPropertyAndVerify(descriptors().get(0), "statistics.row-count", "abx")
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testMissingName(): Unit = {
-    removePropertyAndVerify(descriptors().get(0), "statistics.columns.0.name")
-  }
-
-  // ----------------------------------------------------------------------------------------------
-
-  override def descriptors(): util.List[Descriptor] = {
-    val desc1 = Statistics()
-      .rowCount(1000L)
-      .columnStats("a", ColumnStats(1L, 2L, 3.0, 4, 5, 6))
-      .columnAvgLength("b", 42.0)
-      .columnNullCount("a", 300)
-
-    val map = new util.HashMap[String, ColumnStats]()
-    map.put("a", ColumnStats(null, 2L, 3.0, null, 5, 6))
-    val desc2 = Statistics()
-      .tableStats(TableStats(32L, map))
-
-    util.Arrays.asList(desc1, desc2)
-  }
-
-  override def validator(): DescriptorValidator = {
-    new StatisticsValidator()
-  }
-
-  override def properties(): util.List[util.Map[String, String]] = {
-    val props1 = Map(
-      "statistics.property-version" -> "1",
-      "statistics.row-count" -> "1000",
-      "statistics.columns.0.name" -> "a",
-      "statistics.columns.0.distinct-count" -> "1",
-      "statistics.columns.0.null-count" -> "300",
-      "statistics.columns.0.avg-length" -> "3.0",
-      "statistics.columns.0.max-length" -> "4",
-      "statistics.columns.0.max-value" -> "5",
-      "statistics.columns.0.min-value" -> "6",
-      "statistics.columns.1.name" -> "b",
-      "statistics.columns.1.avg-length" -> "42.0"
-    )
-
-    val props2 = Map(
-      "statistics.property-version" -> "1",
-      "statistics.row-count" -> "32",
-      "statistics.columns.0.name" -> "a",
-      "statistics.columns.0.null-count" -> "2",
-      "statistics.columns.0.avg-length" -> "3.0",
-      "statistics.columns.0.max-value" -> "5",
-      "statistics.columns.0.min-value" -> "6"
-    )
-
-    util.Arrays.asList(props1.asJava, props2.asJava)
-  }
-}
