This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 55162dcc5cc [FLINK-33262][table-api] Extend source provider interfaces with the new parallelism provider interface
55162dcc5cc is described below

commit 55162dcc5cca6db6aeedddb30d80dd9f9b8d5202
Author: Zhanghao Chen <m...@outlook.com>
AuthorDate: Sat Nov 4 21:00:25 2023 +0800

    [FLINK-33262][table-api] Extend source provider interfaces with the new parallelism provider interface
    
    Close apache/flink#23663
---
 .../connector/source/DataStreamScanProvider.java    |  4 +++-
 .../connector/source/SourceFunctionProvider.java    | 21 ++++++++++++++++++++-
 .../flink/table/connector/ParallelismProvider.java  | 12 ++++++------
 .../table/connector/source/InputFormatProvider.java | 21 ++++++++++++++++++++-
 .../table/connector/source/SourceProvider.java      | 18 +++++++++++++++++-
 .../apache/flink/table/factories/FactoryUtil.java   |  9 +++++++++
 .../connectors/TransformationScanProvider.java      |  4 +++-
 7 files changed, 78 insertions(+), 11 deletions(-)

diff --git 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/source/DataStreamScanProvider.java
 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/source/DataStreamScanProvider.java
index 7fc4687363f..213e3806327 100644
--- 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/source/DataStreamScanProvider.java
+++ 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/source/DataStreamScanProvider.java
@@ -23,6 +23,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.CompiledPlan;
+import org.apache.flink.table.connector.ParallelismProvider;
 import org.apache.flink.table.connector.ProviderContext;
 import org.apache.flink.table.data.RowData;
 
@@ -35,7 +36,8 @@ import org.apache.flink.table.data.RowData;
  * or {@link InputFormatProvider}.
  */
 @PublicEvolving
-public interface DataStreamScanProvider extends 
ScanTableSource.ScanRuntimeProvider {
+public interface DataStreamScanProvider
+        extends ScanTableSource.ScanRuntimeProvider, ParallelismProvider {
 
     /**
      * Creates a scan Java {@link DataStream} from a {@link 
StreamExecutionEnvironment}.
diff --git 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/source/SourceFunctionProvider.java
 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/source/SourceFunctionProvider.java
index ff7238d8a2f..e5c35525e16 100644
--- 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/source/SourceFunctionProvider.java
+++ 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/source/SourceFunctionProvider.java
@@ -20,8 +20,13 @@ package org.apache.flink.table.connector.source;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.table.connector.ParallelismProvider;
 import org.apache.flink.table.data.RowData;
 
+import javax.annotation.Nullable;
+
+import java.util.Optional;
+
 /**
  * Provider of a {@link SourceFunction} instance as a runtime implementation 
for {@link
  * ScanTableSource}.
@@ -32,10 +37,19 @@ import org.apache.flink.table.data.RowData;
  */
 @Deprecated
 @PublicEvolving
-public interface SourceFunctionProvider extends 
ScanTableSource.ScanRuntimeProvider {
+public interface SourceFunctionProvider
+        extends ScanTableSource.ScanRuntimeProvider, ParallelismProvider {
 
     /** Helper method for creating a static provider. */
     static SourceFunctionProvider of(SourceFunction<RowData> sourceFunction, 
boolean isBounded) {
+        return of(sourceFunction, isBounded, null);
+    }
+
+    /** Helper method for creating a Source provider with a provided source 
parallelism. */
+    static SourceFunctionProvider of(
+            SourceFunction<RowData> sourceFunction,
+            boolean isBounded,
+            @Nullable Integer sourceParallelism) {
         return new SourceFunctionProvider() {
             @Override
             public SourceFunction<RowData> createSourceFunction() {
@@ -46,6 +60,11 @@ public interface SourceFunctionProvider extends 
ScanTableSource.ScanRuntimeProvi
             public boolean isBounded() {
                 return isBounded;
             }
+
+            @Override
+            public Optional<Integer> getParallelism() {
+                return Optional.ofNullable(sourceParallelism);
+            }
         };
     }
 
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/ParallelismProvider.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/ParallelismProvider.java
index f9c4684383a..27e4047a016 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/ParallelismProvider.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/ParallelismProvider.java
@@ -20,14 +20,13 @@ package org.apache.flink.table.connector;
 
 import org.apache.flink.annotation.PublicEvolving;
 import 
org.apache.flink.table.connector.sink.DynamicTableSink.SinkRuntimeProvider;
+import 
org.apache.flink.table.connector.source.ScanTableSource.ScanRuntimeProvider;
 
 import java.util.Optional;
 
 /**
  * Parallelism provider for other connector providers. It allows to express a 
custom parallelism for
- * the connector runtime implementation. Otherwise the parallelism is 
determined by the planner.
- *
- * <p>Note: Currently, this interface only works with {@link 
SinkRuntimeProvider}.
+ * the connector runtime implementation. Otherwise, the parallelism is 
determined by the planner.
  */
 @PublicEvolving
 public interface ParallelismProvider {
@@ -38,9 +37,10 @@ public interface ParallelismProvider {
      * <p>The parallelism denotes how many parallel instances of a source or 
sink will be spawned
      * during the execution.
      *
-     * <p>Enforcing a different parallelism for sinks might mess up the 
changelog if the input is
-     * not {@link ChangelogMode#insertOnly()}. Therefore, a primary key is 
required by which the
-     * input will be shuffled before records enter the {@link 
SinkRuntimeProvider} implementation.
+     * <p>Enforcing a different parallelism for sources/sinks might mess up 
the changelog if the
+     * output/input is not {@link ChangelogMode#insertOnly()}. Therefore, a 
primary key is required
+     * by which the output/input will be shuffled after/before records 
leave/enter the {@link
+     * ScanRuntimeProvider}/{@link SinkRuntimeProvider} implementation.
      *
      * @return empty if the connector does not provide a custom parallelism, 
then the planner will
      *     decide the number of parallel instances by itself.
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/InputFormatProvider.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/InputFormatProvider.java
index bbf20e1b193..a9775becc33 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/InputFormatProvider.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/InputFormatProvider.java
@@ -20,18 +20,32 @@ package org.apache.flink.table.connector.source;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.table.connector.ParallelismProvider;
 import org.apache.flink.table.data.RowData;
 
+import javax.annotation.Nullable;
+
+import java.util.Optional;
+
 /**
  * Provider of an {@link InputFormat} instance as a runtime implementation for 
{@link
  * ScanTableSource}.
  */
 @PublicEvolving
-public interface InputFormatProvider extends 
ScanTableSource.ScanRuntimeProvider {
+public interface InputFormatProvider
+        extends ScanTableSource.ScanRuntimeProvider, ParallelismProvider {
 
     /** Helper method for creating a static provider. */
     static InputFormatProvider of(InputFormat<RowData, ?> inputFormat) {
+        return of(inputFormat, null);
+    }
+
+    /** Helper method for creating a static provider with a provided source 
parallelism. */
+    static InputFormatProvider of(
+            InputFormat<RowData, ?> inputFormat, @Nullable Integer 
sourceParallelism) {
+
         return new InputFormatProvider() {
+
             @Override
             public InputFormat<RowData, ?> createInputFormat() {
                 return inputFormat;
@@ -41,6 +55,11 @@ public interface InputFormatProvider extends 
ScanTableSource.ScanRuntimeProvider
             public boolean isBounded() {
                 return true;
             }
+
+            @Override
+            public Optional<Integer> getParallelism() {
+                return Optional.ofNullable(sourceParallelism);
+            }
         };
     }
 
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/SourceProvider.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/SourceProvider.java
index f0e85f88624..2d9f5143626 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/SourceProvider.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/SourceProvider.java
@@ -21,8 +21,13 @@ package org.apache.flink.table.connector.source;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.table.connector.ParallelismProvider;
 import org.apache.flink.table.data.RowData;
 
+import javax.annotation.Nullable;
+
+import java.util.Optional;
+
 /**
  * Provider of a {@link Source} instance as a runtime implementation for 
{@link ScanTableSource}.
  *
@@ -30,11 +35,17 @@ import org.apache.flink.table.data.RowData;
  * advanced connector developers.
  */
 @PublicEvolving
-public interface SourceProvider extends ScanTableSource.ScanRuntimeProvider {
+public interface SourceProvider extends ScanTableSource.ScanRuntimeProvider, 
ParallelismProvider {
 
     /** Helper method for creating a static provider. */
     static SourceProvider of(Source<RowData, ?, ?> source) {
+        return of(source, null);
+    }
+
+    /** Helper method for creating a Source provider with a provided source 
parallelism. */
+    static SourceProvider of(Source<RowData, ?, ?> source, @Nullable Integer 
sourceParallelism) {
         return new SourceProvider() {
+
             @Override
             public Source<RowData, ?, ?> createSource() {
                 return source;
@@ -44,6 +55,11 @@ public interface SourceProvider extends 
ScanTableSource.ScanRuntimeProvider {
             public boolean isBounded() {
                 return Boundedness.BOUNDED.equals(source.getBoundedness());
             }
+
+            @Override
+            public Optional<Integer> getParallelism() {
+                return Optional.ofNullable(sourceParallelism);
+            }
         };
     }
 
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
index 8f103be0c7e..d8d6d7e9000 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
@@ -120,6 +120,15 @@ public final class FactoryUtil {
                     .defaultValues("rest")
                     .withDescription("Specify the endpoints that are used.");
 
+    public static final ConfigOption<Integer> SOURCE_PARALLELISM =
+            ConfigOptions.key("scan.parallelism")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Defines a custom parallelism for the source. "
+                                    + "By default, if this option is not defined, the planner will derive the parallelism "
+                                    + "for each statement individually by also considering the global configuration.");
+
     public static final ConfigOption<WatermarkEmitStrategy> 
WATERMARK_EMIT_STRATEGY =
             ConfigOptions.key("scan.watermark.emit.strategy")
                     .enumType(WatermarkEmitStrategy.class)
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/TransformationScanProvider.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/TransformationScanProvider.java
index e6642bc2ab1..46e739ec54d 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/TransformationScanProvider.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/TransformationScanProvider.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.planner.connectors;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.table.api.CompiledPlan;
+import org.apache.flink.table.connector.ParallelismProvider;
 import org.apache.flink.table.connector.ProviderContext;
 import org.apache.flink.table.connector.source.InputFormatProvider;
 import org.apache.flink.table.connector.source.ScanTableSource;
@@ -37,7 +38,8 @@ import org.apache.flink.table.data.RowData;
  * SourceFunctionProvider}, or {@link SourceProvider}.
  */
 @Internal
-public interface TransformationScanProvider extends 
ScanTableSource.ScanRuntimeProvider {
+public interface TransformationScanProvider
+        extends ScanTableSource.ScanRuntimeProvider, ParallelismProvider {
 
     /**
      * Creates a {@link Transformation} instance.

Reply via email to