This is an automated email from the ASF dual-hosted git repository.

ron pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e3cda01cac737c97a104feae0e35da1eb91a8751
Author: fengli <ldliu...@163.com>
AuthorDate: Wed Apr 24 18:14:53 2024 +0800

    [FLINK-35189][connectors/filesystem] Modify the visibility of filesystem 
connector related methods to protected
---
 .../apache/flink/connector/file/src/AbstractFileSource.java   | 11 ++++++++---
 .../flink/connector/file/table/FileSystemTableFactory.java    |  4 ++--
 .../flink/connector/file/table/FileSystemTableSource.java     |  4 ++--
 3 files changed, 12 insertions(+), 7 deletions(-)

diff --git 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/AbstractFileSource.java
 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/AbstractFileSource.java
index f4fb463e10e..b14d46b3f9c 100644
--- 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/AbstractFileSource.java
+++ 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/AbstractFileSource.java
@@ -147,7 +147,7 @@ public abstract class AbstractFileSource<T, SplitT extends 
FileSourceSplit>
             throw new FlinkRuntimeException("Could not enumerate file splits", 
e);
         }
 
-        return createSplitEnumerator(enumContext, enumerator, splits, null);
+        return createSplitEnumerator(getBoundedness(), enumContext, 
enumerator, splits, null);
     }
 
     @Override
@@ -164,7 +164,11 @@ public abstract class AbstractFileSource<T, SplitT extends 
FileSourceSplit>
                 (Collection<FileSourceSplit>) checkpoint.getSplits();
 
         return createSplitEnumerator(
-                enumContext, enumerator, splits, 
checkpoint.getAlreadyProcessedPaths());
+                getBoundedness(),
+                enumContext,
+                enumerator,
+                splits,
+                checkpoint.getAlreadyProcessedPaths());
     }
 
     @Override
@@ -186,6 +190,7 @@ public abstract class AbstractFileSource<T, SplitT extends 
FileSourceSplit>
     // ------------------------------------------------------------------------
 
     private SplitEnumerator<SplitT, PendingSplitsCheckpoint<SplitT>> 
createSplitEnumerator(
+            Boundedness boundedness,
             SplitEnumeratorContext<SplitT> context,
             FileEnumerator enumerator,
             Collection<FileSourceSplit> splits,
@@ -199,7 +204,7 @@ public abstract class AbstractFileSource<T, SplitT extends 
FileSourceSplit>
 
         final FileSplitAssigner splitAssigner = assignerFactory.create(splits);
 
-        if (continuousEnumerationSettings == null) {
+        if (Boundedness.BOUNDED == boundedness) {
             // bounded case
             return castGeneric(new StaticFileSplitEnumerator(fileSplitContext, 
splitAssigner));
         } else {
diff --git 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableFactory.java
 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableFactory.java
index 3f0f415b863..16593ffd3e7 100644
--- 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableFactory.java
+++ 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableFactory.java
@@ -149,7 +149,7 @@ public class FileSystemTableFactory implements 
DynamicTableSourceFactory, Dynami
                 .collect(Collectors.toSet());
     }
 
-    private void validate(FactoryUtil.TableFactoryHelper helper) {
+    protected void validate(FactoryUtil.TableFactoryHelper helper) {
         // Except format options, some formats like parquet and orc can not 
list all supported
         // options.
         helper.validateExcept(helper.getOptions().get(FactoryUtil.FORMAT) + 
".");
@@ -160,7 +160,7 @@ public class FileSystemTableFactory implements 
DynamicTableSourceFactory, Dynami
                         
.get(FileSystemConnectorOptions.SINK_PARTITION_COMMIT_WATERMARK_TIME_ZONE));
     }
 
-    private <I, F extends DecodingFormatFactory<I>> DecodingFormat<I> 
discoverDecodingFormat(
+    protected <I, F extends DecodingFormatFactory<I>> DecodingFormat<I> 
discoverDecodingFormat(
             Context context, Class<F> formatFactoryClass) {
         FactoryUtil.TableFactoryHelper helper = 
FactoryUtil.createTableFactoryHelper(this, context);
         if (formatFactoryExists(context, formatFactoryClass)) {
diff --git 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java
 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java
index 1e53eb53cfb..bbaa2a310d5 100644
--- 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java
+++ 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java
@@ -264,7 +264,7 @@ public class FileSystemTableSource extends 
AbstractFileSystemTable
         return bulkFormat;
     }
 
-    private SourceProvider createSourceProvider(BulkFormat<RowData, 
FileSourceSplit> bulkFormat) {
+    protected SourceProvider createSourceProvider(BulkFormat<RowData, 
FileSourceSplit> bulkFormat) {
         final FileSource.FileSourceBuilder<RowData> fileSourceBuilder =
                 FileSource.forBulkFileFormat(bulkFormat, paths());
 
@@ -287,7 +287,7 @@ public class FileSystemTableSource extends 
AbstractFileSystemTable
         return SourceProvider.of(fileSourceBuilder.build());
     }
 
-    private Path[] paths() {
+    protected Path[] paths() {
         if (partitionKeys.isEmpty()) {
             return new Path[] {path};
         } else {

Reply via email to