This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 012e611 [FLINK-11516][table-common] Port and move catalog transitive classes to flink-table-common 012e611 is described below commit 012e6118ad3de2bfaadd46c76f25bf9d4ad9a9d1 Author: Dian Fu <fudian...@alibaba-inc.com> AuthorDate: Fri Feb 1 23:23:06 2019 +0800 [FLINK-11516][table-common] Port and move catalog transitive classes to flink-table-common This closes #7642. --- .../ElasticsearchUpsertTableSinkFactoryBase.java | 4 +- .../kafka/KafkaTableSourceSinkFactoryBase.java | 2 +- .../gateway/utils/TestTableSinkFactoryBase.java | 2 +- .../gateway/utils/TestTableSourceFactoryBase.java | 2 +- .../apache/flink/table/descriptors/Metadata.java | 78 ++++++++ .../flink/table/descriptors/MetadataValidator.java | 41 ++++ .../apache/flink/table/descriptors/Statistics.java | 168 +++++++++++++++++ .../table/descriptors/StatisticsValidator.java | 126 +++++++++++++ .../StreamTableDescriptorValidator.java | 65 +++++++ .../table/descriptors/StreamableDescriptor.java | 73 ++++++++ .../apache/flink/table/plan/stats/ColumnStats.java | 124 ++++++++++++ .../apache/flink/table/plan/stats/TableStats.java} | 46 ++++- .../descriptors/DescriptorPropertiesTest.java | 201 ++++++++++++++++++++ .../flink/table/descriptors/MetadataTest.java | 69 +++++++ .../flink/table/descriptors/StatisticsTest.java | 102 ++++++++++ .../flink/table/catalog/ExternalCatalogTable.scala | 4 +- .../flink/table/catalog/ExternalTableUtil.scala | 1 - .../apache/flink/table/descriptors/Metadata.scala | 88 --------- .../table/descriptors/MetadataValidator.scala | 43 ----- .../flink/table/descriptors/Statistics.scala | 166 ----------------- .../table/descriptors/StatisticsValidator.scala | 120 ------------ .../StreamTableDescriptorValidator.scala | 59 ------ .../table/descriptors/StreamableDescriptor.scala | 67 ------- .../flink/table/plan/schema/DataSetTable.scala | 2 +- .../flink/table/plan/stats/ColumnStats.scala | 54 ------ .../flink/table/plan/stats/FlinkStatistic.scala | 4 +- .../descriptors/DescriptorPropertiesTest.scala | 207 --------------------- .../flink/table/descriptors/MetadataTest.scala | 59 ------ .../flink/table/descriptors/StatisticsTest.scala | 89 --------- 29 files changed, 1093 insertions(+), 973 deletions(-) diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkFactoryBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkFactoryBase.java index 63e9b34..f52de79 100644 --- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkFactoryBase.java +++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkFactoryBase.java @@ -113,7 +113,7 @@ public abstract class ElasticsearchUpsertTableSinkFactoryBase implements StreamT final List<String> properties = new ArrayList<>(); // streaming properties - properties.add(UPDATE_MODE()); + properties.add(UPDATE_MODE); // Elasticsearch properties.add(CONNECTOR_HOSTS + ".#." + CONNECTOR_HOSTS_HOSTNAME); @@ -150,7 +150,7 @@ public abstract class ElasticsearchUpsertTableSinkFactoryBase implements StreamT final DescriptorProperties descriptorProperties = getValidatedProperties(properties); return createElasticsearchUpsertTableSink( - descriptorProperties.isValue(UPDATE_MODE(), UPDATE_MODE_VALUE_APPEND()), + descriptorProperties.isValue(UPDATE_MODE, UPDATE_MODE_VALUE_APPEND), descriptorProperties.getTableSchema(SCHEMA()), getHosts(descriptorProperties), descriptorProperties.getString(CONNECTOR_INDEX), diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java index 542dbd9..146eba5 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java @@ -92,7 +92,7 @@ public abstract class KafkaTableSourceSinkFactoryBase implements @Override public Map<String, String> requiredContext() { Map<String, String> context = new HashMap<>(); - context.put(UPDATE_MODE(), UPDATE_MODE_VALUE_APPEND()); // append mode + context.put(UPDATE_MODE, UPDATE_MODE_VALUE_APPEND); // append mode context.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_KAFKA); // kafka context.put(CONNECTOR_VERSION, kafkaVersion()); // version context.put(CONNECTOR_PROPERTY_VERSION, "1"); // backwards compatibility diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSinkFactoryBase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSinkFactoryBase.java index d7d4f9f..c3c4152 100644 --- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSinkFactoryBase.java +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSinkFactoryBase.java @@ -61,7 +61,7 @@ public abstract class TestTableSinkFactoryBase implements StreamTableSinkFactory @Override public Map<String, String> requiredContext() { final Map<String, String> context = new HashMap<>(); - context.put(UPDATE_MODE(), UPDATE_MODE_VALUE_APPEND()); + context.put(UPDATE_MODE, UPDATE_MODE_VALUE_APPEND); context.put(CONNECTOR_TYPE, type); return context; } diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactoryBase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactoryBase.java index 59091d8..12eee4f 100644 --- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactoryBase.java +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactoryBase.java @@ -64,7 +64,7 @@ public abstract class TestTableSourceFactoryBase implements StreamTableSourceFac @Override public Map<String, String> requiredContext() { final Map<String, String> context = new HashMap<>(); - context.put(UPDATE_MODE(), UPDATE_MODE_VALUE_APPEND()); + context.put(UPDATE_MODE, UPDATE_MODE_VALUE_APPEND); context.put(CONNECTOR_TYPE, type); return context; } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/Metadata.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/Metadata.java new file mode 100644 index 0000000..5e74bec --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/Metadata.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors; + +import org.apache.flink.annotation.PublicEvolving; + +import java.util.Map; + +import static org.apache.flink.table.descriptors.MetadataValidator.METADATA_COMMENT; +import static org.apache.flink.table.descriptors.MetadataValidator.METADATA_CREATION_TIME; +import static org.apache.flink.table.descriptors.MetadataValidator.METADATA_LAST_ACCESS_TIME; + +/** + * Metadata descriptor for adding additional, useful information. + */ +@PublicEvolving +public class Metadata implements Descriptor { + + private final DescriptorProperties internalProperties = new DescriptorProperties(true); + + public Metadata() {} + + /** + * Sets a comment. + * + * @param comment the description + */ + public Metadata comment(String comment) { + internalProperties.putString(METADATA_COMMENT, comment); + return this; + } + + /** + * Sets a creation time. + * + * @param time UTC milliseconds timestamp + */ + public Metadata creationTime(long time) { + internalProperties.putLong(METADATA_CREATION_TIME, time); + return this; + } + + /** + * Sets a last access time. + * + * @param time UTC milliseconds timestamp + */ + public Metadata lastAccessTime(long time) { + internalProperties.putLong(METADATA_LAST_ACCESS_TIME, time); + return this; + } + + /** + * Converts this descriptor into a set of properties. + */ + @Override + public final Map<String, String> toProperties() { + final DescriptorProperties properties = new DescriptorProperties(); + properties.putProperties(internalProperties); + return properties.asMap(); + } +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/MetadataValidator.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/MetadataValidator.java new file mode 100644 index 0000000..e09c8ec --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/MetadataValidator.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors; + +import org.apache.flink.annotation.Internal; + +/** + * Validator for {@link Metadata}. + */ +@Internal +public class MetadataValidator implements DescriptorValidator { + + public static final String METADATA_PROPERTY_VERSION = "metadata.property-version"; + public static final String METADATA_COMMENT = "metadata.comment"; + public static final String METADATA_CREATION_TIME = "metadata.creation-time"; + public static final String METADATA_LAST_ACCESS_TIME = "metadata.last-access-time"; + + @Override + public void validate(DescriptorProperties properties) { + properties.validateInt(METADATA_PROPERTY_VERSION, true, 0, Integer.MAX_VALUE); + properties.validateString(METADATA_COMMENT, true); + properties.validateLong(METADATA_CREATION_TIME, true); + properties.validateLong(METADATA_LAST_ACCESS_TIME, true); + } +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/Statistics.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/Statistics.java new file mode 100644 index 0000000..f8d1c86 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/Statistics.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.plan.stats.ColumnStats; +import org.apache.flink.table.plan.stats.TableStats; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.table.descriptors.StatisticsValidator.AVG_LENGTH; +import static org.apache.flink.table.descriptors.StatisticsValidator.DISTINCT_COUNT; +import static org.apache.flink.table.descriptors.StatisticsValidator.MAX_LENGTH; +import static org.apache.flink.table.descriptors.StatisticsValidator.MAX_VALUE; +import static org.apache.flink.table.descriptors.StatisticsValidator.MIN_VALUE; +import static org.apache.flink.table.descriptors.StatisticsValidator.NAME; +import static org.apache.flink.table.descriptors.StatisticsValidator.NULL_COUNT; +import static org.apache.flink.table.descriptors.StatisticsValidator.STATISTICS_COLUMNS; +import static org.apache.flink.table.descriptors.StatisticsValidator.STATISTICS_PROPERTY_VERSION; +import static org.apache.flink.table.descriptors.StatisticsValidator.STATISTICS_ROW_COUNT; +import static org.apache.flink.table.descriptors.StatisticsValidator.normalizeColumnStats; + +/** + * Statistics descriptor for describing table stats. + */ +@PublicEvolving +public class Statistics implements Descriptor { + + private final DescriptorProperties internalProperties = new DescriptorProperties(true); + private LinkedHashMap<String, Map<String, String>> columnStats = new LinkedHashMap<>(); + + /** + * Sets the statistics from a {@link TableStats} instance. + * + * <p>This method overwrites all existing statistics. + * + * @param tableStats the table statistics + */ + public Statistics tableStats(TableStats tableStats) { + rowCount(tableStats.getRowCount()); + columnStats.clear(); + tableStats.getColumnStats().forEach(this::columnStats); + return this; + } + + /** + * Sets statistics for the overall row count. Required. + * + * @param rowCount the expected number of rows + */ + public Statistics rowCount(long rowCount) { + internalProperties.putLong(STATISTICS_ROW_COUNT, rowCount); + return this; + } + + /** + * Sets statistics for a column. Overwrites all existing statistics for this column. + * + * @param columnName the column name + * @param columnStats expected statistics for the column + */ + public Statistics columnStats(String columnName, ColumnStats columnStats) { + Map<String, String> map = normalizeColumnStats(columnStats); + this.columnStats.put(columnName, map); + return this; + } + + /** + * Sets the number of distinct values statistic for the given column. + */ + public Statistics columnDistinctCount(String columnName, Long ndv) { + this.columnStats + .computeIfAbsent(columnName, column -> new HashMap<>()) + .put(DISTINCT_COUNT, String.valueOf(ndv)); + return this; + } + + /** + * Sets the number of null values statistic for the given column. + */ + public Statistics columnNullCount(String columnName, Long nullCount) { + this.columnStats + .computeIfAbsent(columnName, column -> new HashMap<>()) + .put(NULL_COUNT, String.valueOf(nullCount)); + return this; + } + + /** + * Sets the average length statistic for the given column. + */ + public Statistics columnAvgLength(String columnName, Double avgLen) { + this.columnStats + .computeIfAbsent(columnName, column -> new HashMap<>()) + .put(AVG_LENGTH, String.valueOf(avgLen)); + return this; + } + + /** + * Sets the maximum length statistic for the given column. + */ + public Statistics columnMaxLength(String columnName, Integer maxLen) { + this.columnStats + .computeIfAbsent(columnName, column -> new HashMap<>()) + .put(MAX_LENGTH, String.valueOf(maxLen)); + return this; + } + + /** + * Sets the maximum value statistic for the given column. + */ + public Statistics columnMaxValue(String columnName, Number max) { + this.columnStats + .computeIfAbsent(columnName, column -> new HashMap<>()) + .put(MAX_VALUE, String.valueOf(max)); + return this; + } + + /** + * Sets the minimum value statistic for the given column. + */ + public Statistics columnMinValue(String columnName, Number min) { + this.columnStats + .computeIfAbsent(columnName, column -> new HashMap<>()) + .put(MIN_VALUE, String.valueOf(min)); + return this; + } + + /** + * Converts this descriptor into a set of properties. + */ + @Override + public final Map<String, String> toProperties() { + final DescriptorProperties properties = new DescriptorProperties(); + properties.putProperties(internalProperties); + + properties.putInt(STATISTICS_PROPERTY_VERSION, 1); + + List<Map<String, String>> namedStats = new ArrayList<>(); + for (Map.Entry<String, Map<String, String>> entry : columnStats.entrySet()) { + Map<String, String> columnStat = entry.getValue(); + columnStat.put(NAME, entry.getKey()); + namedStats.add(columnStat); + } + + properties.putIndexedVariableProperties(STATISTICS_COLUMNS, namedStats); + return properties.asMap(); + } +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/StatisticsValidator.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/StatisticsValidator.java new file mode 100644 index 0000000..e8797e8 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/StatisticsValidator.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.plan.stats.ColumnStats; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +/** + * Validator for {@link Statistics}. + */ +@Internal +public class StatisticsValidator implements DescriptorValidator { + + public static final String STATISTICS_PROPERTY_VERSION = "statistics.property-version"; + public static final String STATISTICS_ROW_COUNT = "statistics.row-count"; + public static final String STATISTICS_COLUMNS = "statistics.columns"; + + // per column properties + public static final String NAME = "name"; + public static final String DISTINCT_COUNT = "distinct-count"; + public static final String NULL_COUNT = "null-count"; + public static final String AVG_LENGTH = "avg-length"; + public static final String MAX_LENGTH = "max-length"; + public static final String MAX_VALUE = "max-value"; + public static final String MIN_VALUE = "min-value"; + + @Override + public void validate(DescriptorProperties properties) { + properties.validateInt(STATISTICS_PROPERTY_VERSION, true, 0, Integer.MAX_VALUE); + properties.validateLong(STATISTICS_ROW_COUNT, true, 0); + validateColumnStats(properties, STATISTICS_COLUMNS); + } + + // utilities + + public static Map<String, String> normalizeColumnStats(ColumnStats columnStats) { + Map<String, String> stats = new HashMap<>(); + if (columnStats.getNdv() != null) { + stats.put(DISTINCT_COUNT, String.valueOf(columnStats.getNdv())); + } + if (columnStats.getNullCount() != null) { + stats.put(NULL_COUNT, String.valueOf(columnStats.getNullCount())); + } + if (columnStats.getAvgLen() != null) { + stats.put(AVG_LENGTH, String.valueOf(columnStats.getAvgLen())); + } + if (columnStats.getMaxLen() != null) { + stats.put(MAX_LENGTH, String.valueOf(columnStats.getMaxLen())); + } + if (columnStats.getMaxValue() != null) { + stats.put(MAX_VALUE, String.valueOf(columnStats.getMaxValue())); + } + if (columnStats.getMinValue() != null) { + stats.put(MIN_VALUE, String.valueOf(columnStats.getMinValue())); + } + return stats; + } + + public static void validateColumnStats(DescriptorProperties properties, String key) { + + // filter for number of columns + int columnCount = properties.getIndexedProperty(key, NAME).size(); + + for (int i = 0; i < columnCount; i++) { + final String keyPrefix = key + "." + i + "."; + properties.validateString(keyPrefix + NAME, false, 1); + properties.validateLong(keyPrefix + DISTINCT_COUNT, true, 0L); + properties.validateLong(keyPrefix + NULL_COUNT, true, 0L); + properties.validateDouble(keyPrefix + AVG_LENGTH, true, 0.0); + properties.validateInt(keyPrefix + MAX_LENGTH, true, 0); + properties.validateDouble(keyPrefix + MIN_VALUE, true); + Optional<Double> min = properties.getOptionalDouble(keyPrefix + MIN_VALUE); + if (min.isPresent()) { + properties.validateDouble(keyPrefix + MAX_VALUE, true, min.get()); + } else { + properties.validateDouble(keyPrefix + MAX_VALUE, true); + } + } + } + + public static Map<String, ColumnStats> readColumnStats(DescriptorProperties properties, String key) { + + // filter for number of columns + int columnCount = properties.getIndexedProperty(key, NAME).size(); + + Map<String, ColumnStats> stats = new HashMap<>(); + for (int i = 0; i < columnCount; i++) { + final String keyPrefix = key + "." + i + "."; + final String propertyKey = keyPrefix + NAME; + String name = properties.getString(propertyKey); + + ColumnStats columnStats = new ColumnStats( + properties.getOptionalLong(keyPrefix + DISTINCT_COUNT).orElse(null), + properties.getOptionalLong(keyPrefix + NULL_COUNT).orElse(null), + properties.getOptionalDouble(keyPrefix + AVG_LENGTH).orElse(null), + properties.getOptionalInt(keyPrefix + MAX_LENGTH).orElse(null), + properties.getOptionalDouble(keyPrefix + MAX_VALUE).orElse(null), + properties.getOptionalDouble(keyPrefix + MIN_VALUE).orElse(null) + ); + + stats.put(name, columnStats); + } + + return stats; + } +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/StreamTableDescriptorValidator.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/StreamTableDescriptorValidator.java new file mode 100644 index 0000000..53063ed --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/StreamTableDescriptorValidator.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors; + +import org.apache.flink.annotation.Internal; + +import java.util.ArrayList; +import java.util.List; + +/** + * Validator for {@code StreamTableDescriptor}. + */ +@Internal +public class StreamTableDescriptorValidator implements DescriptorValidator { + + public static final String UPDATE_MODE = "update-mode"; + public static final String UPDATE_MODE_VALUE_APPEND = "append"; + public static final String UPDATE_MODE_VALUE_RETRACT = "retract"; + public static final String UPDATE_MODE_VALUE_UPSERT = "upsert"; + + private final boolean supportsAppend; + private final boolean supportsRetract; + private final boolean supportsUpsert; + + public StreamTableDescriptorValidator(boolean supportsAppend, boolean supportsRetract, boolean supportsUpsert) { + this.supportsAppend = supportsAppend; + this.supportsRetract = supportsRetract; + this.supportsUpsert = supportsUpsert; + } + + @Override + public void validate(DescriptorProperties properties) { + List<String> modeList = new ArrayList<>(); + if (supportsAppend) { + modeList.add(UPDATE_MODE_VALUE_APPEND); + } + if (supportsRetract) { + modeList.add(UPDATE_MODE_VALUE_RETRACT); + } + if (supportsUpsert) { + modeList.add(UPDATE_MODE_VALUE_UPSERT); + } + properties.validateEnumValues( + UPDATE_MODE, + false, + modeList + ); + } +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/StreamableDescriptor.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/StreamableDescriptor.java new file mode 100644 index 0000000..0430e8c --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/StreamableDescriptor.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors; + +import org.apache.flink.annotation.PublicEvolving; + +/** + * A trait for descriptors that allow to convert between a dynamic table and an external connector. + */ +@PublicEvolving +public interface StreamableDescriptor<D extends StreamableDescriptor<D>> extends Descriptor { + + /** + * Declares how to perform the conversion between a dynamic table and an external connector. + * + * <p>In append mode, a dynamic table and an external connector only exchange INSERT messages. + * + * @see #inRetractMode() + * @see #inUpsertMode() + */ + D inAppendMode(); + + /** + * Declares how to perform the conversion between a dynamic table and an external connector. + * + * <p>In retract mode, a dynamic table and an external connector exchange ADD and RETRACT messages. + * + * <p>An INSERT change is encoded as an ADD message, a DELETE change as a RETRACT message, and an + * UPDATE change as a RETRACT message for the updated (previous) row and an ADD message for + * the updating (new) row. + * + * <p>In this mode, a key must not be defined as opposed to upsert mode. However, every update + * consists of two messages which is less efficient. + * + * @see #inAppendMode() + * @see #inUpsertMode() + */ + D inRetractMode(); + + /** + * Declares how to perform the conversion between a dynamic table and an external connector. + * + * <p>In upsert mode, a dynamic table and an external connector exchange UPSERT and DELETE messages. + * + * <p>This mode requires a (possibly composite) unique key by which updates can be propagated. The + * external connector needs to be aware of the unique key attribute in order to apply messages + * correctly. INSERT and UPDATE changes are encoded as UPSERT messages. DELETE changes as + * DELETE messages. + * + * <p>The main difference to a retract stream is that UPDATE changes are encoded with a single + * message and are therefore more efficient. + * + * @see #inAppendMode() + * @see #inRetractMode() + */ + D inUpsertMode(); +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/plan/stats/ColumnStats.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/plan/stats/ColumnStats.java new file mode 100644 index 0000000..7426629 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/plan/stats/ColumnStats.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.stats; + +import org.apache.flink.annotation.PublicEvolving; + +import java.util.ArrayList; +import java.util.List; + +/** + * Column statistics. + */ +@PublicEvolving +public final class ColumnStats { + + /** + * number of distinct values. + */ + private final Long ndv; + + /** + * number of nulls. + */ + private final Long nullCount; + + /** + * average length of column values. + */ + private final Double avgLen; + + /** + * max length of column values. + */ + private final Integer maxLen; + + /** + * max value of column values. + */ + private final Number max; + + /** + * min value of column values. + */ + private final Number min; + + public ColumnStats( + Long ndv, + Long nullCount, + Double avgLen, + Integer maxLen, + Number max, + Number min) { + this.ndv = ndv; + this.nullCount = nullCount; + this.avgLen = avgLen; + this.maxLen = maxLen; + this.max = max; + this.min = min; + } + + public Long getNdv() { + return ndv; + } + + public Long getNullCount() { + return nullCount; + } + + public Double getAvgLen() { + return avgLen; + } + + public Integer getMaxLen() { + return maxLen; + } + + public Number getMaxValue() { + return max; + } + + public Number getMinValue() { + return min; + } + + public String toString() { + List<String> columnStats = new ArrayList<>(); + if (ndv != null) { + columnStats.add("ndv=" + ndv); + } + if (nullCount != null) { + columnStats.add("nullCount=" + nullCount); + } + if (avgLen != null) { + columnStats.add("avgLen=" + avgLen); + } + if (maxLen != null) { + columnStats.add("maxLen=" + maxLen); + } + if (max != null) { + columnStats.add("max=" + max); + } + if (min != null) { + columnStats.add("min=" + min); + } + String columnStatsStr = String.join(", ", columnStats); + return "ColumnStats(" + columnStatsStr + ")"; + } +} diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/stats/TableStats.scala b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/plan/stats/TableStats.java similarity index 52% rename from flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/stats/TableStats.scala rename to flink-table/flink-table-common/src/main/java/org/apache/flink/table/plan/stats/TableStats.java index 64eee95..f58e2d0 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/stats/TableStats.scala +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/plan/stats/TableStats.java @@ -16,15 +16,43 @@ * limitations under the License. */ -package org.apache.flink.table.plan.stats +package org.apache.flink.table.plan.stats; -import java.lang.Long -import java.util.{Map, HashMap} +import org.apache.flink.annotation.PublicEvolving; + +import java.util.HashMap; +import java.util.Map; /** - * Table statistics - * - * @param rowCount cardinality of table - * @param colStats statistics of table columns - */ -case class TableStats(rowCount: Long, colStats: Map[String, ColumnStats] = new HashMap()) + * Table statistics. + */ +@PublicEvolving +public final class TableStats { + + /** + * cardinality of table. + */ + private final long rowCount; + + /** + * colStats statistics of table columns. + */ + private final Map<String, ColumnStats> colStats; + + public TableStats(long rowCount) { + this(rowCount, new HashMap<>()); + } + + public TableStats(long rowCount, Map<String, ColumnStats> colStats) { + this.rowCount = rowCount; + this.colStats = colStats; + } + + public long getRowCount() { + return rowCount; + } + + public Map<String, ColumnStats> getColumnStats() { + return colStats; + } +} diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/descriptors/DescriptorPropertiesTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/descriptors/DescriptorPropertiesTest.java new file mode 100644 index 0000000..4859fc5 --- /dev/null +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/descriptors/DescriptorPropertiesTest.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors; + +import org.apache.flink.table.api.ValidationException; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Consumer; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link DescriptorProperties}. + */ +public class DescriptorPropertiesTest { + + private static final String ARRAY_KEY = "my-array"; + private static final String FIXED_INDEXED_PROPERTY_KEY = "my-fixed-indexed-property"; + private static final String PROPERTY_1_KEY = "property-1"; + private static final String PROPERTY_2_KEY = "property-2"; + + @Test + public void testEquals() { + DescriptorProperties properties1 = new DescriptorProperties(); + properties1.putString("hello1", "12"); + properties1.putString("hello2", "13"); + properties1.putString("hello3", "14"); + + DescriptorProperties properties2 = new DescriptorProperties(); + properties2.putString("hello1", "12"); + properties2.putString("hello2", "13"); + properties2.putString("hello3", "14"); + + DescriptorProperties properties3 = new DescriptorProperties(); + properties3.putString("hello1", "12"); + properties3.putString("hello3", "14"); + properties3.putString("hello2", "13"); + + assertEquals(properties1, properties2); + + assertEquals(properties1, properties3); + } + + @Test + public void testMissingArray() { + DescriptorProperties properties = new DescriptorProperties(); + + testArrayValidation(properties, 0, Integer.MAX_VALUE); + } + + @Test + public void testArrayValues() { + DescriptorProperties properties = new DescriptorProperties(); + + properties.putString(ARRAY_KEY + ".0", "12"); + properties.putString(ARRAY_KEY + ".1", "42"); + properties.putString(ARRAY_KEY + ".2", "66"); + + testArrayValidation(properties, 1, Integer.MAX_VALUE); + + assertEquals( + Arrays.asList(12, 42, 66), + properties.getArray(ARRAY_KEY, properties::getInt)); + } + + @Test + public void testArraySingleValue() { + DescriptorProperties properties = new DescriptorProperties(); + properties.putString(ARRAY_KEY, "12"); + + testArrayValidation(properties, 1, Integer.MAX_VALUE); + + assertEquals( + Collections.singletonList(12), + properties.getArray(ARRAY_KEY, properties::getInt)); + } + + @Test(expected = ValidationException.class) + public void testArrayInvalidValues() { + DescriptorProperties properties = new DescriptorProperties(); + properties.putString(ARRAY_KEY + ".0", "12"); + properties.putString(ARRAY_KEY + ".1", "66"); + properties.putString(ARRAY_KEY + ".2", "INVALID"); + + testArrayValidation(properties, 1, Integer.MAX_VALUE); + } + + @Test(expected = ValidationException.class) + public void testArrayInvalidSingleValue() { + DescriptorProperties properties = new DescriptorProperties(); + properties.putString(ARRAY_KEY, "INVALID"); + + testArrayValidation(properties, 1, Integer.MAX_VALUE); + } + + @Test(expected = ValidationException.class) + public void testInvalidMissingArray() { + DescriptorProperties properties = new DescriptorProperties(); + + testArrayValidation(properties, 1, Integer.MAX_VALUE); + } + + @Test(expected = ValidationException.class) + public void testInvalidFixedIndexedProperties() { + DescriptorProperties property = new DescriptorProperties(); + List<List<String>> list = new ArrayList<>(); + list.add(Arrays.asList("1", "string")); + list.add(Arrays.asList("INVALID", "string")); + property.putIndexedFixedProperties( + FIXED_INDEXED_PROPERTY_KEY, + Arrays.asList(PROPERTY_1_KEY, PROPERTY_2_KEY), + list); + testFixedIndexedPropertiesValidation(property); + } + + @Test + public void testRemoveKeys() { + DescriptorProperties properties = new DescriptorProperties(); + properties.putString("hello1", "12"); + properties.putString("hello2", "13"); + properties.putString("hello3", "14"); + + DescriptorProperties actual = properties.withoutKeys(Arrays.asList("hello1", "hello3")); + + DescriptorProperties expected = new DescriptorProperties(); + expected.putString("hello2", "13"); + + assertEquals(expected, actual); + } + + @Test + public void testPrefixedMap() { + DescriptorProperties properties = new DescriptorProperties(); + properties.putString("hello1", "12"); + properties.putString("hello2", "13"); + properties.putString("hello3", "14"); + + Map<String, String> actual = properties.asPrefixedMap("prefix."); + + DescriptorProperties expected = new DescriptorProperties(); + expected.putString("prefix.hello1", "12"); + expected.putString("prefix.hello2", "13"); + expected.putString("prefix.hello3", "14"); + + assertEquals(expected.asMap(), actual); + } + + private void testArrayValidation( + DescriptorProperties properties, + int minLength, + int maxLength) { + Consumer<String> validator = key -> properties.validateInt(key, false); + + properties.validateArray( + ARRAY_KEY, + validator, + minLength, + maxLength); + } + + private void testFixedIndexedPropertiesValidation(DescriptorProperties properties) { + + Map<String, Consumer<String>> validatorMap = new HashMap<>(); + + // PROPERTY_1 should be Int + Consumer<String> validator1 = key -> properties.validateInt(key, false); + validatorMap.put(PROPERTY_1_KEY, validator1); + // PROPERTY_2 should be String + Consumer<String> validator2 = key -> properties.validateString(key, false); + validatorMap.put(PROPERTY_2_KEY, validator2); + + properties.validateFixedIndexedProperties( + FIXED_INDEXED_PROPERTY_KEY, + false, + validatorMap + ); + } +} diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/descriptors/MetadataTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/descriptors/MetadataTest.java new file mode 100644 index 0000000..f3674c5 --- /dev/null +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/descriptors/MetadataTest.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors; + +import org.apache.flink.table.api.ValidationException; + +import org.junit.Test; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Tests for {@link Metadata}. + */ +public class MetadataTest extends DescriptorTestBase { + + @Test(expected = ValidationException.class) + public void testInvalidCreationTime() { + addPropertyAndVerify(descriptors().get(0), "metadata.creation-time", "dfghj"); + } + + // ---------------------------------------------------------------------------------------------- + + @Override + public List<Descriptor> descriptors() { + Metadata desc = new Metadata() + .comment("Some additional comment") + .creationTime(123L) + .lastAccessTime(12020202L); + + return Arrays.asList(desc); + } + + @Override + public DescriptorValidator validator() { + return new MetadataValidator(); + } + + @Override + public List<Map<String, String>> properties() { + Map props = new HashMap<String, String>() { + { + put("metadata.comment", "Some additional comment"); + put("metadata.creation-time", "123"); + put("metadata.last-access-time", "12020202"); + } + }; + + return Arrays.asList(props); + } +} diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/descriptors/StatisticsTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/descriptors/StatisticsTest.java new file mode 100644 index 0000000..6c5c04f --- /dev/null +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/descriptors/StatisticsTest.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors; + +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.plan.stats.ColumnStats; +import org.apache.flink.table.plan.stats.TableStats; + +import org.junit.Test; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Tests for {@link Statistics}. + */ +public class StatisticsTest extends DescriptorTestBase { + + @Test(expected = ValidationException.class) + public void testInvalidRowCount() { + addPropertyAndVerify(descriptors().get(0), "statistics.row-count", "abx"); + } + + @Test(expected = ValidationException.class) + public void testMissingName() { + removePropertyAndVerify(descriptors().get(0), "statistics.columns.0.name"); + } + + // ---------------------------------------------------------------------------------------------- + + @Override + public List<Descriptor> descriptors() { + Statistics desc1 = new Statistics() + .rowCount(1000L) + .columnStats("a", new ColumnStats(1L, 2L, 3.0, 4, 6, 5)) + .columnAvgLength("b", 42.0) + .columnNullCount("a", 300L); + + Map<String, ColumnStats> map = new HashMap<String, ColumnStats>(); + map.put("a", new ColumnStats(null, 2L, 3.0, null, 6, 5)); + Statistics desc2 = new Statistics() + .tableStats(new TableStats(32L, map)); + + return Arrays.asList(desc1, desc2); + } + + @Override + public DescriptorValidator validator() { + return new StatisticsValidator(); + } + + @Override + public List<Map<String, String>> properties() { + Map<String, String> props1 = new HashMap<String, String>() { + { + put("statistics.property-version", "1"); + put("statistics.row-count", "1000"); + put("statistics.columns.0.name", "a"); + put("statistics.columns.0.distinct-count", "1"); + put("statistics.columns.0.null-count", "300"); + put("statistics.columns.0.avg-length", "3.0"); + put("statistics.columns.0.max-length", "4"); + put("statistics.columns.0.max-value", "6"); + put("statistics.columns.0.min-value", "5"); + put("statistics.columns.1.name", "b"); + put("statistics.columns.1.avg-length", "42.0"); + } + }; + + Map<String, String> props2 = new HashMap<String, String>() { + { + put("statistics.property-version", "1"); + put("statistics.row-count", "32"); + put("statistics.columns.0.name", "a"); + put("statistics.columns.0.null-count", "2"); + put("statistics.columns.0.avg-length", "3.0"); + put("statistics.columns.0.max-value", "6"); + put("statistics.columns.0.min-value", "5"); + } + }; + + return Arrays.asList(props1, props2); + } +} diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala index 3aa474c..7b5c343 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala @@ -27,8 +27,6 @@ import org.apache.flink.table.factories.TableFactory import org.apache.flink.table.plan.stats.TableStats import org.apache.flink.table.util.JavaScalaConversionUtil.toScala -import scala.collection.JavaConverters._ - /** * Defines a table in an [[ExternalCatalog]]. External catalog tables describe table sources * and/or sinks for both batch and stream environments. @@ -69,7 +67,7 @@ class ExternalCatalogTable( rowCount match { case Some(cnt) => val columnStats = readColumnStats(normalizedProps, STATISTICS_COLUMNS) - Some(TableStats(cnt, columnStats.asJava)) + Some(new TableStats(cnt, columnStats)) case None => None } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/catalog/ExternalTableUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/catalog/ExternalTableUtil.scala index ec57c5e..6687a5e 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/catalog/ExternalTableUtil.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/catalog/ExternalTableUtil.scala @@ -26,7 +26,6 @@ import org.apache.flink.table.sinks.{BatchTableSink, StreamTableSink} import org.apache.flink.table.sources.{BatchTableSource, StreamTableSource} import org.apache.flink.table.util.Logging - /** * The utility class is used to convert [[ExternalCatalogTable]] to [[TableSourceSinkTable]]. * diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/Metadata.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/Metadata.scala deleted file mode 100644 index e5dfb79..0000000 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/Metadata.scala +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.descriptors - -import java.util - -import org.apache.flink.table.descriptors.MetadataValidator.{METADATA_COMMENT, METADATA_CREATION_TIME, METADATA_LAST_ACCESS_TIME} - -/** - * Metadata descriptor for adding additional, useful information. - */ -class Metadata extends Descriptor { - - protected var comment: Option[String] = None - protected var creationTime: Option[Long] = None - protected var lastAccessTime: Option[Long] = None - - /** - * Sets a comment. - * - * @param comment the description - */ - def comment(comment: String): Metadata = { - this.comment = Some(comment) - this - } - - /** - * Sets a creation time. - * - * @param time UTC milliseconds timestamp - */ - def creationTime(time: Long): Metadata = { - this.creationTime = Some(time) - this - } - - /** - * Sets a last access time. - * - * @param time UTC milliseconds timestamp - */ - def lastAccessTime(time: Long): Metadata = { - this.lastAccessTime = Some(time) - this - } - - /** - * Converts this descriptor into a set of properties. - */ - final override def toProperties: util.Map[String, String] = { - val properties = new DescriptorProperties() - comment.foreach(c => properties.putString(METADATA_COMMENT, c)) - creationTime.foreach(t => properties.putLong(METADATA_CREATION_TIME, t)) - lastAccessTime.foreach(t => properties.putLong(METADATA_LAST_ACCESS_TIME, t)) - properties.asMap() - } -} - -/** - * Metadata descriptor for adding additional, useful information. - */ -object Metadata { - - /** - * Metadata descriptor for adding additional, useful information. - * - * @deprecated Use `new Metadata()`. - */ - @deprecated - def apply(): Metadata = new Metadata() -} diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/MetadataValidator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/MetadataValidator.scala deleted file mode 100644 index 9c30872..0000000 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/MetadataValidator.scala +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.descriptors - -import org.apache.flink.table.descriptors.MetadataValidator.{METADATA_COMMENT, METADATA_CREATION_TIME, METADATA_LAST_ACCESS_TIME, METADATA_PROPERTY_VERSION} - -/** - * Validator for [[Metadata]]. - */ -class MetadataValidator extends DescriptorValidator { - - override def validate(properties: DescriptorProperties): Unit = { - properties.validateInt(METADATA_PROPERTY_VERSION, true, 0, Integer.MAX_VALUE) - properties.validateString(METADATA_COMMENT, true) - properties.validateLong(METADATA_CREATION_TIME, true) - properties.validateLong(METADATA_LAST_ACCESS_TIME, true) - } -} - -object MetadataValidator { - - val METADATA_PROPERTY_VERSION = "metadata.property-version" - val METADATA_COMMENT = "metadata.comment" - val METADATA_CREATION_TIME = "metadata.creation-time" - val METADATA_LAST_ACCESS_TIME = "metadata.last-access-time" - -} diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/Statistics.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/Statistics.scala deleted file mode 100644 index 1707e1b..0000000 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/Statistics.scala +++ /dev/null @@ -1,166 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.descriptors - -import java.util - -import org.apache.flink.table.descriptors.StatisticsValidator._ -import org.apache.flink.table.plan.stats.{ColumnStats, TableStats} - -import scala.collection.JavaConverters._ -import scala.collection.mutable - -/** - * Statistics descriptor for describing table stats. - */ -class Statistics extends Descriptor { - - private var rowCount: Option[Long] = None - private val columnStats: mutable.LinkedHashMap[String, mutable.Map[String, String]] = - mutable.LinkedHashMap[String, mutable.Map[String, String]]() - - /** - * Sets the statistics from a [[TableStats]] instance. - * - * This method overwrites all existing statistics. - * - * @param tableStats the table statistics - */ - def tableStats(tableStats: TableStats): Statistics = { - rowCount(tableStats.rowCount) - columnStats.clear() - tableStats.colStats.asScala.foreach { case (col, stats) => - columnStats(col, stats) - } - this - } - - /** - * Sets statistics for the overall row count. Required. - * - * @param rowCount the expected number of rows - */ - def rowCount(rowCount: Long): Statistics = { - this.rowCount = Some(rowCount) - this - } - - /** - * Sets statistics for a column. Overwrites all existing statistics for this column. - * - * @param columnName the column name - * @param columnStats expected statistics for the column - */ - def columnStats(columnName: String, columnStats: ColumnStats): Statistics = { - val map = mutable.Map(normalizeColumnStats(columnStats).toSeq: _*) - this.columnStats.put(columnName, map) - this - } - - /** - * Sets the number of distinct values statistic for the given column. - */ - def columnDistinctCount(columnName: String, ndv: Long): Statistics = { - this.columnStats - .getOrElseUpdate(columnName, mutable.HashMap()) - .put(DISTINCT_COUNT, ndv.toString) - this - } - - /** - * Sets the number of null values statistic for the given column. - */ - def columnNullCount(columnName: String, nullCount: Long): Statistics = { - this.columnStats - .getOrElseUpdate(columnName, mutable.HashMap()) - .put(NULL_COUNT, nullCount.toString) - this - } - - /** - * Sets the average length statistic for the given column. - */ - def columnAvgLength(columnName: String, avgLen: Double): Statistics = { - this.columnStats - .getOrElseUpdate(columnName, mutable.HashMap()) - .put(AVG_LENGTH, avgLen.toString) - this - } - - /** - * Sets the maximum length statistic for the given column. - */ - def columnMaxLength(columnName: String, maxLen: Integer): Statistics = { - this.columnStats - .getOrElseUpdate(columnName, mutable.HashMap()) - .put(MAX_LENGTH, maxLen.toString) - this - } - - /** - * Sets the maximum value statistic for the given column. - */ - def columnMaxValue(columnName: String, max: Number): Statistics = { - this.columnStats - .getOrElseUpdate(columnName, mutable.HashMap()) - .put(MAX_VALUE, max.toString) - this - } - - /** - * Sets the minimum value statistic for the given column. - */ - def columnMinValue(columnName: String, min: Number): Statistics = { - this.columnStats - .getOrElseUpdate(columnName, mutable.HashMap()) - .put(MIN_VALUE, min.toString) - this - } - - /** - * Converts this descriptor into a set of properties. - */ - final override def toProperties: util.Map[String, String] = { - val properties = new DescriptorProperties() - - properties.putInt(STATISTICS_PROPERTY_VERSION, 1) - rowCount.foreach(rc => properties.putLong(STATISTICS_ROW_COUNT, rc)) - val namedStats = columnStats.map { case (name, stats) => - // name should not be part of the properties key - (stats + (NAME -> name)).toMap.asJava - }.toList.asJava - properties.putIndexedVariableProperties(STATISTICS_COLUMNS, namedStats) - - properties.asMap() - } -} - -/** - * Statistics descriptor for describing table stats. - */ -object Statistics { - - /** - * Statistics descriptor for describing table stats. - * - * @deprecated Use `new Statistics()`. - */ - @deprecated - def apply(): Statistics = new Statistics() -} diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/StatisticsValidator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/StatisticsValidator.scala deleted file mode 100644 index 518f292..0000000 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/StatisticsValidator.scala +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.descriptors - -import org.apache.flink.table.api.ValidationException -import org.apache.flink.table.descriptors.StatisticsValidator.{STATISTICS_COLUMNS, STATISTICS_PROPERTY_VERSION, STATISTICS_ROW_COUNT, validateColumnStats} -import org.apache.flink.table.plan.stats.ColumnStats -import org.apache.flink.table.util.JavaScalaConversionUtil.toScala - -import scala.collection.mutable - -/** - * Validator for [[FormatDescriptor]]. - */ -class StatisticsValidator extends DescriptorValidator { - - override def validate(properties: DescriptorProperties): Unit = { - properties.validateInt(STATISTICS_PROPERTY_VERSION, true, 0, Integer.MAX_VALUE) - properties.validateLong(STATISTICS_ROW_COUNT, true, 0) - validateColumnStats(properties, STATISTICS_COLUMNS) - } -} - -object StatisticsValidator { - - val STATISTICS_PROPERTY_VERSION = "statistics.property-version" - val STATISTICS_ROW_COUNT = "statistics.row-count" - val STATISTICS_COLUMNS = "statistics.columns" - - // per column properties - - val NAME = "name" - val DISTINCT_COUNT = "distinct-count" - val NULL_COUNT = "null-count" - val AVG_LENGTH = "avg-length" - val MAX_LENGTH = "max-length" - val MAX_VALUE = "max-value" - val MIN_VALUE = "min-value" - - // utilities - - def normalizeColumnStats(columnStats: ColumnStats): Map[String, String] = { - val stats = mutable.HashMap[String, String]() - if (columnStats.ndv != null) { - stats += DISTINCT_COUNT -> columnStats.ndv.toString - } - if (columnStats.nullCount != null) { - stats += NULL_COUNT -> columnStats.nullCount.toString - } - if (columnStats.avgLen != null) { - stats += AVG_LENGTH -> columnStats.avgLen.toString - } - if (columnStats.maxLen != null) { - stats += MAX_LENGTH -> columnStats.maxLen.toString - } - if (columnStats.max != null) { - stats += MAX_VALUE -> columnStats.max.toString - } - if (columnStats.min != null) { - stats += MIN_VALUE -> columnStats.min.toString - } - stats.toMap - } - - def validateColumnStats(properties: DescriptorProperties, key: String): Unit = { - - // filter for number of columns - val columnCount = properties.getIndexedProperty(key, NAME).size - - for (i <- 0 until columnCount) { - properties.validateString(s"$key.$i.$NAME", false, 1) - properties.validateLong(s"$key.$i.$DISTINCT_COUNT", true, 0L) - properties.validateLong(s"$key.$i.$NULL_COUNT", true, 0L) - properties.validateDouble(s"$key.$i.$AVG_LENGTH", true, 0.0) - properties.validateInt(s"$key.$i.$MAX_LENGTH", true, 0) - properties.validateDouble(s"$key.$i.$MAX_VALUE", true, 0.0) - properties.validateDouble(s"$key.$i.$MIN_VALUE", true, 0.0) - } - } - - def readColumnStats(properties: DescriptorProperties, key: String): Map[String, ColumnStats] = { - - // filter for number of columns - val columnCount = properties.getIndexedProperty(key, NAME).size - - val stats = for (i <- 0 until columnCount) yield { - val name = toScala(properties.getOptionalString(s"$key.$i.$NAME")).getOrElse( - throw new ValidationException(s"Could not find name of property '$key.$i.$NAME'.")) - - val stats = ColumnStats( - properties.getOptionalLong(s"$key.$i.$DISTINCT_COUNT").orElse(null), - properties.getOptionalLong(s"$key.$i.$NULL_COUNT").orElse(null), - properties.getOptionalDouble(s"$key.$i.$AVG_LENGTH").orElse(null), - properties.getOptionalInt(s"$key.$i.$MAX_LENGTH").orElse(null), - properties.getOptionalDouble(s"$key.$i.$MAX_VALUE").orElse(null), - properties.getOptionalDouble(s"$key.$i.$MIN_VALUE").orElse(null) - ) - - name -> stats - } - - stats.toMap - } -} diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptorValidator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptorValidator.scala deleted file mode 100644 index 7412e5b..0000000 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptorValidator.scala +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.descriptors - -import java.util - -import org.apache.flink.table.descriptors.StreamTableDescriptorValidator._ - -/** - * Validator for [[StreamTableDescriptor]]. - */ -class StreamTableDescriptorValidator( - supportsAppend: Boolean, - supportsRetract: Boolean, - supportsUpsert: Boolean) - extends DescriptorValidator { - - override def validate(properties: DescriptorProperties): Unit = { - val modeList = new util.ArrayList[String]() - if (supportsAppend) { - modeList.add(UPDATE_MODE_VALUE_APPEND) - } - if (supportsRetract) { - modeList.add(UPDATE_MODE_VALUE_RETRACT) - } - if (supportsUpsert) { - modeList.add(UPDATE_MODE_VALUE_UPSERT) - } - properties.validateEnumValues( - UPDATE_MODE, - false, - modeList - ) - } -} - -object StreamTableDescriptorValidator { - - val UPDATE_MODE = "update-mode" - val UPDATE_MODE_VALUE_APPEND = "append" - val UPDATE_MODE_VALUE_RETRACT = "retract" - val UPDATE_MODE_VALUE_UPSERT = "upsert" -} diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/StreamableDescriptor.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/StreamableDescriptor.scala deleted file mode 100644 index 0d424bd..0000000 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/descriptors/StreamableDescriptor.scala +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.descriptors - -/** - * A trait for descriptors that allow to convert between a dynamic table and an external connector. - */ -trait StreamableDescriptor[D <: StreamableDescriptor[D]] extends TableDescriptor { - - /** - * Declares how to perform the conversion between a dynamic table and an external connector. - * - * In append mode, a dynamic table and an external connector only exchange INSERT messages. - * - * @see See also [[inRetractMode()]] and [[inUpsertMode()]]. - */ - def inAppendMode(): D - - /** - * Declares how to perform the conversion between a dynamic table and an external connector. - * - * In retract mode, a dynamic table and an external connector exchange ADD and RETRACT messages. - * - * An INSERT change is encoded as an ADD message, a DELETE change as a RETRACT message, and an - * UPDATE change as a RETRACT message for the updated (previous) row and an ADD message for - * the updating (new) row. - * - * In this mode, a key must not be defined as opposed to upsert mode. However, every update - * consists of two messages which is less efficient. - * - * @see See also [[inAppendMode()]] and [[inUpsertMode()]]. - */ - def inRetractMode(): D - - /** - * Declares how to perform the conversion between a dynamic table and an external connector. - * - * In upsert mode, a dynamic table and an external connector exchange UPSERT and DELETE messages. - * - * This mode requires a (possibly composite) unique key by which updates can be propagated. The - * external connector needs to be aware of the unique key attribute in order to apply messages - * correctly. INSERT and UPDATE changes are encoded as UPSERT messages. DELETE changes as - * DELETE messages. - * - * The main difference to a retract stream is that UPDATE changes are encoded with a single - * message and are therefore more efficient. - * - * @see See also [[inAppendMode()]] and [[inRetractMode()]]. - */ - def inUpsertMode(): D -} diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/schema/DataSetTable.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/schema/DataSetTable.scala index c1515b1..4227498 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/schema/DataSetTable.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/schema/DataSetTable.scala @@ -25,5 +25,5 @@ class DataSetTable[T]( val dataSet: DataSet[T], override val fieldIndexes: Array[Int], override val fieldNames: Array[String], - override val statistic: FlinkStatistic = FlinkStatistic.of(TableStats(1000L))) + override val statistic: FlinkStatistic = FlinkStatistic.of(new TableStats(1000L))) extends InlineTable[T](dataSet.getType, fieldIndexes, fieldNames, statistic) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/stats/ColumnStats.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/stats/ColumnStats.scala deleted file mode 100644 index 8cf06b5..0000000 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/stats/ColumnStats.scala +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.plan.stats - -import java.lang.{Double, Long} - -/** - * column statistics - * - * @param ndv number of distinct values - * @param nullCount number of nulls - * @param avgLen average length of column values - * @param maxLen max length of column values - * @param max max value of column values - * @param min min value of column values - */ -case class ColumnStats( - ndv: Long, - nullCount: Long, - avgLen: Double, - maxLen: Integer, - max: Number, - min: Number) { - - override def toString: String = { - val columnStatsStr = Seq( - if (ndv != null) s"ndv=$ndv" else "", - if (nullCount != null) s"nullCount=$nullCount" else "", - if (avgLen != null) s"avgLen=$avgLen" else "", - if (maxLen != null) s"maxLen=$maxLen" else "", - if (max != null) s"max=${max}" else "", - if (min != null) s"min=${min}" else "" - ).filter(_.nonEmpty).mkString(", ") - - s"ColumnStats(${columnStatsStr})" - } - -} diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/stats/FlinkStatistic.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/stats/FlinkStatistic.scala index da08916..5469f94 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/stats/FlinkStatistic.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/stats/FlinkStatistic.scala @@ -49,7 +49,7 @@ class FlinkStatistic(tableStats: Option[TableStats]) extends Statistic { * @return The stats of the specified column. */ def getColumnStats(columnName: String): ColumnStats = tableStats match { - case Some(tStats) => tStats.colStats.get(columnName) + case Some(tStats) => tStats.getColumnStats.get(columnName) case None => null } @@ -59,7 +59,7 @@ class FlinkStatistic(tableStats: Option[TableStats]) extends Statistic { * @return The number of rows of the table. */ override def getRowCount: Double = tableStats match { - case Some(tStats) => tStats.rowCount.toDouble + case Some(tStats) => tStats.getRowCount.toDouble case None => null } diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/DescriptorPropertiesTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/DescriptorPropertiesTest.scala deleted file mode 100644 index fe7c75d..0000000 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/DescriptorPropertiesTest.scala +++ /dev/null @@ -1,207 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.descriptors - -import java.util -import java.util.Collections -import java.util.function.Consumer - -import org.apache.flink.table.api.ValidationException -import org.apache.flink.table.util.JavaScalaConversionUtil.toJava -import org.junit.Assert.assertEquals -import org.junit.Test - -/** - * Tests for [[DescriptorProperties]]. - */ -class DescriptorPropertiesTest { - - private val ARRAY_KEY = "my-array" - private val FIXED_INDEXED_PROPERTY_KEY = "my-fixed-indexed-property" - private val PROPERTY_1_KEY = "property-1" - private val PROPERTY_2_KEY = "property-2" - - @Test - def testEquals(): Unit = { - val properties1 = new DescriptorProperties() - properties1.putString("hello1", "12") - properties1.putString("hello2", "13") - properties1.putString("hello3", "14") - - val properties2 = new DescriptorProperties() - properties2.putString("hello1", "12") - properties2.putString("hello2", "13") - properties2.putString("hello3", "14") - - val properties3 = new DescriptorProperties() - properties3.putString("hello1", "12") - properties3.putString("hello3", "14") - properties3.putString("hello2", "13") - - assertEquals(properties1, properties2) - - assertEquals(properties1, properties3) - } - - @Test - def testMissingArray(): Unit = { - val properties = new DescriptorProperties() - - testArrayValidation(properties, 0, Integer.MAX_VALUE) - } - - @Test - def testArrayValues(): Unit = { - val properties = new DescriptorProperties() - - properties.putString(s"$ARRAY_KEY.0", "12") - properties.putString(s"$ARRAY_KEY.1", "42") - properties.putString(s"$ARRAY_KEY.2", "66") - - testArrayValidation(properties, 1, Integer.MAX_VALUE) - - assertEquals( - util.Arrays.asList(12, 42, 66), - properties.getArray(ARRAY_KEY, toJava((key: String) => { - properties.getInt(key) - }))) - } - - @Test - def testArraySingleValue(): Unit = { - val properties = new DescriptorProperties() - properties.putString(ARRAY_KEY, "12") - - testArrayValidation(properties, 1, Integer.MAX_VALUE) - - assertEquals( - Collections.singletonList(12), - properties.getArray(ARRAY_KEY, toJava((key: String) => { - properties.getInt(key) - }))) - } - - @Test(expected = classOf[ValidationException]) - def testArrayInvalidValues(): Unit = { - val properties = new DescriptorProperties() - properties.putString(s"$ARRAY_KEY.0", "12") - properties.putString(s"$ARRAY_KEY.1", "66") - properties.putString(s"$ARRAY_KEY.2", "INVALID") - - testArrayValidation(properties, 1, Integer.MAX_VALUE) - } - - @Test(expected = classOf[ValidationException]) - def testArrayInvalidSingleValue(): Unit = { - val properties = new DescriptorProperties() - properties.putString(ARRAY_KEY, "INVALID") - - testArrayValidation(properties, 1, Integer.MAX_VALUE) - } - - @Test(expected = classOf[ValidationException]) - def testInvalidMissingArray(): Unit = { - val properties = new DescriptorProperties() - - testArrayValidation(properties, 1, Integer.MAX_VALUE) - } - - @Test(expected = classOf[ValidationException]) - def testInvalidFixedIndexedProperties(): Unit = { - val property = new DescriptorProperties() - val list = new util.ArrayList[util.List[String]]() - list.add(util.Arrays.asList("1", "string")) - list.add(util.Arrays.asList("INVALID", "string")) - property.putIndexedFixedProperties( - FIXED_INDEXED_PROPERTY_KEY, - util.Arrays.asList(PROPERTY_1_KEY, PROPERTY_2_KEY), - list) - testFixedIndexedPropertiesValidation(property) - } - - @Test - def testRemoveKeys(): Unit = { - val properties = new DescriptorProperties() - properties.putString("hello1", "12") - properties.putString("hello2", "13") - properties.putString("hello3", "14") - - val actual = properties.withoutKeys(util.Arrays.asList("hello1", "hello3")) - - val expected = new DescriptorProperties() - expected.putString("hello2", "13") - - assertEquals(expected, actual) - } - - @Test - def testPrefixedMap(): Unit = { - val properties = new DescriptorProperties() - properties.putString("hello1", "12") - properties.putString("hello2", "13") - properties.putString("hello3", "14") - - val actual = properties.asPrefixedMap("prefix.") - - val expected = new DescriptorProperties() - expected.putString("prefix.hello1", "12") - expected.putString("prefix.hello2", "13") - expected.putString("prefix.hello3", "14") - - assertEquals(expected.asMap, actual) - } - - private def testArrayValidation( - properties: DescriptorProperties, - minLength: Int, - maxLength: Int) - : Unit = { - val validator: String => Unit = (key: String) => { - properties.validateInt(key, false) - } - - properties.validateArray( - ARRAY_KEY, - toJava(validator), - minLength, - maxLength) - } - - private def testFixedIndexedPropertiesValidation(properties: DescriptorProperties): Unit = { - - val validatorMap = new util.HashMap[String, Consumer[String]]() - - // PROPERTY_1 should be Int - val validator1: String => Unit = (key: String) => { - properties.validateInt(key, false) - } - validatorMap.put(PROPERTY_1_KEY, toJava(validator1)) - // PROPERTY_2 should be String - val validator2: String => Unit = (key: String) => { - properties.validateString(key, false) - } - validatorMap.put(PROPERTY_2_KEY, toJava(validator2)) - - properties.validateFixedIndexedProperties( - FIXED_INDEXED_PROPERTY_KEY, - false, - validatorMap - ) - } -} diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/MetadataTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/MetadataTest.scala deleted file mode 100644 index 64965b0..0000000 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/MetadataTest.scala +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.descriptors - -import java.util - -import org.apache.flink.table.api.ValidationException -import org.junit.Test - -import scala.collection.JavaConverters._ - -class MetadataTest extends DescriptorTestBase { - - @Test(expected = classOf[ValidationException]) - def testInvalidCreationTime(): Unit = { - addPropertyAndVerify(descriptors().get(0), "metadata.creation-time", "dfghj") - } - - // ---------------------------------------------------------------------------------------------- - - override def descriptors(): util.List[Descriptor] = { - val desc = Metadata() - .comment("Some additional comment") - .creationTime(123L) - .lastAccessTime(12020202L) - - util.Arrays.asList(desc) - } - - override def validator(): DescriptorValidator = { - new MetadataValidator() - } - - override def properties(): util.List[util.Map[String, String]] = { - val props = Map( - "metadata.comment" -> "Some additional comment", - "metadata.creation-time" -> "123", - "metadata.last-access-time" -> "12020202" - ) - - util.Arrays.asList(props.asJava) - } -} diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/StatisticsTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/StatisticsTest.scala deleted file mode 100644 index 2def0c3..0000000 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/descriptors/StatisticsTest.scala +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.descriptors - -import _root_.java.util - -import org.apache.flink.table.api.ValidationException -import org.apache.flink.table.plan.stats.{ColumnStats, TableStats} -import org.junit.Test - -import scala.collection.JavaConverters._ - -class StatisticsTest extends DescriptorTestBase { - - @Test(expected = classOf[ValidationException]) - def testInvalidRowCount(): Unit = { - addPropertyAndVerify(descriptors().get(0), "statistics.row-count", "abx") - } - - @Test(expected = classOf[ValidationException]) - def testMissingName(): Unit = { - removePropertyAndVerify(descriptors().get(0), "statistics.columns.0.name") - } - - // ---------------------------------------------------------------------------------------------- - - override def descriptors(): util.List[Descriptor] = { - val desc1 = Statistics() - .rowCount(1000L) - .columnStats("a", ColumnStats(1L, 2L, 3.0, 4, 5, 6)) - .columnAvgLength("b", 42.0) - .columnNullCount("a", 300) - - val map = new util.HashMap[String, ColumnStats]() - map.put("a", ColumnStats(null, 2L, 3.0, null, 5, 6)) - val desc2 = Statistics() - .tableStats(TableStats(32L, map)) - - util.Arrays.asList(desc1, desc2) - } - - override def validator(): DescriptorValidator = { - new StatisticsValidator() - } - - override def properties(): util.List[util.Map[String, String]] = { - val props1 = Map( - "statistics.property-version" -> "1", - "statistics.row-count" -> "1000", - "statistics.columns.0.name" -> "a", - "statistics.columns.0.distinct-count" -> "1", - "statistics.columns.0.null-count" -> "300", - "statistics.columns.0.avg-length" -> "3.0", - "statistics.columns.0.max-length" -> "4", - "statistics.columns.0.max-value" -> "5", - "statistics.columns.0.min-value" -> "6", - "statistics.columns.1.name" -> "b", - "statistics.columns.1.avg-length" -> "42.0" - ) - - val props2 = Map( - "statistics.property-version" -> "1", - "statistics.row-count" -> "32", - "statistics.columns.0.name" -> "a", - "statistics.columns.0.null-count" -> "2", - "statistics.columns.0.avg-length" -> "3.0", - "statistics.columns.0.max-value" -> "5", - "statistics.columns.0.min-value" -> "6" - ) - - util.Arrays.asList(props1.asJava, props2.asJava) - } -}