This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new e4ae2ef [FLINK-25229][table] Introduce flink-table-api-bridge-base e4ae2ef is described below commit e4ae2ef81e9ecbda10c4dcc5776584b07c2f5e6b Author: slinkydeveloper <francescogu...@gmail.com> AuthorDate: Thu Dec 9 14:55:05 2021 +0100 [FLINK-25229][table] Introduce flink-table-api-bridge-base This closes #18065. --- flink-connectors/flink-connector-hbase-1.4/pom.xml | 6 + flink-connectors/flink-connector-hbase-2.2/pom.xml | 6 + flink-connectors/flink-connector-hive/pom.xml | 6 + flink-connectors/flink-connector-jdbc/pom.xml | 6 + flink-connectors/flink-connector-kafka/pom.xml | 6 + flink-formats/flink-avro/pom.xml | 6 + flink-formats/flink-csv/pom.xml | 6 + flink-formats/flink-json/pom.xml | 6 + flink-formats/flink-orc/pom.xml | 6 + flink-formats/flink-parquet/pom.xml | 6 + flink-python/pom.xml | 6 + flink-table/flink-sql-client/pom.xml | 6 + .../pom.xml | 46 +-- .../AbstractStreamTableEnvironmentImpl.java | 329 +++++++++++++++++++++ .../table/delegation/StreamExecutorFactory.java | 37 +++ .../operations/DataStreamQueryOperation.java} | 25 +- .../table/operations/ExternalQueryOperation.java} | 4 +- flink-table/flink-table-api-java-bridge/pom.xml | 6 + .../java/internal/StreamTableEnvironmentImpl.java | 270 +---------------- .../operations/JavaDataStreamQueryOperation.java | 116 -------- .../table/api/internal/TableEnvironmentImpl.java | 3 +- .../flink/table/delegation/ExecutorFactory.java | 11 +- flink-table/flink-table-api-scala-bridge/pom.xml | 6 + .../operations/ScalaExternalQueryOperation.java | 121 -------- .../internal/StreamTableEnvironmentImpl.scala | 233 ++------------- flink-table/flink-table-planner/pom.xml | 24 +- .../planner/delegation/DefaultExecutorFactory.java | 4 +- ....java => InternalDataStreamQueryOperation.java} | 8 +- .../planner/plan/QueryOperationConverter.java | 44 +-- .../flink/table/planner/utils/TableTestBase.scala | 33 +-- flink-table/flink-table-uber/pom.xml | 6 + flink-table/pom.xml | 1 + tools/ci/stage.sh | 2 + 33 files changed, 549 insertions(+), 852 deletions(-) diff --git a/flink-connectors/flink-connector-hbase-1.4/pom.xml b/flink-connectors/flink-connector-hbase-1.4/pom.xml index 99bac72..cd4b56c 100644 --- a/flink-connectors/flink-connector-hbase-1.4/pom.xml +++ b/flink-connectors/flink-connector-hbase-1.4/pom.xml @@ -135,6 +135,12 @@ under the License. <dependency> <groupId>org.apache.flink</groupId> + <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_${scala.binary.version}</artifactId> <version>${project.version}</version> <type>test-jar</type> diff --git a/flink-connectors/flink-connector-hbase-2.2/pom.xml b/flink-connectors/flink-connector-hbase-2.2/pom.xml index ba86049..ed520ce 100644 --- a/flink-connectors/flink-connector-hbase-2.2/pom.xml +++ b/flink-connectors/flink-connector-hbase-2.2/pom.xml @@ -260,6 +260,12 @@ under the License. <dependency> <groupId>org.apache.flink</groupId> + <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_${scala.binary.version}</artifactId> <version>${project.version}</version> <type>test-jar</type> diff --git a/flink-connectors/flink-connector-hive/pom.xml b/flink-connectors/flink-connector-hive/pom.xml index 37f3dea..f807b12 100644 --- a/flink-connectors/flink-connector-hive/pom.xml +++ b/flink-connectors/flink-connector-hive/pom.xml @@ -531,6 +531,12 @@ under the License. <dependency> <groupId>org.apache.flink</groupId> + <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_${scala.binary.version}</artifactId> <version>${project.version}</version> <type>test-jar</type> diff --git a/flink-connectors/flink-connector-jdbc/pom.xml b/flink-connectors/flink-connector-jdbc/pom.xml index 3ccecc6..4edd126 100644 --- a/flink-connectors/flink-connector-jdbc/pom.xml +++ b/flink-connectors/flink-connector-jdbc/pom.xml @@ -104,6 +104,12 @@ under the License. <dependency> <groupId>org.apache.flink</groupId> + <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_${scala.binary.version}</artifactId> <version>${project.version}</version> <scope>test</scope> diff --git a/flink-connectors/flink-connector-kafka/pom.xml b/flink-connectors/flink-connector-kafka/pom.xml index 49b6cbb..4089e09 100644 --- a/flink-connectors/flink-connector-kafka/pom.xml +++ b/flink-connectors/flink-connector-kafka/pom.xml @@ -187,6 +187,12 @@ under the License. <dependency> <groupId>org.apache.flink</groupId> + <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_${scala.binary.version}</artifactId> <version>${project.version}</version> <type>test-jar</type> diff --git a/flink-formats/flink-avro/pom.xml b/flink-formats/flink-avro/pom.xml index a6e51b3..d2e05ed 100644 --- a/flink-formats/flink-avro/pom.xml +++ b/flink-formats/flink-avro/pom.xml @@ -140,6 +140,12 @@ under the License. <dependency> <groupId>org.apache.flink</groupId> + <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_${scala.binary.version}</artifactId> <version>${project.version}</version> <type>test-jar</type> diff --git a/flink-formats/flink-csv/pom.xml b/flink-formats/flink-csv/pom.xml index 244dbae..d2fbd47 100644 --- a/flink-formats/flink-csv/pom.xml +++ b/flink-formats/flink-csv/pom.xml @@ -104,6 +104,12 @@ under the License. <dependency> <groupId>org.apache.flink</groupId> + <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_${scala.binary.version}</artifactId> <version>${project.version}</version> <type>test-jar</type> diff --git a/flink-formats/flink-json/pom.xml b/flink-formats/flink-json/pom.xml index 0e60961..a042949 100644 --- a/flink-formats/flink-json/pom.xml +++ b/flink-formats/flink-json/pom.xml @@ -97,6 +97,12 @@ under the License. <!-- Json filesystem format factory ITCase test dependency --> <dependency> <groupId>org.apache.flink</groupId> + <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_${scala.binary.version}</artifactId> <version>${project.version}</version> <scope>test</scope> diff --git a/flink-formats/flink-orc/pom.xml b/flink-formats/flink-orc/pom.xml index d30a21f..fad8c27 100644 --- a/flink-formats/flink-orc/pom.xml +++ b/flink-formats/flink-orc/pom.xml @@ -134,6 +134,12 @@ under the License. <dependency> <groupId>org.apache.flink</groupId> + <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_${scala.binary.version}</artifactId> <version>${project.version}</version> <scope>test</scope> diff --git a/flink-formats/flink-parquet/pom.xml b/flink-formats/flink-parquet/pom.xml index 35bda88..06fdd81b 100644 --- a/flink-formats/flink-parquet/pom.xml +++ b/flink-formats/flink-parquet/pom.xml @@ -174,6 +174,12 @@ under the License. <dependency> <groupId>org.apache.flink</groupId> + <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_${scala.binary.version}</artifactId> <version>${project.version}</version> <type>test-jar</type> diff --git a/flink-python/pom.xml b/flink-python/pom.xml index 53d0acd..7b68c29 100644 --- a/flink-python/pom.xml +++ b/flink-python/pom.xml @@ -70,6 +70,12 @@ under the License. </dependency> <dependency> <groupId>org.apache.flink</groupId> + <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_${scala.binary.version}</artifactId> <version>${project.version}</version> <scope>provided</scope> diff --git a/flink-table/flink-sql-client/pom.xml b/flink-table/flink-sql-client/pom.xml index 44ec8ad..ba43157 100644 --- a/flink-table/flink-sql-client/pom.xml +++ b/flink-table/flink-sql-client/pom.xml @@ -172,6 +172,12 @@ under the License. <dependency> <groupId>org.apache.flink</groupId> + <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_${scala.binary.version}</artifactId> <version>${project.version}</version> <type>test-jar</type> diff --git a/flink-table/flink-table-api-java-bridge/pom.xml b/flink-table/flink-table-api-bridge-base/pom.xml similarity index 56% copy from flink-table/flink-table-api-java-bridge/pom.xml copy to flink-table/flink-table-api-bridge-base/pom.xml index 1d4d64e..31628f0 100644 --- a/flink-table/flink-table-api-java-bridge/pom.xml +++ b/flink-table/flink-table-api-bridge-base/pom.xml @@ -16,7 +16,7 @@ specific language governing permissions and limitations under the License. --> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelVersion>4.0.0</modelVersion> @@ -27,11 +27,11 @@ under the License. <relativePath>..</relativePath> </parent> - <artifactId>flink-table-api-java-bridge</artifactId> - <name>Flink : Table : API Java bridge</name> + <artifactId>flink-table-api-bridge-base</artifactId> + <name>Flink : Table : API bridge base</name> <description> - This module contains the Table/SQL API for writing table programs - that interact with other Flink APIs using the Java programming language. + This module contains the base code for bridging between Table API and DataStream API for + both Java and Scala. </description> <packaging>jar</packaging> @@ -39,11 +39,6 @@ under the License. <dependencies> <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-table-api-java</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${project.version}</version> </dependency> @@ -52,41 +47,10 @@ under the License. <artifactId>flink-streaming-java</artifactId> <version>${project.version}</version> </dependency> - - <!-- test dependencies --> - <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-java</artifactId> <version>${project.version}</version> - <type>test-jar</type> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-table-common</artifactId> - <version>${project.version}</version> - <type>test-jar</type> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-java</artifactId> - <version>${project.version}</version> - <type>test-jar</type> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-runtime</artifactId> - <version>${project.version}</version> - <type>test-jar</type> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-test-utils-junit</artifactId> </dependency> </dependencies> </project> diff --git a/flink-table/flink-table-api-bridge-base/src/main/java/org/apache/flink/table/api/bridge/internal/AbstractStreamTableEnvironmentImpl.java b/flink-table/flink-table-api-bridge-base/src/main/java/org/apache/flink/table/api/bridge/internal/AbstractStreamTableEnvironmentImpl.java new file mode 100644 index 0000000..79b4885 --- /dev/null +++ b/flink-table/flink-table-api-bridge-base/src/main/java/org/apache/flink/table/api/bridge/internal/AbstractStreamTableEnvironmentImpl.java @@ -0,0 +1,329 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.bridge.internal; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.dag.Transformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.Types; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.api.internal.TableEnvironmentImpl; +import org.apache.flink.table.catalog.CatalogManager; +import org.apache.flink.table.catalog.FunctionCatalog; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.SchemaResolver; +import org.apache.flink.table.catalog.SchemaTranslator; +import org.apache.flink.table.catalog.UnresolvedIdentifier; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.delegation.Executor; +import org.apache.flink.table.delegation.ExecutorFactory; +import org.apache.flink.table.delegation.Planner; +import org.apache.flink.table.delegation.StreamExecutorFactory; +import org.apache.flink.table.expressions.ApiExpressionUtils; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.module.ModuleManager; +import org.apache.flink.table.operations.DataStreamQueryOperation; +import org.apache.flink.table.operations.ExternalModifyOperation; +import org.apache.flink.table.operations.ExternalQueryOperation; +import org.apache.flink.table.operations.ModifyOperation; +import org.apache.flink.table.operations.QueryOperation; +import org.apache.flink.table.operations.utils.OperationTreeBuilder; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.utils.TypeConversions; +import org.apache.flink.table.typeutils.FieldInfoUtils; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +/** Abstract class to implement a {@code StreamTableEnvironment}. */ +@Internal +public abstract class AbstractStreamTableEnvironmentImpl extends TableEnvironmentImpl { + + protected final StreamExecutionEnvironment executionEnvironment; + + public AbstractStreamTableEnvironmentImpl( + CatalogManager catalogManager, + ModuleManager moduleManager, + TableConfig tableConfig, + Executor executor, + FunctionCatalog functionCatalog, + Planner planner, + boolean isStreamingMode, + ClassLoader userClassLoader, + StreamExecutionEnvironment executionEnvironment) { + super( + catalogManager, + moduleManager, + tableConfig, + executor, + functionCatalog, + planner, + isStreamingMode, + userClassLoader); + this.executionEnvironment = executionEnvironment; + } + + public static Executor lookupExecutor( + ClassLoader classLoader, + String executorIdentifier, + StreamExecutionEnvironment executionEnvironment) { + final ExecutorFactory executorFactory; + try { + executorFactory = + FactoryUtil.discoverFactory( + classLoader, ExecutorFactory.class, executorIdentifier); + } catch (Exception e) { + throw new TableException( + "Could not instantiate the executor. Make sure a planner module is on the classpath", + e); + } + if (executorFactory instanceof StreamExecutorFactory) { + return ((StreamExecutorFactory) executorFactory).create(executionEnvironment); + } else { + throw new TableException( + "The resolved ExecutorFactory '" + + executorFactory.getClass() + + "' doesn't implement StreamExecutorFactory."); + } + } + + protected <T> Table fromStreamInternal( + DataStream<T> dataStream, + @Nullable Schema schema, + @Nullable String viewPath, + ChangelogMode changelogMode) { + Preconditions.checkNotNull(dataStream, "Data stream must not be null."); + Preconditions.checkNotNull(changelogMode, "Changelog mode must not be null."); + + if (dataStream.getExecutionEnvironment() != executionEnvironment) { + throw new ValidationException( + "The DataStream's StreamExecutionEnvironment must be identical to the one that " + + "has been passed to the StreamTableEnvironment during instantiation."); + } + + final CatalogManager catalogManager = getCatalogManager(); + final SchemaResolver schemaResolver = catalogManager.getSchemaResolver(); + final OperationTreeBuilder operationTreeBuilder = getOperationTreeBuilder(); + + final UnresolvedIdentifier unresolvedIdentifier; + if (viewPath != null) { + unresolvedIdentifier = getParser().parseIdentifier(viewPath); + } else { + unresolvedIdentifier = + UnresolvedIdentifier.of("Unregistered_DataStream_Source_" + dataStream.getId()); + } + final ObjectIdentifier objectIdentifier = + catalogManager.qualifyIdentifier(unresolvedIdentifier); + + final SchemaTranslator.ConsumingResult schemaTranslationResult = + SchemaTranslator.createConsumingResult( + catalogManager.getDataTypeFactory(), dataStream.getType(), schema); + + final ResolvedSchema resolvedSchema = + schemaTranslationResult.getSchema().resolve(schemaResolver); + + final QueryOperation scanOperation = + new ExternalQueryOperation<>( + objectIdentifier, + dataStream, + schemaTranslationResult.getPhysicalDataType(), + schemaTranslationResult.isTopLevelRecord(), + changelogMode, + resolvedSchema); + + final List<String> projections = schemaTranslationResult.getProjections(); + if (projections == null) { + return createTable(scanOperation); + } + + final QueryOperation projectOperation = + operationTreeBuilder.project( + projections.stream() + .map(ApiExpressionUtils::unresolvedRef) + .collect(Collectors.toList()), + scanOperation); + + return createTable(projectOperation); + } + + protected <T> DataStream<T> toStreamInternal( + Table table, + SchemaTranslator.ProducingResult schemaTranslationResult, + @Nullable ChangelogMode changelogMode) { + final CatalogManager catalogManager = getCatalogManager(); + final SchemaResolver schemaResolver = catalogManager.getSchemaResolver(); + final OperationTreeBuilder operationTreeBuilder = getOperationTreeBuilder(); + + final QueryOperation projectOperation = + schemaTranslationResult + .getProjections() + .map( + projections -> + operationTreeBuilder.project( + projections.stream() + .map(ApiExpressionUtils::unresolvedRef) + .collect(Collectors.toList()), + table.getQueryOperation())) + .orElseGet(table::getQueryOperation); + + final ResolvedSchema resolvedSchema = + schemaResolver.resolve(schemaTranslationResult.getSchema()); + + final UnresolvedIdentifier unresolvedIdentifier = + UnresolvedIdentifier.of( + "Unregistered_DataStream_Sink_" + ExternalModifyOperation.getUniqueId()); + final ObjectIdentifier objectIdentifier = + catalogManager.qualifyIdentifier(unresolvedIdentifier); + + final ExternalModifyOperation modifyOperation = + new ExternalModifyOperation( + objectIdentifier, + projectOperation, + resolvedSchema, + changelogMode, + schemaTranslationResult + .getPhysicalDataType() + .orElseGet(resolvedSchema::toPhysicalRowDataType)); + + return toStreamInternal(table, modifyOperation); + } + + protected <T> DataStream<T> toStreamInternal(Table table, ModifyOperation modifyOperation) { + final List<Transformation<?>> transformations = + planner.translate(Collections.singletonList(modifyOperation)); + + final Transformation<T> transformation = getTransformation(table, transformations); + executionEnvironment.addOperator(transformation); + + // reconfigure whenever planner transformations are added + executionEnvironment.configure(tableConfig.getConfiguration()); + + return new DataStream<>(executionEnvironment, transformation); + } + + /** + * This is a temporary workaround for Python API. Python API should not use + * StreamExecutionEnvironment at all. + */ + @Internal + public StreamExecutionEnvironment execEnv() { + return executionEnvironment; + } + + protected <T> TypeInformation<T> extractTypeInformation(Table table, Class<T> clazz) { + try { + return TypeExtractor.createTypeInfo(clazz); + } catch (Exception ex) { + throw new ValidationException( + String.format( + "Could not convert query: %s to a DataStream of class %s", + table.getQueryOperation().asSummaryString(), clazz.getSimpleName()), + ex); + } + } + + @SuppressWarnings("unchecked") + private <T> Transformation<T> getTransformation( + Table table, List<Transformation<?>> transformations) { + if (transformations.size() != 1) { + throw new TableException( + String.format( + "Expected a single transformation for query: %s\n Got: %s", + table.getQueryOperation().asSummaryString(), transformations)); + } + + return (Transformation<T>) transformations.get(0); + } + + protected <T> DataType wrapWithChangeFlag(TypeInformation<T> outputType) { + TupleTypeInfo tupleTypeInfo = + new TupleTypeInfo<Tuple2<Boolean, T>>(Types.BOOLEAN(), outputType); + return TypeConversions.fromLegacyInfoToDataType(tupleTypeInfo); + } + + protected <T> DataStreamQueryOperation<T> asQueryOperation( + DataStream<T> dataStream, Optional<List<Expression>> fields) { + TypeInformation<T> streamType = dataStream.getType(); + + // get field names and types for all non-replaced fields + FieldInfoUtils.TypeInfoSchema typeInfoSchema = + fields.map( + f -> { + FieldInfoUtils.TypeInfoSchema fieldsInfo = + FieldInfoUtils.getFieldsInfo( + streamType, f.toArray(new Expression[0])); + + // check if event-time is enabled + validateTimeCharacteristic(fieldsInfo.isRowtimeDefined()); + return fieldsInfo; + }) + .orElseGet(() -> FieldInfoUtils.getFieldsInfo(streamType)); + + return new DataStreamQueryOperation<>( + dataStream, typeInfoSchema.getIndices(), typeInfoSchema.toResolvedSchema()); + } + + protected void validateTimeCharacteristic(boolean isRowtimeDefined) { + if (isRowtimeDefined + && executionEnvironment.getStreamTimeCharacteristic() + != TimeCharacteristic.EventTime) { + throw new ValidationException( + String.format( + "A rowtime attribute requires an EventTime time characteristic in stream environment. But is: %s", + executionEnvironment.getStreamTimeCharacteristic())); + } + } + + @Override + protected QueryOperation qualifyQueryOperation( + ObjectIdentifier identifier, QueryOperation queryOperation) { + if (queryOperation instanceof DataStreamQueryOperation) { + DataStreamQueryOperation<?> operation = (DataStreamQueryOperation) queryOperation; + return new DataStreamQueryOperation<>( + identifier, + operation.getDataStream(), + operation.getFieldIndices(), + operation.getResolvedSchema()); + } else { + return queryOperation; + } + } + + public void attachAsDataStream(List<ModifyOperation> modifyOperations) { + final List<Transformation<?>> transformations = translate(modifyOperations); + transformations.forEach(executionEnvironment::addOperator); + } +} diff --git a/flink-table/flink-table-api-bridge-base/src/main/java/org/apache/flink/table/delegation/StreamExecutorFactory.java b/flink-table/flink-table-api-bridge-base/src/main/java/org/apache/flink/table/delegation/StreamExecutorFactory.java new file mode 100644 index 0000000..ab44871 --- /dev/null +++ b/flink-table/flink-table-api-bridge-base/src/main/java/org/apache/flink/table/delegation/StreamExecutorFactory.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.delegation; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.TableEnvironment; + +/** Sub interface of {@link ExecutorFactory} to support {@link DataStream} API. */ +@Internal +public interface StreamExecutorFactory extends ExecutorFactory { + + /** + * Creates a corresponding {@link Executor}. + * + * <p>This method will be used when instantiating a {@link TableEnvironment} from one of the + * bridging modules which enables conversion from/to {@link DataStream} API. + */ + Executor create(StreamExecutionEnvironment streamExecutionEnvironment); +} diff --git a/flink-table/flink-table-api-scala-bridge/src/main/java/org/apache/flink/table/operations/ScalaDataStreamQueryOperation.java b/flink-table/flink-table-api-bridge-base/src/main/java/org/apache/flink/table/operations/DataStreamQueryOperation.java similarity index 86% rename from flink-table/flink-table-api-scala-bridge/src/main/java/org/apache/flink/table/operations/ScalaDataStreamQueryOperation.java rename to flink-table/flink-table-api-bridge-base/src/main/java/org/apache/flink/table/operations/DataStreamQueryOperation.java index 4c375e2..ae0ea36 100644 --- a/flink-table/flink-table-api-scala-bridge/src/main/java/org/apache/flink/table/operations/ScalaDataStreamQueryOperation.java +++ b/flink-table/flink-table-api-bridge-base/src/main/java/org/apache/flink/table/operations/DataStreamQueryOperation.java @@ -37,17 +37,18 @@ import java.util.Optional; * * <p>This operation may expose only part, or change the order of the fields available in a {@link * org.apache.flink.api.common.typeutils.CompositeType} of the underlying {@link DataStream}. The - * {@link ScalaDataStreamQueryOperation#getFieldIndices()} describes the mapping between fields of - * the {@link TableSchema} to the {@link org.apache.flink.api.common.typeutils.CompositeType}. + * {@link DataStreamQueryOperation#getFieldIndices()} describes the mapping between fields of the + * {@link TableSchema} to the {@link org.apache.flink.api.common.typeutils.CompositeType}. */ @Internal -public class ScalaDataStreamQueryOperation<E> implements QueryOperation { +@Deprecated +public class DataStreamQueryOperation<E> implements QueryOperation { /** * The table identifier registered under the environment. The identifier might be null when the - * it is from {@code StreamTableEnvironment#fromDataStream(DataStream)}. But the identifier - * should be not null if is from {@code StreamTableEnvironment#createTemporaryView(String, - * DataStream)} with a registered name. + * it is from {@code StreamTableEnvironment#fromDataStream(DataStream, Expression...)}. But the + * identifier should be not null if is from {@code + * StreamTableEnvironment#createTemporaryView(String, DataStream)} with a registered name. */ @Nullable private final ObjectIdentifier identifier; @@ -55,12 +56,12 @@ public class ScalaDataStreamQueryOperation<E> implements QueryOperation { private final int[] fieldIndices; private final ResolvedSchema resolvedSchema; - public ScalaDataStreamQueryOperation( + public DataStreamQueryOperation( DataStream<E> dataStream, int[] fieldIndices, ResolvedSchema resolvedSchema) { this(null, dataStream, fieldIndices, resolvedSchema); } - public ScalaDataStreamQueryOperation( + public DataStreamQueryOperation( ObjectIdentifier identifier, DataStream<E> dataStream, int[] fieldIndices, @@ -75,6 +76,10 @@ public class ScalaDataStreamQueryOperation<E> implements QueryOperation { return dataStream; } + public Optional<ObjectIdentifier> getIdentifier() { + return Optional.ofNullable(identifier); + } + public int[] getFieldIndices() { return fieldIndices; } @@ -84,10 +89,6 @@ public class ScalaDataStreamQueryOperation<E> implements QueryOperation { return resolvedSchema; } - public Optional<ObjectIdentifier> getIdentifier() { - return Optional.ofNullable(identifier); - } - @Override public String asSummaryString() { Map<String, Object> args = new LinkedHashMap<>(); diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/operations/JavaExternalQueryOperation.java b/flink-table/flink-table-api-bridge-base/src/main/java/org/apache/flink/table/operations/ExternalQueryOperation.java similarity index 96% rename from flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/operations/JavaExternalQueryOperation.java rename to flink-table/flink-table-api-bridge-base/src/main/java/org/apache/flink/table/operations/ExternalQueryOperation.java index cd8b3db..d7e96d1 100644 --- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/operations/JavaExternalQueryOperation.java +++ b/flink-table/flink-table-api-bridge-base/src/main/java/org/apache/flink/table/operations/ExternalQueryOperation.java @@ -41,7 +41,7 @@ import java.util.Map; * @param <E> External type of data stream */ @Internal -public final class JavaExternalQueryOperation<E> implements QueryOperation { +public final class ExternalQueryOperation<E> implements QueryOperation { private final ObjectIdentifier identifier; @@ -55,7 +55,7 @@ public final class JavaExternalQueryOperation<E> implements QueryOperation { private final ResolvedSchema resolvedSchema; - public JavaExternalQueryOperation( + public ExternalQueryOperation( ObjectIdentifier identifier, DataStream<E> dataStream, DataType physicalDataType, diff --git a/flink-table/flink-table-api-java-bridge/pom.xml b/flink-table/flink-table-api-java-bridge/pom.xml index 1d4d64e..c2246fe 100644 --- a/flink-table/flink-table-api-java-bridge/pom.xml +++ b/flink-table/flink-table-api-java-bridge/pom.xml @@ -44,6 +44,12 @@ under the License. </dependency> <dependency> <groupId>org.apache.flink</groupId> + <artifactId>flink-table-api-bridge-base</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${project.version}</version> </dependency> diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java index b0957f6..c6c7d43 100644 --- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java @@ -20,11 +20,7 @@ package org.apache.flink.table.api.bridge.java.internal; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.dag.Transformation; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.typeutils.TupleTypeInfo; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.DataTypes; @@ -32,58 +28,36 @@ import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Schema; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableConfig; -import org.apache.flink.table.api.TableException; -import org.apache.flink.table.api.Types; -import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl; import org.apache.flink.table.api.bridge.java.StreamStatementSet; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; -import org.apache.flink.table.api.internal.TableEnvironmentImpl; import org.apache.flink.table.catalog.CatalogManager; import org.apache.flink.table.catalog.FunctionCatalog; import org.apache.flink.table.catalog.GenericInMemoryCatalog; -import org.apache.flink.table.catalog.ObjectIdentifier; -import org.apache.flink.table.catalog.ResolvedSchema; -import org.apache.flink.table.catalog.SchemaResolver; import org.apache.flink.table.catalog.SchemaTranslator; -import org.apache.flink.table.catalog.UnresolvedIdentifier; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.delegation.Executor; -import org.apache.flink.table.delegation.ExecutorFactory; import org.apache.flink.table.delegation.ExpressionParser; import org.apache.flink.table.delegation.Planner; -import org.apache.flink.table.expressions.ApiExpressionUtils; import org.apache.flink.table.expressions.Expression; -import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.factories.PlannerFactoryUtil; import org.apache.flink.table.functions.AggregateFunction; import org.apache.flink.table.functions.TableAggregateFunction; import org.apache.flink.table.functions.TableFunction; import org.apache.flink.table.functions.UserDefinedFunctionHelper; import org.apache.flink.table.module.ModuleManager; -import org.apache.flink.table.operations.ExternalModifyOperation; -import org.apache.flink.table.operations.JavaDataStreamQueryOperation; -import org.apache.flink.table.operations.JavaExternalQueryOperation; -import org.apache.flink.table.operations.ModifyOperation; import org.apache.flink.table.operations.OutputConversionModifyOperation; -import org.apache.flink.table.operations.QueryOperation; -import org.apache.flink.table.operations.utils.OperationTreeBuilder; import org.apache.flink.table.sources.TableSource; import org.apache.flink.table.sources.TableSourceValidation; import org.apache.flink.table.types.AbstractDataType; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.utils.TypeConversions; -import org.apache.flink.table.typeutils.FieldInfoUtils; import org.apache.flink.types.Row; import org.apache.flink.util.Preconditions; -import javax.annotation.Nullable; - -import java.lang.reflect.Method; import java.util.Arrays; -import java.util.Collections; import java.util.List; import java.util.Optional; -import java.util.stream.Collectors; /** * The implementation for a Java {@link StreamTableEnvironment}. This enables conversions from/to @@ -92,11 +66,9 @@ import java.util.stream.Collectors; * <p>It binds to a given {@link StreamExecutionEnvironment}. */ @Internal -public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl +public final class StreamTableEnvironmentImpl extends AbstractStreamTableEnvironmentImpl implements StreamTableEnvironment { - private final StreamExecutionEnvironment executionEnvironment; - public StreamTableEnvironmentImpl( CatalogManager catalogManager, ModuleManager moduleManager, @@ -115,8 +87,8 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl functionCatalog, planner, isStreamingMode, - userClassLoader); - this.executionEnvironment = executionEnvironment; + userClassLoader, + executionEnvironment); } public static StreamTableEnvironment create( @@ -168,27 +140,6 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl classLoader); } - private static Executor lookupExecutor( - ClassLoader classLoader, - String executorIdentifier, - StreamExecutionEnvironment executionEnvironment) { - try { - final ExecutorFactory executorFactory = - FactoryUtil.discoverFactory( - classLoader, ExecutorFactory.class, executorIdentifier); - final Method createMethod = - executorFactory - .getClass() - .getMethod("create", StreamExecutionEnvironment.class); - - return (Executor) createMethod.invoke(executorFactory, executionEnvironment); - } catch (Exception e) { - throw new TableException( - "Could not instantiate the executor. Make sure a planner module is on the classpath", - e); - } - } - @Override public <T> void registerFunction(String name, TableFunction<T> tableFunction) { TypeInformation<T> typeInfo = @@ -263,65 +214,6 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl path, fromStreamInternal(dataStream, schema, path, ChangelogMode.insertOnly())); } - private <T> Table fromStreamInternal( - DataStream<T> dataStream, - @Nullable Schema schema, - @Nullable String viewPath, - ChangelogMode changelogMode) { - Preconditions.checkNotNull(dataStream, "Data stream must not be null."); - Preconditions.checkNotNull(changelogMode, "Changelog mode must not be null."); - - if (dataStream.getExecutionEnvironment() != executionEnvironment) { - throw new ValidationException( - "The DataStream's StreamExecutionEnvironment must be identical to the one that " - + "has been passed to the StreamTableEnvironment during instantiation."); - } - - final CatalogManager catalogManager = getCatalogManager(); - final SchemaResolver schemaResolver = catalogManager.getSchemaResolver(); - final OperationTreeBuilder operationTreeBuilder = getOperationTreeBuilder(); - - final UnresolvedIdentifier unresolvedIdentifier; - if (viewPath != null) { - unresolvedIdentifier = getParser().parseIdentifier(viewPath); - } else { - unresolvedIdentifier = - UnresolvedIdentifier.of("Unregistered_DataStream_Source_" + dataStream.getId()); - } - final ObjectIdentifier objectIdentifier = - catalogManager.qualifyIdentifier(unresolvedIdentifier); - - final SchemaTranslator.ConsumingResult schemaTranslationResult = - SchemaTranslator.createConsumingResult( - catalogManager.getDataTypeFactory(), dataStream.getType(), schema); - - final ResolvedSchema resolvedSchema = - schemaTranslationResult.getSchema().resolve(schemaResolver); - - final QueryOperation scanOperation = - new JavaExternalQueryOperation<>( - objectIdentifier, - dataStream, - schemaTranslationResult.getPhysicalDataType(), - schemaTranslationResult.isTopLevelRecord(), - changelogMode, - resolvedSchema); - - final List<String> projections = schemaTranslationResult.getProjections(); - if (projections == null) { - return createTable(scanOperation); - } - - final QueryOperation projectOperation = - operationTreeBuilder.project( - projections.stream() - .map(ApiExpressionUtils::unresolvedRef) - .collect(Collectors.toList()), - scanOperation); - - return createTable(projectOperation); - } - @Override public DataStream<Row> toDataStream(Table table) { Preconditions.checkNotNull(table, "Table must not be null."); @@ -391,71 +283,11 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl return toStreamInternal(table, schemaTranslationResult, changelogMode); } - private <T> DataStream<T> toStreamInternal( - Table table, - SchemaTranslator.ProducingResult schemaTranslationResult, - @Nullable ChangelogMode changelogMode) { - final CatalogManager catalogManager = getCatalogManager(); - final SchemaResolver schemaResolver = catalogManager.getSchemaResolver(); - final OperationTreeBuilder operationTreeBuilder = getOperationTreeBuilder(); - - final QueryOperation projectOperation = - schemaTranslationResult - .getProjections() - .map( - projections -> - operationTreeBuilder.project( - projections.stream() - .map(ApiExpressionUtils::unresolvedRef) - .collect(Collectors.toList()), - table.getQueryOperation())) - .orElseGet(table::getQueryOperation); - - final ResolvedSchema resolvedSchema = - schemaResolver.resolve(schemaTranslationResult.getSchema()); - - final UnresolvedIdentifier unresolvedIdentifier = - UnresolvedIdentifier.of( - "Unregistered_DataStream_Sink_" + ExternalModifyOperation.getUniqueId()); - final ObjectIdentifier objectIdentifier = - catalogManager.qualifyIdentifier(unresolvedIdentifier); - - final ExternalModifyOperation modifyOperation = - new ExternalModifyOperation( - objectIdentifier, - projectOperation, - resolvedSchema, - changelogMode, - schemaTranslationResult - .getPhysicalDataType() - .orElseGet(resolvedSchema::toPhysicalRowDataType)); - - return toStreamInternal(table, modifyOperation); - } - - private <T> DataStream<T> toStreamInternal(Table table, ModifyOperation modifyOperation) { - final List<Transformation<?>> transformations = - planner.translate(Collections.singletonList(modifyOperation)); - - final Transformation<T> transformation = getTransformation(table, transformations); - executionEnvironment.addOperator(transformation); - - // reconfigure whenever planner transformations are added - executionEnvironment.configure(tableConfig.getConfiguration()); - - return new DataStream<>(executionEnvironment, transformation); - } - @Override public StreamStatementSet createStatementSet() { return new StreamStatementSetImpl(this); } - void attachAsDataStream(List<ModifyOperation> modifyOperations) { - final List<Transformation<?>> transformations = translate(modifyOperations); - transformations.forEach(executionEnvironment::addOperator); - } - @Override public <T> Table fromDataStream(DataStream<T> dataStream, String fields) { List<Expression> expressions = ExpressionParser.INSTANCE.parseExpressionList(fields); @@ -464,10 +296,7 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl @Override public <T> Table fromDataStream(DataStream<T> dataStream, Expression... fields) { - JavaDataStreamQueryOperation<T> queryOperation = - asQueryOperation(dataStream, Optional.of(Arrays.asList(fields))); - - return createTable(queryOperation); + return createTable(asQueryOperation(dataStream, Optional.of(Arrays.asList(fields)))); } @Override @@ -492,22 +321,6 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl } @Override - protected QueryOperation qualifyQueryOperation( - ObjectIdentifier identifier, QueryOperation queryOperation) { - if (queryOperation instanceof JavaDataStreamQueryOperation) { - JavaDataStreamQueryOperation<?> operation = - (JavaDataStreamQueryOperation) queryOperation; - return new JavaDataStreamQueryOperation<>( - identifier, - operation.getDataStream(), - operation.getFieldIndices(), - operation.getResolvedSchema()); - } else { - return queryOperation; - } - } - - @Override public <T> DataStream<T> toAppendStream(Table table, Class<T> clazz) { TypeInformation<T> typeInfo = extractTypeInformation(table, clazz); return toAppendStream(table, typeInfo); @@ -540,82 +353,9 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl return toStreamInternal(table, modifyOperation); } - /** - * This is a temporary workaround for Python API. Python API should not use - * StreamExecutionEnvironment at all. - */ - @Internal - public StreamExecutionEnvironment execEnv() { - return executionEnvironment; - } - @Override protected void validateTableSource(TableSource<?> tableSource) { super.validateTableSource(tableSource); validateTimeCharacteristic(TableSourceValidation.hasRowtimeAttribute(tableSource)); } - - private <T> TypeInformation<T> extractTypeInformation(Table table, Class<T> clazz) { - try { - return TypeExtractor.createTypeInfo(clazz); - } catch (Exception ex) { - throw new ValidationException( - String.format( - "Could not convert query: %s to a DataStream of class %s", - table.getQueryOperation().asSummaryString(), clazz.getSimpleName()), - ex); - } - } - - @SuppressWarnings("unchecked") - private <T> Transformation<T> getTransformation( - Table table, List<Transformation<?>> transformations) { - if (transformations.size() != 1) { - throw new TableException( - String.format( - "Expected a single transformation for query: %s\n Got: %s", - table.getQueryOperation().asSummaryString(), transformations)); - } - - return (Transformation<T>) transformations.get(0); - } - - private <T> DataType wrapWithChangeFlag(TypeInformation<T> outputType) { - TupleTypeInfo tupleTypeInfo = - new TupleTypeInfo<Tuple2<Boolean, T>>(Types.BOOLEAN(), outputType); - return TypeConversions.fromLegacyInfoToDataType(tupleTypeInfo); - } - - private <T> JavaDataStreamQueryOperation<T> asQueryOperation( - DataStream<T> dataStream, Optional<List<Expression>> fields) { - TypeInformation<T> streamType = dataStream.getType(); - - // get field names and types for all non-replaced fields - FieldInfoUtils.TypeInfoSchema typeInfoSchema = - fields.map( - f -> { - FieldInfoUtils.TypeInfoSchema fieldsInfo = - FieldInfoUtils.getFieldsInfo( - streamType, f.toArray(new Expression[0])); - - // check if event-time is enabled - validateTimeCharacteristic(fieldsInfo.isRowtimeDefined()); - return fieldsInfo; - }) - .orElseGet(() -> FieldInfoUtils.getFieldsInfo(streamType)); - - return new JavaDataStreamQueryOperation<>( - dataStream, typeInfoSchema.getIndices(), typeInfoSchema.toResolvedSchema()); - } - - private void validateTimeCharacteristic(boolean isRowtimeDefined) { - if (isRowtimeDefined - && executionEnvironment.getStreamTimeCharacteristic() - != TimeCharacteristic.EventTime) { - throw new ValidationException( - String.format( - "A rowtime attribute requires an EventTime time characteristic in stream environment. But is: %s", - executionEnvironment.getStreamTimeCharacteristic())); - } - } } diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/operations/JavaDataStreamQueryOperation.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/operations/JavaDataStreamQueryOperation.java deleted file mode 100644 index a2fe8dd..0000000 --- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/operations/JavaDataStreamQueryOperation.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.operations; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; -import org.apache.flink.table.catalog.ObjectIdentifier; -import org.apache.flink.table.catalog.ResolvedSchema; -import org.apache.flink.table.expressions.Expression; - -import javax.annotation.Nullable; - -import java.util.Collections; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; - -/** - * Describes a relational operation that reads from a {@link DataStream}. - * - * <p>This operation may expose only part, or change the order of the fields available in a {@link - * org.apache.flink.api.common.typeutils.CompositeType} of the underlying {@link DataStream}. The - * {@link JavaDataStreamQueryOperation#getFieldIndices()} describes the mapping between fields of - * the {@link TableSchema} to the {@link org.apache.flink.api.common.typeutils.CompositeType}. - */ -@Internal -public class JavaDataStreamQueryOperation<E> implements QueryOperation { - - /** - * The table identifier registered under the environment. The identifier might be null when the - * it is from {@link StreamTableEnvironment#fromDataStream(DataStream, Expression...)}. But the - * identifier should be not null if is from {@link - * StreamTableEnvironment#createTemporaryView(String, DataStream)} with a registered name. - */ - @Nullable private final ObjectIdentifier identifier; - - private final DataStream<E> dataStream; - private final int[] fieldIndices; - private final ResolvedSchema resolvedSchema; - - public JavaDataStreamQueryOperation( - DataStream<E> dataStream, int[] fieldIndices, ResolvedSchema resolvedSchema) { - this(null, dataStream, fieldIndices, resolvedSchema); - } - - public JavaDataStreamQueryOperation( - ObjectIdentifier identifier, - DataStream<E> dataStream, - int[] fieldIndices, - ResolvedSchema resolvedSchema) { - this.identifier = identifier; - this.dataStream = dataStream; - this.resolvedSchema = resolvedSchema; - this.fieldIndices = fieldIndices; - } - - public DataStream<E> getDataStream() { - return dataStream; - } - - public Optional<ObjectIdentifier> getIdentifier() { - return Optional.ofNullable(identifier); - } - - public int[] getFieldIndices() { - return fieldIndices; - } - - @Override - public ResolvedSchema getResolvedSchema() { - return resolvedSchema; - } - - @Override - public String asSummaryString() { - Map<String, Object> args = new LinkedHashMap<>(); - if (identifier != null) { - args.put("id", identifier.asSummaryString()); - } else { - args.put("id", dataStream.getId()); - } - args.put("fields", resolvedSchema.getColumnNames()); - - return OperationUtils.formatWithChildren( - "DataStream", args, getChildren(), Operation::asSummaryString); - } - - @Override - public List<QueryOperation> getChildren() { - return Collections.emptyList(); - } - - @Override - public <T> T accept(QueryOperationVisitor<T> visitor) { - return visitor.visit(this); - } -} diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index 4c5a8a8..e163cd7 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -1844,7 +1844,8 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal { } } - protected TableImpl createTable(QueryOperation tableOperation) { + @VisibleForTesting + public TableImpl createTable(QueryOperation tableOperation) { return TableImpl.createTable( this, tableOperation, diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/ExecutorFactory.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/ExecutorFactory.java index 585b930..f1ce1963 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/ExecutorFactory.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/ExecutorFactory.java @@ -20,7 +20,6 @@ package org.apache.flink.table.delegation; import org.apache.flink.annotation.Internal; import org.apache.flink.configuration.Configuration; -import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.factories.Factory; /** @@ -32,14 +31,8 @@ import org.apache.flink.table.factories.Factory; * <p>Usually, there should only be one executor factory in the class path. However, advanced users * can implement a custom one for hooking into the submission process. * - * <p><b>Important:</b> Implementations of this interface should also implement the method - * - * <pre> - * public Executor create(Configuration, StreamExecutionEnvironment); - * </pre> - * - * <p>This method will be used when instantiating a {@link TableEnvironment} from one of the - * bridging modules which enables conversion from/to {@code DataStream} API. + * <p><b>Important:</b> In order to support DataStream APIs, implementations of this interface must + * also implement {@code StreamExecutorFactory} from the {@code flink-table-api-bridge-base} module. */ @Internal public interface ExecutorFactory extends Factory { diff --git a/flink-table/flink-table-api-scala-bridge/pom.xml b/flink-table/flink-table-api-scala-bridge/pom.xml index 72c9900..9dd3d22 100644 --- a/flink-table/flink-table-api-scala-bridge/pom.xml +++ b/flink-table/flink-table-api-scala-bridge/pom.xml @@ -44,6 +44,12 @@ under the License. </dependency> <dependency> <groupId>org.apache.flink</groupId> + <artifactId>flink-table-api-bridge-base</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> <artifactId>flink-scala_${scala.binary.version}</artifactId> <version>${project.version}</version> </dependency> diff --git a/flink-table/flink-table-api-scala-bridge/src/main/java/org/apache/flink/table/operations/ScalaExternalQueryOperation.java b/flink-table/flink-table-api-scala-bridge/src/main/java/org/apache/flink/table/operations/ScalaExternalQueryOperation.java deleted file mode 100644 index 15628e4..0000000 --- a/flink-table/flink-table-api-scala-bridge/src/main/java/org/apache/flink/table/operations/ScalaExternalQueryOperation.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.operations; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.table.catalog.ObjectIdentifier; -import org.apache.flink.table.catalog.ResolvedSchema; -import org.apache.flink.table.connector.ChangelogMode; -import org.apache.flink.table.types.DataType; - -import java.util.Collections; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; - -/** - * Describes a relational operation that reads from a {@link DataStream}. - * - * <p>It contains all information necessary to perform a stream-to-table conversion. - * - * <p>This class needs to be kept in sync with {@code JavaExternalQueryOperation} in the Java - * bridging module. - * - * @param <E> External type of data stream - */ -@Internal -public final class ScalaExternalQueryOperation<E> implements QueryOperation { - - private final ObjectIdentifier identifier; - - private final DataStream<E> dataStream; - - private final DataType physicalDataType; - - private final boolean isTopLevelRecord; - - private final ChangelogMode changelogMode; - - private final ResolvedSchema resolvedSchema; - - public ScalaExternalQueryOperation( - ObjectIdentifier identifier, - DataStream<E> dataStream, - DataType physicalDataType, - boolean isTopLevelRecord, - ChangelogMode changelogMode, - ResolvedSchema resolvedSchema) { - this.identifier = identifier; - this.dataStream = dataStream; - this.physicalDataType = physicalDataType; - this.isTopLevelRecord = isTopLevelRecord; - this.changelogMode = changelogMode; - this.resolvedSchema = resolvedSchema; - } - - public ObjectIdentifier getIdentifier() { - return identifier; - } - - public DataStream<E> getDataStream() { - return dataStream; - } - - public DataType getPhysicalDataType() { - return physicalDataType; - } - - public boolean isTopLevelRecord() { - return isTopLevelRecord; - } - - public ChangelogMode getChangelogMode() { - return changelogMode; - } - - @Override - public String asSummaryString() { - Map<String, Object> args = new LinkedHashMap<>(); - args.put("identifier", identifier); - args.put("stream", dataStream.getId()); - args.put("type", physicalDataType); - args.put("isTopLevelRecord", isTopLevelRecord); - args.put("changelogMode", changelogMode); - args.put("fields", resolvedSchema.getColumnNames()); - - return OperationUtils.formatWithChildren( - "DataStreamInput", args, getChildren(), Operation::asSummaryString); - } - - @Override - public List<QueryOperation> getChildren() { - return Collections.emptyList(); - } - - @Override - public ResolvedSchema getResolvedSchema() { - return resolvedSchema; - } - - @Override - public <T> T accept(QueryOperationVisitor<T> visitor) { - return visitor.visit(this); - } -} diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala index 8fd5e28..8bbc63c 100644 --- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala +++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala @@ -17,36 +17,29 @@ */ package org.apache.flink.table.api.bridge.scala.internal -import java.util -import java.util.{Collections, List => JList} -import javax.annotation.Nullable import org.apache.flink.annotation.Internal import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.dag.Transformation import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.TimeCharacteristic -import org.apache.flink.streaming.api.datastream.{DataStream => JDataStream} -import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JStreamExecutionEnvironment} import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} import org.apache.flink.table.api._ -import org.apache.flink.table.api.bridge.scala.StreamStatementSet -import org.apache.flink.table.api.internal.TableEnvironmentImpl -import org.apache.flink.table.catalog.SchemaTranslator.ProducingResult +import org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl +import org.apache.flink.table.api.bridge.scala.{StreamStatementSet, StreamTableEnvironment} import org.apache.flink.table.catalog._ import org.apache.flink.table.connector.ChangelogMode -import org.apache.flink.table.delegation.{Executor, ExecutorFactory, Planner} -import org.apache.flink.table.expressions.{ApiExpressionUtils, Expression} -import org.apache.flink.table.factories.{FactoryUtil, PlannerFactoryUtil} +import org.apache.flink.table.delegation.{Executor, Planner} +import org.apache.flink.table.expressions.Expression +import org.apache.flink.table.factories.PlannerFactoryUtil import org.apache.flink.table.functions.{AggregateFunction, TableAggregateFunction, TableFunction, UserDefinedFunctionHelper} import org.apache.flink.table.module.ModuleManager import org.apache.flink.table.operations._ import org.apache.flink.table.sources.{TableSource, TableSourceValidation} import org.apache.flink.table.types.AbstractDataType import org.apache.flink.table.types.utils.TypeConversions -import org.apache.flink.table.typeutils.FieldInfoUtils import org.apache.flink.types.Row import org.apache.flink.util.Preconditions +import java.util.Optional import scala.collection.JavaConverters._ /** @@ -64,7 +57,7 @@ class StreamTableEnvironmentImpl ( executor: Executor, isStreaming: Boolean, userClassLoader: ClassLoader) - extends TableEnvironmentImpl( + extends AbstractStreamTableEnvironmentImpl( catalogManager, moduleManager, config, @@ -72,8 +65,9 @@ class StreamTableEnvironmentImpl ( functionCatalog, planner, isStreaming, - userClassLoader) - with org.apache.flink.table.api.bridge.scala.StreamTableEnvironment { + userClassLoader, + scalaExecutionEnvironment.getWrappedStreamExecutionEnvironment) + with StreamTableEnvironment { override def fromDataStream[T](dataStream: DataStream[T]): Table = { Preconditions.checkNotNull(dataStream, "Data stream must not be null.") @@ -127,62 +121,6 @@ class StreamTableEnvironmentImpl ( fromStreamInternal(dataStream.javaStream, schema, path, ChangelogMode.insertOnly())) } - private def fromStreamInternal[T]( - dataStream: JDataStream[T], - @Nullable schema: Schema, - @Nullable viewPath: String, - changelogMode: ChangelogMode): Table = { - Preconditions.checkNotNull(changelogMode, "Changelog mode must not be null.") - - if (dataStream.getExecutionEnvironment ne scalaExecutionEnvironment.getJavaEnv) { - throw new ValidationException( - "The DataStream's StreamExecutionEnvironment must be identical to the one that " + - "has been passed to the StreamTableEnvironment during instantiation.") - } - - val catalogManager = getCatalogManager - val schemaResolver = catalogManager.getSchemaResolver - val operationTreeBuilder = getOperationTreeBuilder - - val unresolvedIdentifier = if (viewPath != null) { - getParser.parseIdentifier(viewPath) - } else { - UnresolvedIdentifier.of("Unregistered_DataStream_Source_" + dataStream.getId) - } - val objectIdentifier = catalogManager.qualifyIdentifier(unresolvedIdentifier) - - val schemaTranslationResult = - SchemaTranslator.createConsumingResult( - catalogManager.getDataTypeFactory, dataStream.getType, schema) - - val resolvedSchema = schemaTranslationResult.getSchema.resolve(schemaResolver) - - val scanOperation = - new ScalaExternalQueryOperation( - objectIdentifier, - dataStream, - schemaTranslationResult.getPhysicalDataType, - schemaTranslationResult.isTopLevelRecord, - changelogMode, - resolvedSchema) - - val projections = schemaTranslationResult.getProjections - if (projections == null) { - return createTable(scanOperation) - } - - val projectOperation = - operationTreeBuilder.project( - util.Arrays.asList( - projections - .asScala - .map(ApiExpressionUtils.unresolvedRef) - .toArray), - scanOperation) - - createTable(projectOperation) - } - override def toDataStream(table: Table): DataStream[Row] = { Preconditions.checkNotNull(table, "Table must not be null.") // include all columns of the query (incl. metadata and computed columns) @@ -210,7 +148,7 @@ class StreamTableEnvironmentImpl ( table.getResolvedSchema, targetDataType) - toStreamInternal(table, schemaTranslationResult, ChangelogMode.insertOnly()) + new DataStream[T](toStreamInternal(table, schemaTranslationResult, ChangelogMode.insertOnly())) } override def toChangelogStream(table: Table): DataStream[Row] = { @@ -220,7 +158,7 @@ class StreamTableEnvironmentImpl ( table.getResolvedSchema, null) - toStreamInternal(table, schemaTranslationResult, null) + new DataStream[Row](toStreamInternal(table, schemaTranslationResult, null)) } override def toChangelogStream(table: Table, targetSchema: Schema): DataStream[Row] = { @@ -231,7 +169,7 @@ class StreamTableEnvironmentImpl ( table.getResolvedSchema, targetSchema) - toStreamInternal(table, schemaTranslationResult, null) + new DataStream[Row](toStreamInternal(table, schemaTranslationResult, null)) } override def toChangelogStream( @@ -247,75 +185,15 @@ class StreamTableEnvironmentImpl ( table.getResolvedSchema, targetSchema) - toStreamInternal(table, schemaTranslationResult, changelogMode) - } - - private def toStreamInternal[T]( - table: Table, - schemaTranslationResult: ProducingResult, - @Nullable changelogMode: ChangelogMode): DataStream[T] = { - val catalogManager = getCatalogManager - val schemaResolver = catalogManager.getSchemaResolver - val operationTreeBuilder = getOperationTreeBuilder - - val optionalProjections = schemaTranslationResult.getProjections - val projectOperation = if (optionalProjections.isPresent) { - val projections = optionalProjections.get - operationTreeBuilder.project( - projections.asScala - .map(ApiExpressionUtils.unresolvedRef) - .map(_.asInstanceOf[Expression]) - .asJava, - table.getQueryOperation) - } else { - table.getQueryOperation - } - - val resolvedSchema = schemaResolver.resolve(schemaTranslationResult.getSchema) - - val unresolvedIdentifier = - UnresolvedIdentifier.of("Unregistered_DataStream_Sink_" + ExternalModifyOperation.getUniqueId) - val objectIdentifier = catalogManager.qualifyIdentifier(unresolvedIdentifier) - - val modifyOperation = new ExternalModifyOperation( - objectIdentifier, - projectOperation, - resolvedSchema, - changelogMode, - schemaTranslationResult.getPhysicalDataType - .orElse(resolvedSchema.toPhysicalRowDataType)) - - toStreamInternal(table, modifyOperation) - } - - private def toStreamInternal[T]( - table: Table, - modifyOperation: ModifyOperation) - : DataStream[T] = { - val javaExecutionEnvironment = scalaExecutionEnvironment.getJavaEnv - - val transformations = planner.translate(Collections.singletonList(modifyOperation)) - val streamTransformation: Transformation[T] = getTransformation(table, transformations) - javaExecutionEnvironment.addOperator(streamTransformation) - - // reconfigure whenever planner transformations are added - javaExecutionEnvironment.configure(tableConfig.getConfiguration) - - new DataStream[T](new JDataStream[T](javaExecutionEnvironment, streamTransformation)) + new DataStream[Row](toStreamInternal(table, schemaTranslationResult, changelogMode)) } override def createStatementSet(): StreamStatementSet = { new StreamStatementSetImpl(this) } - private[internal] def attachAsDataStream(modifyOperations: JList[ModifyOperation]) { - val javaEnv = scalaExecutionEnvironment.getJavaEnv - val transformations = translate(modifyOperations).asScala - transformations.foreach(javaEnv.addOperator) - } - override def fromDataStream[T](dataStream: DataStream[T], fields: Expression*): Table = { - val queryOperation = asQueryOperation(dataStream, Some(fields.toList.asJava)) + val queryOperation = asQueryOperation(dataStream.javaStream, Optional.of(fields.toList.asJava)) createTable(queryOperation) } @@ -338,7 +216,7 @@ class StreamTableEnvironmentImpl ( table.getQueryOperation, TypeConversions.fromLegacyInfoToDataType(returnType), OutputConversionModifyOperation.UpdateMode.APPEND) - toStreamInternal[T](table, modifyOperation) + new DataStream[T](toStreamInternal[T](table, modifyOperation)) } override def toRetractStream[T: TypeInformation](table: Table): DataStream[(Boolean, T)] = { @@ -348,7 +226,7 @@ class StreamTableEnvironmentImpl ( table.getQueryOperation, TypeConversions.fromLegacyInfoToDataType(returnType), OutputConversionModifyOperation.UpdateMode.RETRACT) - toStreamInternal(table, modifyOperation) + new DataStream[(Boolean, T)](toStreamInternal(table, modifyOperation)) } override def registerFunction[T: TypeInformation](name: String, tf: TableFunction[T]): Unit = { @@ -404,57 +282,6 @@ class StreamTableEnvironmentImpl ( } } - private def getTransformation[T]( - table: Table, - transformations: util.List[Transformation[_]]) - : Transformation[T] = { - if (transformations.size != 1) { - throw new TableException(String - .format( - "Expected a single transformation for query: %s\n Got: %s", - table.getQueryOperation.asSummaryString, - transformations)) - } - transformations.get(0).asInstanceOf[Transformation[T]] - } - - private def asQueryOperation[T]( - dataStream: DataStream[T], - fields: Option[util.List[Expression]]) = { - val streamType = dataStream.javaStream.getType - // get field names and types for all non-replaced fields - val typeInfoSchema = fields.map((f: JList[Expression]) => { - val fieldsInfo = FieldInfoUtils.getFieldsInfo(streamType, f.toArray(new Array[Expression](0))) - // check if event-time is enabled - if (fieldsInfo.isRowtimeDefined && - (scalaExecutionEnvironment.getStreamTimeCharacteristic ne TimeCharacteristic.EventTime)) { - throw new ValidationException(String.format( - "A rowtime attribute requires an EventTime time characteristic in stream " + - "environment. But is: %s", - scalaExecutionEnvironment.getStreamTimeCharacteristic)) - } - fieldsInfo - }).getOrElse(FieldInfoUtils.getFieldsInfo(streamType)) - new ScalaDataStreamQueryOperation[T]( - dataStream.javaStream, - typeInfoSchema.getIndices, - typeInfoSchema.toResolvedSchema) - } - - override protected def qualifyQueryOperation( - identifier: ObjectIdentifier, - queryOperation: QueryOperation): QueryOperation = queryOperation match { - case qo: ScalaDataStreamQueryOperation[Any] => - new ScalaDataStreamQueryOperation[Any]( - identifier, - qo.getDataStream, - qo.getFieldIndices, - qo.getResolvedSchema - ) - case _ => - queryOperation - } - override def createTemporaryView[T]( path: String, dataStream: DataStream[T], @@ -489,7 +316,8 @@ object StreamTableEnvironmentImpl { val functionCatalog = new FunctionCatalog(tableConfig, catalogManager, moduleManager) - val executor = lookupExecutor(classLoader, settings.getExecutor, executionEnvironment) + val executor = AbstractStreamTableEnvironmentImpl.lookupExecutor( + classLoader, settings.getExecutor, executionEnvironment.getWrappedStreamExecutionEnvironment) val planner = PlannerFactoryUtil.createPlanner(settings.getPlanner, executor, tableConfig, moduleManager, catalogManager, functionCatalog) @@ -506,27 +334,4 @@ object StreamTableEnvironmentImpl { classLoader ) } - - private def lookupExecutor( - classLoader: ClassLoader, - executorIdentifier: String, - executionEnvironment: StreamExecutionEnvironment) - :Executor = - try { - val executorFactory = - FactoryUtil.discoverFactory(classLoader, classOf[ExecutorFactory], executorIdentifier) - val createMethod = executorFactory - .getClass - .getMethod("create", classOf[JStreamExecutionEnvironment]) - - createMethod - .invoke( - executorFactory, - executionEnvironment.getWrappedStreamExecutionEnvironment) - .asInstanceOf[Executor] - } catch { - case e: Exception => - throw new TableException("Could not instantiate the executor. Make sure a planner module " + - "is on the classpath", e) - } } diff --git a/flink-table/flink-table-planner/pom.xml b/flink-table/flink-table-planner/pom.xml index c971579..8dd4dc9 100644 --- a/flink-table/flink-table-planner/pom.xml +++ b/flink-table/flink-table-planner/pom.xml @@ -71,7 +71,7 @@ under the License. <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-table-api-scala_${scala.binary.version}</artifactId> + <artifactId>flink-table-api-bridge-base</artifactId> <version>${project.version}</version> </dependency> @@ -83,12 +83,6 @@ under the License. <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> <artifactId>flink-sql-parser</artifactId> <version>${project.version}</version> <exclusions> @@ -121,14 +115,12 @@ under the License. <groupId>org.apache.flink</groupId> <artifactId>flink-scala_${scala.binary.version}</artifactId> <version>${project.version}</version> - <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId> <version>${project.version}</version> - <scope>provided</scope> </dependency> <dependency> @@ -242,6 +234,20 @@ under the License. <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-api-scala_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <!-- SuccessException used in TestValuesTableFactory --> <dependency> <groupId>org.apache.flink</groupId> diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/DefaultExecutorFactory.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/DefaultExecutorFactory.java index 0ea9938..8a51dc7 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/DefaultExecutorFactory.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/DefaultExecutorFactory.java @@ -25,6 +25,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.delegation.Executor; import org.apache.flink.table.delegation.ExecutorFactory; +import org.apache.flink.table.delegation.StreamExecutorFactory; import java.util.Collections; import java.util.Set; @@ -40,13 +41,14 @@ import java.util.Set; * #create(StreamExecutionEnvironment)}. */ @Internal -public final class DefaultExecutorFactory implements ExecutorFactory { +public final class DefaultExecutorFactory implements StreamExecutorFactory { @Override public Executor create(Configuration configuration) { return create(StreamExecutionEnvironment.getExecutionEnvironment(configuration)); } + @Override public Executor create(StreamExecutionEnvironment executionEnvironment) { return new DefaultExecutor(executionEnvironment); } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/DataStreamQueryOperation.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/InternalDataStreamQueryOperation.java similarity index 92% rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/DataStreamQueryOperation.java rename to flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/InternalDataStreamQueryOperation.java index f35024a..1f58c51 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/DataStreamQueryOperation.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/InternalDataStreamQueryOperation.java @@ -41,11 +41,11 @@ import java.util.Map; * * <p>This operation may expose only part, or change the order of the fields available in a {@link * org.apache.flink.api.common.typeutils.CompositeType} of the underlying {@link DataStream}. The - * {@link DataStreamQueryOperation#getFieldIndices()} describes the mapping between fields of the - * {@link TableSchema} to the {@link org.apache.flink.api.common.typeutils.CompositeType}. + * {@link InternalDataStreamQueryOperation#getFieldIndices()} describes the mapping between fields + * of the {@link TableSchema} to the {@link org.apache.flink.api.common.typeutils.CompositeType}. */ @Internal -public class DataStreamQueryOperation<E> implements QueryOperation { +public class InternalDataStreamQueryOperation<E> implements QueryOperation { private final ObjectIdentifier identifier; private final DataStream<E> dataStream; @@ -55,7 +55,7 @@ public class DataStreamQueryOperation<E> implements QueryOperation { private final boolean[] fieldNullables; private final FlinkStatistic statistic; - public DataStreamQueryOperation( + public InternalDataStreamQueryOperation( ObjectIdentifier identifier, DataStream<E> dataStream, int[] fieldIndices, diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java index ed9d3ca..d1a28d8 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java @@ -42,17 +42,15 @@ import org.apache.flink.table.functions.TableFunctionDefinition; import org.apache.flink.table.operations.AggregateQueryOperation; import org.apache.flink.table.operations.CalculatedQueryOperation; import org.apache.flink.table.operations.CatalogQueryOperation; +import org.apache.flink.table.operations.DataStreamQueryOperation; import org.apache.flink.table.operations.DistinctQueryOperation; +import org.apache.flink.table.operations.ExternalQueryOperation; import org.apache.flink.table.operations.FilterQueryOperation; -import org.apache.flink.table.operations.JavaDataStreamQueryOperation; -import org.apache.flink.table.operations.JavaExternalQueryOperation; import org.apache.flink.table.operations.JoinQueryOperation; import org.apache.flink.table.operations.JoinQueryOperation.JoinType; import org.apache.flink.table.operations.ProjectQueryOperation; import org.apache.flink.table.operations.QueryOperation; import org.apache.flink.table.operations.QueryOperationVisitor; -import org.apache.flink.table.operations.ScalaDataStreamQueryOperation; -import org.apache.flink.table.operations.ScalaExternalQueryOperation; import org.apache.flink.table.operations.SetQueryOperation; import org.apache.flink.table.operations.SortQueryOperation; import org.apache.flink.table.operations.TableSourceQueryOperation; @@ -75,7 +73,7 @@ import org.apache.flink.table.planner.expressions.SqlAggFunctionVisitor; import org.apache.flink.table.planner.expressions.converter.ExpressionConverter; import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction; import org.apache.flink.table.planner.functions.utils.TableSqlFunction; -import org.apache.flink.table.planner.operations.DataStreamQueryOperation; +import org.apache.flink.table.planner.operations.InternalDataStreamQueryOperation; import org.apache.flink.table.planner.operations.PlannerQueryOperation; import org.apache.flink.table.planner.operations.RichTableSourceQueryOperation; import org.apache.flink.table.planner.plan.logical.LogicalWindow; @@ -431,21 +429,11 @@ public class QueryOperationConverter extends QueryOperationDefaultVisitor<RelNod public RelNode visit(QueryOperation other) { if (other instanceof PlannerQueryOperation) { return ((PlannerQueryOperation) other).getCalciteTree(); - } else if (other instanceof DataStreamQueryOperation) { - return convertToDataStreamScan((DataStreamQueryOperation<?>) other); - } else if (other instanceof JavaExternalQueryOperation) { - final JavaExternalQueryOperation<?> externalQueryOperation = - (JavaExternalQueryOperation<?>) other; - return convertToExternalScan( - externalQueryOperation.getIdentifier(), - externalQueryOperation.getDataStream(), - externalQueryOperation.getPhysicalDataType(), - externalQueryOperation.isTopLevelRecord(), - externalQueryOperation.getChangelogMode(), - externalQueryOperation.getResolvedSchema()); - } else if (other instanceof ScalaExternalQueryOperation) { - final ScalaExternalQueryOperation<?> externalQueryOperation = - (ScalaExternalQueryOperation<?>) other; + } else if (other instanceof InternalDataStreamQueryOperation) { + return convertToDataStreamScan((InternalDataStreamQueryOperation<?>) other); + } else if (other instanceof ExternalQueryOperation) { + final ExternalQueryOperation<?> externalQueryOperation = + (ExternalQueryOperation<?>) other; return convertToExternalScan( externalQueryOperation.getIdentifier(), externalQueryOperation.getDataStream(), @@ -455,17 +443,9 @@ public class QueryOperationConverter extends QueryOperationDefaultVisitor<RelNod externalQueryOperation.getResolvedSchema()); } // legacy - else if (other instanceof JavaDataStreamQueryOperation) { - JavaDataStreamQueryOperation<?> dataStreamQueryOperation = - (JavaDataStreamQueryOperation<?>) other; - return convertToDataStreamScan( - dataStreamQueryOperation.getDataStream(), - dataStreamQueryOperation.getFieldIndices(), - dataStreamQueryOperation.getResolvedSchema(), - dataStreamQueryOperation.getIdentifier()); - } else if (other instanceof ScalaDataStreamQueryOperation) { - ScalaDataStreamQueryOperation dataStreamQueryOperation = - (ScalaDataStreamQueryOperation<?>) other; + else if (other instanceof DataStreamQueryOperation) { + DataStreamQueryOperation<?> dataStreamQueryOperation = + (DataStreamQueryOperation<?>) other; return convertToDataStreamScan( dataStreamQueryOperation.getDataStream(), dataStreamQueryOperation.getFieldIndices(), @@ -551,7 +531,7 @@ public class QueryOperationConverter extends QueryOperationDefaultVisitor<RelNod changelogMode); } - private RelNode convertToDataStreamScan(DataStreamQueryOperation<?> operation) { + private RelNode convertToDataStreamScan(InternalDataStreamQueryOperation<?> operation) { List<String> names; ObjectIdentifier identifier = operation.getIdentifier(); if (identifier != null) { diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala index ef39ea7..8fc6c4d 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala @@ -17,19 +17,22 @@ */ package org.apache.flink.table.planner.utils +import org.apache.calcite.avatica.util.TimeUnit +import org.apache.calcite.rel.RelNode +import org.apache.calcite.sql.parser.SqlParserPos +import org.apache.calcite.sql.{SqlExplainLevel, SqlIntervalQualifier} import org.apache.flink.api.common.BatchShuffleMode import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation} import org.apache.flink.api.java.typeutils.{PojoTypeInfo, RowTypeInfo, TupleTypeInfo} import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo import org.apache.flink.configuration.ExecutionOptions +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.{JsonNode, ObjectMapper} import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.streaming.api.environment.{LocalStreamEnvironment, StreamExecutionEnvironment} import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment => ScalaStreamExecEnv} import org.apache.flink.streaming.api.{TimeCharacteristic, environment} import org.apache.flink.table.api._ -import org.apache.flink.table.api.bridge.java.internal.{StreamTableEnvironmentImpl => JavaStreamTableEnvImpl} import org.apache.flink.table.api.bridge.java.{StreamTableEnvironment => JavaStreamTableEnv} -import org.apache.flink.table.api.bridge.scala.internal.{StreamTableEnvironmentImpl => ScalaStreamTableEnvImpl} import org.apache.flink.table.api.bridge.scala.{StreamTableEnvironment => ScalaStreamTableEnv} import org.apache.flink.table.api.config.ExecutionConfigOptions import org.apache.flink.table.api.internal.{TableEnvironmentImpl, TableEnvironmentInternal, TableImpl} @@ -47,7 +50,7 @@ import org.apache.flink.table.operations.{CatalogSinkModifyOperation, ModifyOper import org.apache.flink.table.planner.calcite.CalciteConfig import org.apache.flink.table.planner.delegation.PlannerBase import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable -import org.apache.flink.table.planner.operations.{DataStreamQueryOperation, PlannerQueryOperation, RichTableSourceQueryOperation} +import org.apache.flink.table.planner.operations.{InternalDataStreamQueryOperation, PlannerQueryOperation, RichTableSourceQueryOperation} import org.apache.flink.table.planner.plan.nodes.calcite.LogicalWatermarkAssigner import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodePlanDumper @@ -64,13 +67,6 @@ import org.apache.flink.table.types.logical.LogicalType import org.apache.flink.table.types.utils.TypeConversions import org.apache.flink.table.typeutils.FieldInfoUtils import org.apache.flink.types.Row - -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.{JsonNode, ObjectMapper} - -import org.apache.calcite.avatica.util.TimeUnit -import org.apache.calcite.rel.RelNode -import org.apache.calcite.sql.parser.SqlParserPos -import org.apache.calcite.sql.{SqlExplainLevel, SqlIntervalQualifier} import org.junit.Assert.{assertEquals, assertTrue, fail} import org.junit.Rule import org.junit.rules.{ExpectedException, TemporaryFolder, TestName} @@ -80,7 +76,6 @@ import _root_.java.util import java.io.{File, IOException} import java.nio.file.{Files, Paths} import java.time.Duration - import _root_.scala.collection.JavaConversions._ import _root_.scala.io.Source @@ -1624,7 +1619,7 @@ object TableTestUtil { }).getOrElse(FieldInfoUtils.getFieldsInfo(streamType)) val fieldCnt = typeInfoSchema.getFieldTypes.length - val dataStreamQueryOperation = new DataStreamQueryOperation( + val dataStreamQueryOperation = new InternalDataStreamQueryOperation( ObjectIdentifier.of(tEnv.getCurrentCatalog, tEnv.getCurrentDatabase, name), dataStream, typeInfoSchema.getIndices, @@ -1632,22 +1627,10 @@ object TableTestUtil { fieldNullables.getOrElse(Array.fill(fieldCnt)(true)), statistic.getOrElse(FlinkStatistic.UNKNOWN) ) - val table = createTable(tEnv, dataStreamQueryOperation) + val table = tEnv.asInstanceOf[TableEnvironmentImpl].createTable(dataStreamQueryOperation) tEnv.registerTable(name, table) } - def createTable(tEnv: TableEnvironment, queryOperation: QueryOperation): Table = { - val createTableMethod = tEnv match { - case _: ScalaStreamTableEnvImpl | _: JavaStreamTableEnvImpl => - tEnv.getClass.getSuperclass.getDeclaredMethod("createTable", classOf[QueryOperation]) - case t: TableEnvironmentImpl => - t.getClass.getDeclaredMethod("createTable", classOf[QueryOperation]) - case _ => throw new TableException(s"Unsupported class: ${tEnv.getClass.getCanonicalName}") - } - createTableMethod.setAccessible(true) - createTableMethod.invoke(tEnv, queryOperation).asInstanceOf[Table] - } - def readFromResource(path: String): String = { val basePath = getClass.getResource("/").getFile val fullPath = if (path.startsWith("/")) { diff --git a/flink-table/flink-table-uber/pom.xml b/flink-table/flink-table-uber/pom.xml index c51f8d2..6b80c11 100644 --- a/flink-table/flink-table-uber/pom.xml +++ b/flink-table/flink-table-uber/pom.xml @@ -65,6 +65,11 @@ under the License. </dependency> <dependency> <groupId>org.apache.flink</groupId> + <artifactId>flink-table-api-bridge-base</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-java-bridge</artifactId> <version>${project.version}</version> </dependency> @@ -110,6 +115,7 @@ under the License. <include>org.apache.flink:flink-table-common</include> <include>org.apache.flink:flink-table-api-java</include> <include>org.apache.flink:flink-table-api-scala_${scala.binary.version}</include> + <include>org.apache.flink:flink-table-api-bridge-base</include> <include>org.apache.flink:flink-table-api-java-bridge</include> <include>org.apache.flink:flink-table-api-scala-bridge_${scala.binary.version}</include> <include>org.apache.flink:flink-table-planner_${scala.binary.version}</include> diff --git a/flink-table/pom.xml b/flink-table/pom.xml index f2f29be..63f6371 100644 --- a/flink-table/pom.xml +++ b/flink-table/pom.xml @@ -36,6 +36,7 @@ under the License. <module>flink-table-common</module> <module>flink-table-api-java</module> <module>flink-table-api-scala</module> + <module>flink-table-api-bridge-base</module> <module>flink-table-api-java-bridge</module> <module>flink-table-api-scala-bridge</module> <module>flink-table-planner</module> diff --git a/tools/ci/stage.sh b/tools/ci/stage.sh index e6a6b76..215a822 100755 --- a/tools/ci/stage.sh +++ b/tools/ci/stage.sh @@ -61,6 +61,7 @@ flink-table/flink-sql-parser-hive,\ flink-table/flink-table-common,\ flink-table/flink-table-api-java,\ flink-table/flink-table-api-scala,\ +flink-table/flink-table-api-bridge-base,\ flink-table/flink-table-api-java-bridge,\ flink-table/flink-table-api-scala-bridge,\ flink-table/flink-sql-client,\ @@ -94,6 +95,7 @@ flink-connectors/flink-connector-hbase-2.2,\ flink-connectors/flink-hcatalog,\ flink-connectors/flink-hadoop-compatibility,\ flink-connectors,\ +flink-connectors/flink-connector-files,\ flink-connectors/flink-connector-jdbc,\ flink-connectors/flink-connector-cassandra,\ flink-connectors/flink-connector-elasticsearch6,\