This is an automated email from the ASF dual-hosted git repository. libenchao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 55162dcc5cc [FLINK-33262][table-api] Extend source provider interfaces with the new parallelism provider interface 55162dcc5cc is described below commit 55162dcc5cca6db6aeedddb30d80dd9f9b8d5202 Author: Zhanghao Chen <m...@outlook.com> AuthorDate: Sat Nov 4 21:00:25 2023 +0800 [FLINK-33262][table-api] Extend source provider interfaces with the new parallelism provider interface Close apache/flink#23663 --- .../connector/source/DataStreamScanProvider.java | 4 +++- .../connector/source/SourceFunctionProvider.java | 21 ++++++++++++++++++++- .../flink/table/connector/ParallelismProvider.java | 12 ++++++------ .../table/connector/source/InputFormatProvider.java | 21 ++++++++++++++++++++- .../table/connector/source/SourceProvider.java | 18 +++++++++++++++++- .../apache/flink/table/factories/FactoryUtil.java | 9 +++++++++ .../connectors/TransformationScanProvider.java | 4 +++- 7 files changed, 78 insertions(+), 11 deletions(-) diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/source/DataStreamScanProvider.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/source/DataStreamScanProvider.java index 7fc4687363f..213e3806327 100644 --- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/source/DataStreamScanProvider.java +++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/source/DataStreamScanProvider.java @@ -23,6 +23,7 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.CompiledPlan; +import org.apache.flink.table.connector.ParallelismProvider; import org.apache.flink.table.connector.ProviderContext; import 
org.apache.flink.table.data.RowData; @@ -35,7 +36,8 @@ import org.apache.flink.table.data.RowData; * or {@link InputFormatProvider}. */ @PublicEvolving -public interface DataStreamScanProvider extends ScanTableSource.ScanRuntimeProvider { +public interface DataStreamScanProvider + extends ScanTableSource.ScanRuntimeProvider, ParallelismProvider { /** * Creates a scan Java {@link DataStream} from a {@link StreamExecutionEnvironment}. diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/source/SourceFunctionProvider.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/source/SourceFunctionProvider.java index ff7238d8a2f..e5c35525e16 100644 --- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/source/SourceFunctionProvider.java +++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/source/SourceFunctionProvider.java @@ -20,8 +20,13 @@ package org.apache.flink.table.connector.source; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.table.connector.ParallelismProvider; import org.apache.flink.table.data.RowData; +import javax.annotation.Nullable; + +import java.util.Optional; + /** * Provider of a {@link SourceFunction} instance as a runtime implementation for {@link * ScanTableSource}. @@ -32,10 +37,19 @@ import org.apache.flink.table.data.RowData; */ @Deprecated @PublicEvolving -public interface SourceFunctionProvider extends ScanTableSource.ScanRuntimeProvider { +public interface SourceFunctionProvider + extends ScanTableSource.ScanRuntimeProvider, ParallelismProvider { /** Helper method for creating a static provider. 
*/ static SourceFunctionProvider of(SourceFunction<RowData> sourceFunction, boolean isBounded) { + return of(sourceFunction, isBounded, null); + } + + /** Helper method for creating a static provider with a provided source parallelism. */ + static SourceFunctionProvider of( + SourceFunction<RowData> sourceFunction, + boolean isBounded, + @Nullable Integer sourceParallelism) { return new SourceFunctionProvider() { @Override public SourceFunction<RowData> createSourceFunction() { @@ -46,6 +60,11 @@ public interface SourceFunctionProvider extends ScanTableSource.ScanRuntimeProvi public boolean isBounded() { return isBounded; } + + @Override + public Optional<Integer> getParallelism() { + return Optional.ofNullable(sourceParallelism); + } }; } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/ParallelismProvider.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/ParallelismProvider.java index f9c4684383a..27e4047a016 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/ParallelismProvider.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/ParallelismProvider.java @@ -20,14 +20,13 @@ package org.apache.flink.table.connector; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.table.connector.sink.DynamicTableSink.SinkRuntimeProvider; +import org.apache.flink.table.connector.source.ScanTableSource.ScanRuntimeProvider; import java.util.Optional; /** * Parallelism provider for other connector providers. It allows to express a custom parallelism for - * the connector runtime implementation. Otherwise the parallelism is determined by the planner. - * - * <p>Note: Currently, this interface only works with {@link SinkRuntimeProvider}. + * the connector runtime implementation. Otherwise, the parallelism is determined by the planner. 
*/ @PublicEvolving public interface ParallelismProvider { @@ -38,9 +37,10 @@ public interface ParallelismProvider { * <p>The parallelism denotes how many parallel instances of a source or sink will be spawned * during the execution. * - * <p>Enforcing a different parallelism for sinks might mess up the changelog if the input is - * not {@link ChangelogMode#insertOnly()}. Therefore, a primary key is required by which the - * input will be shuffled before records enter the {@link SinkRuntimeProvider} implementation. + * <p>Enforcing a different parallelism for sources/sinks might mess up the changelog if the + * output/input is not {@link ChangelogMode#insertOnly()}. Therefore, a primary key is required + * by which the output/input will be shuffled after/before records leave/enter the {@link + * ScanRuntimeProvider}/{@link SinkRuntimeProvider} implementation. * * @return empty if the connector does not provide a custom parallelism, then the planner will * decide the number of parallel instances by itself. diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/InputFormatProvider.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/InputFormatProvider.java index bbf20e1b193..a9775becc33 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/InputFormatProvider.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/InputFormatProvider.java @@ -20,18 +20,32 @@ package org.apache.flink.table.connector.source; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.io.InputFormat; +import org.apache.flink.table.connector.ParallelismProvider; import org.apache.flink.table.data.RowData; +import javax.annotation.Nullable; + +import java.util.Optional; + /** * Provider of an {@link InputFormat} instance as a runtime implementation for {@link * ScanTableSource}. 
*/ @PublicEvolving -public interface InputFormatProvider extends ScanTableSource.ScanRuntimeProvider { +public interface InputFormatProvider + extends ScanTableSource.ScanRuntimeProvider, ParallelismProvider { /** Helper method for creating a static provider. */ static InputFormatProvider of(InputFormat<RowData, ?> inputFormat) { + return of(inputFormat, null); + } + + /** Helper method for creating a static provider with a provided source parallelism. */ + static InputFormatProvider of( + InputFormat<RowData, ?> inputFormat, @Nullable Integer sourceParallelism) { + return new InputFormatProvider() { + @Override public InputFormat<RowData, ?> createInputFormat() { return inputFormat; @@ -41,6 +55,11 @@ public interface InputFormatProvider extends ScanTableSource.ScanRuntimeProvider public boolean isBounded() { return true; } + + @Override + public Optional<Integer> getParallelism() { + return Optional.ofNullable(sourceParallelism); + } }; } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/SourceProvider.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/SourceProvider.java index f0e85f88624..2d9f5143626 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/SourceProvider.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/SourceProvider.java @@ -21,8 +21,13 @@ package org.apache.flink.table.connector.source; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.api.connector.source.Source; +import org.apache.flink.table.connector.ParallelismProvider; import org.apache.flink.table.data.RowData; +import javax.annotation.Nullable; + +import java.util.Optional; + /** * Provider of a {@link Source} instance as a runtime implementation for {@link ScanTableSource}. 
* @@ -30,11 +35,17 @@ import org.apache.flink.table.data.RowData; * advanced connector developers. */ @PublicEvolving -public interface SourceProvider extends ScanTableSource.ScanRuntimeProvider { +public interface SourceProvider extends ScanTableSource.ScanRuntimeProvider, ParallelismProvider { /** Helper method for creating a static provider. */ static SourceProvider of(Source<RowData, ?, ?> source) { + return of(source, null); + } + + /** Helper method for creating a Source provider with a provided source parallelism. */ + static SourceProvider of(Source<RowData, ?, ?> source, @Nullable Integer sourceParallelism) { return new SourceProvider() { + @Override public Source<RowData, ?, ?> createSource() { return source; @@ -44,6 +55,11 @@ public interface SourceProvider extends ScanTableSource.ScanRuntimeProvider { public boolean isBounded() { return Boundedness.BOUNDED.equals(source.getBoundedness()); } + + @Override + public Optional<Integer> getParallelism() { + return Optional.ofNullable(sourceParallelism); + } }; } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java index 8f103be0c7e..d8d6d7e9000 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java @@ -120,6 +120,15 @@ public final class FactoryUtil { .defaultValues("rest") .withDescription("Specify the endpoints that are used."); + public static final ConfigOption<Integer> SOURCE_PARALLELISM = + ConfigOptions.key("scan.parallelism") + .intType() + .noDefaultValue() + .withDescription( + "Defines a custom parallelism for the source. 
" + + "By default, if this option is not defined, the planner will derive the parallelism " + + "for each statement individually by also considering the global configuration."); + public static final ConfigOption<WatermarkEmitStrategy> WATERMARK_EMIT_STRATEGY = ConfigOptions.key("scan.watermark.emit.strategy") .enumType(WatermarkEmitStrategy.class) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/TransformationScanProvider.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/TransformationScanProvider.java index e6642bc2ab1..46e739ec54d 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/TransformationScanProvider.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/TransformationScanProvider.java @@ -21,6 +21,7 @@ package org.apache.flink.table.planner.connectors; import org.apache.flink.annotation.Internal; import org.apache.flink.api.dag.Transformation; import org.apache.flink.table.api.CompiledPlan; +import org.apache.flink.table.connector.ParallelismProvider; import org.apache.flink.table.connector.ProviderContext; import org.apache.flink.table.connector.source.InputFormatProvider; import org.apache.flink.table.connector.source.ScanTableSource; @@ -37,7 +38,8 @@ import org.apache.flink.table.data.RowData; * SourceFunctionProvider}, or {@link SourceProvider}. */ @Internal -public interface TransformationScanProvider extends ScanTableSource.ScanRuntimeProvider { +public interface TransformationScanProvider + extends ScanTableSource.ScanRuntimeProvider, ParallelismProvider { /** * Creates a {@link Transformation} instance.