This is an automated email from the ASF dual-hosted git repository. ron pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit e3cda01cac737c97a104feae0e35da1eb91a8751 Author: fengli <ldliu...@163.com> AuthorDate: Wed Apr 24 18:14:53 2024 +0800 [FLINK-35189][connectors/filesystem] Modify the visibility of filesystem connector related methods to protected --- .../apache/flink/connector/file/src/AbstractFileSource.java | 11 ++++++++--- .../flink/connector/file/table/FileSystemTableFactory.java | 4 ++-- .../flink/connector/file/table/FileSystemTableSource.java | 4 ++-- 3 files changed, 12 insertions(+), 7 deletions(-) diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/AbstractFileSource.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/AbstractFileSource.java index f4fb463e10e..b14d46b3f9c 100644 --- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/AbstractFileSource.java +++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/AbstractFileSource.java @@ -147,7 +147,7 @@ public abstract class AbstractFileSource<T, SplitT extends FileSourceSplit> throw new FlinkRuntimeException("Could not enumerate file splits", e); } - return createSplitEnumerator(enumContext, enumerator, splits, null); + return createSplitEnumerator(getBoundedness(), enumContext, enumerator, splits, null); } @Override @@ -164,7 +164,11 @@ public abstract class AbstractFileSource<T, SplitT extends FileSourceSplit> (Collection<FileSourceSplit>) checkpoint.getSplits(); return createSplitEnumerator( - enumContext, enumerator, splits, checkpoint.getAlreadyProcessedPaths()); + getBoundedness(), + enumContext, + enumerator, + splits, + checkpoint.getAlreadyProcessedPaths()); } @Override @@ -186,6 +190,7 @@ public abstract class AbstractFileSource<T, SplitT extends FileSourceSplit> // ------------------------------------------------------------------------ private SplitEnumerator<SplitT, PendingSplitsCheckpoint<SplitT>> createSplitEnumerator( + Boundedness boundedness, SplitEnumeratorContext<SplitT> context, FileEnumerator enumerator, Collection<FileSourceSplit> splits, @@ -199,7 +204,7 @@ public abstract class AbstractFileSource<T, SplitT extends FileSourceSplit> final FileSplitAssigner splitAssigner = assignerFactory.create(splits); - if (continuousEnumerationSettings == null) { + if (Boundedness.BOUNDED == boundedness) { // bounded case return castGeneric(new StaticFileSplitEnumerator(fileSplitContext, splitAssigner)); } else { diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableFactory.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableFactory.java index 3f0f415b863..16593ffd3e7 100644 --- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableFactory.java +++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableFactory.java @@ -149,7 +149,7 @@ public class FileSystemTableFactory implements DynamicTableSourceFactory, Dynami .collect(Collectors.toSet()); } - private void validate(FactoryUtil.TableFactoryHelper helper) { + protected void validate(FactoryUtil.TableFactoryHelper helper) { // Except format options, some formats like parquet and orc can not list all supported // options. helper.validateExcept(helper.getOptions().get(FactoryUtil.FORMAT) + "."); @@ -160,7 +160,7 @@ public class FileSystemTableFactory implements DynamicTableSourceFactory, Dynami .get(FileSystemConnectorOptions.SINK_PARTITION_COMMIT_WATERMARK_TIME_ZONE)); } - private <I, F extends DecodingFormatFactory<I>> DecodingFormat<I> discoverDecodingFormat( + protected <I, F extends DecodingFormatFactory<I>> DecodingFormat<I> discoverDecodingFormat( Context context, Class<F> formatFactoryClass) { FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); if (formatFactoryExists(context, formatFactoryClass)) { diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java index 1e53eb53cfb..bbaa2a310d5 100644 --- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java +++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java @@ -264,7 +264,7 @@ public class FileSystemTableSource extends AbstractFileSystemTable return bulkFormat; } - private SourceProvider createSourceProvider(BulkFormat<RowData, FileSourceSplit> bulkFormat) { + protected SourceProvider createSourceProvider(BulkFormat<RowData, FileSourceSplit> bulkFormat) { final FileSource.FileSourceBuilder<RowData> fileSourceBuilder = FileSource.forBulkFileFormat(bulkFormat, paths()); @@ -287,7 +287,7 @@ public class FileSystemTableSource extends AbstractFileSystemTable return SourceProvider.of(fileSourceBuilder.build()); } - private Path[] paths() { + protected Path[] paths() { if (partitionKeys.isEmpty()) { return new Path[] {path}; } else {