This is an automated email from the ASF dual-hosted git repository.

ron pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fc6f288d1bc48c633dbdd0db3a795465b83ce3f3
Author: fengli <ldliu...@163.com>
AuthorDate: Tue Jun 25 15:43:55 2024 +0800

    [FLINK-35691][testutils] Optimize test-filesystem logic to support streaming and batch read
---
 .../file/table/FileSystemTableFactory.java         |   4 +-
 .../flink/connector/file/src/TestFileSource.java   | 190 ---------------------
 .../file/table/TestFileSystemTableSource.java      |  88 ----------
 .../file/testutils/TestFileSystemTableFactory.java |  13 +-
 .../testutils/TestFileSystemTableFactoryTest.java  |  14 +-
 5 files changed, 18 insertions(+), 291 deletions(-)
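
In short: the dedicated TestFileSource and TestFileSystemTableSource wrappers are
removed. TestFileSystemTableFactory now builds a plain FileSystemTableSource and
simply drops 'source.monitor-interval' from the table options in batch mode, so
the same table definition is read as an unbounded stream in streaming mode and as
a bounded scan in batch mode. A rough usage sketch of the intended behavior; the
table name, path, and format below are illustrative, not part of this commit:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class TestFileSystemUsageSketch {
        public static void main(String[] args) {
            // Batch mode: the factory strips 'source.monitor-interval',
            // so the resulting FileSystemTableSource is bounded.
            TableEnvironment env =
                    TableEnvironment.create(EnvironmentSettings.inBatchMode());
            env.executeSql(
                    "CREATE TABLE t (f0 STRING) WITH ("
                            + " 'connector' = 'test-filesystem',"
                            + " 'path' = '/tmp/test-data',"   // hypothetical path
                            + " 'format' = 'csv',"            // hypothetical format
                            + " 'source.monitor-interval' = '5S')");
            // With EnvironmentSettings.inStreamingMode() the same DDL keeps the
            // monitor interval and yields a continuous, unbounded scan.
            env.executeSql("SELECT * FROM t").print();
        }
    }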

diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableFactory.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableFactory.java
index 31f6dddbc69..00db848356b 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableFactory.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableFactory.java
@@ -149,7 +149,7 @@ public class FileSystemTableFactory implements DynamicTableSourceFactory, Dynami
                 .collect(Collectors.toSet());
     }
 
-    private void validate(FactoryUtil.TableFactoryHelper helper) {
+    protected void validate(FactoryUtil.TableFactoryHelper helper) {
         // Except format options, some formats like parquet and orc can not list all supported
         // options.
         helper.validateExcept(helper.getOptions().get(FactoryUtil.FORMAT) + ".");
@@ -160,7 +160,7 @@ public class FileSystemTableFactory implements DynamicTableSourceFactory, Dynami
                         .get(FileSystemConnectorOptions.SINK_PARTITION_COMMIT_WATERMARK_TIME_ZONE));
     }
 
-    private <I, F extends DecodingFormatFactory<I>> DecodingFormat<I> discoverDecodingFormat(
+    protected <I, F extends DecodingFormatFactory<I>> DecodingFormat<I> discoverDecodingFormat(
             Context context, Class<F> formatFactoryClass) {
         FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
         if (formatFactoryExists(context, formatFactoryClass)) {
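
The visibility change above (private to protected on validate and
discoverDecodingFormat) is what lets a subclass reuse the factory's option
validation and format discovery instead of duplicating them. A minimal sketch of
such a subclass, assuming only what this diff shows; the class name is
hypothetical and the body mirrors TestFileSystemTableFactory further down:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.connector.file.table.FileSystemTableFactory;
    import org.apache.flink.connector.file.table.FileSystemTableSource;
    import org.apache.flink.connector.file.table.factories.BulkReaderFormatFactory;
    import org.apache.flink.table.connector.source.DynamicTableSource;
    import org.apache.flink.table.factories.DeserializationFormatFactory;
    import org.apache.flink.table.factories.FactoryUtil;

    public class CustomFileSystemTableFactory extends FileSystemTableFactory {
        @Override
        public DynamicTableSource createDynamicTableSource(Context context) {
            FactoryUtil.TableFactoryHelper helper =
                    FactoryUtil.createTableFactoryHelper(this, context);
            validate(helper); // was private before this commit
            return new FileSystemTableSource(
                    context.getObjectIdentifier(),
                    context.getPhysicalRowDataType(),
                    context.getCatalogTable().getPartitionKeys(),
                    Configuration.fromMap(helper.getOptions().toMap()),
                    discoverDecodingFormat(context, BulkReaderFormatFactory.class),
                    discoverDecodingFormat(context, DeserializationFormatFactory.class));
        }
    }
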
diff --git a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/connector/file/src/TestFileSource.java b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/connector/file/src/TestFileSource.java
deleted file mode 100644
index e939ad51c36..00000000000
--- a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/connector/file/src/TestFileSource.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.file.src;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.connector.source.Boundedness;
-import org.apache.flink.api.connector.source.DynamicParallelismInference;
-import org.apache.flink.connector.file.src.assigners.FileSplitAssigner;
-import org.apache.flink.connector.file.src.assigners.LocalityAwareSplitAssigner;
-import org.apache.flink.connector.file.src.enumerate.BlockSplittingRecursiveEnumerator;
-import org.apache.flink.connector.file.src.enumerate.FileEnumerator;
-import org.apache.flink.connector.file.src.enumerate.NonSplittingRecursiveEnumerator;
-import org.apache.flink.connector.file.src.reader.BulkFormat;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.io.SimpleVersionedSerializer;
-import org.apache.flink.util.FlinkRuntimeException;
-import org.apache.flink.util.Preconditions;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.util.Collection;
-
-/**
- * A unified data source that reads files - both in batch and in streaming mode. This is used only
- * for tests. Since {@link FileSource} is a final class, we can't extend it directly.
- *
- * @param <T> The type of the events/records produced by this source.
- */
-@Internal
-public class TestFileSource<T> extends AbstractFileSource<T, FileSourceSplit>
-        implements DynamicParallelismInference {
-
-    private static final long serialVersionUID = 1L;
-
-    /** The default split assigner, a lazy locality-aware assigner. */
-    public static final FileSplitAssigner.Provider DEFAULT_SPLIT_ASSIGNER =
-            LocalityAwareSplitAssigner::new;
-
-    /**
-     * The default file enumerator used for splittable formats. The enumerator recursively
-     * enumerates files, splits files that consist of multiple distributed storage blocks into
-     * multiple splits, and filters hidden files (files starting with '.' or '_'). Files with
-     * suffixes of common compression formats (for example '.gzip', '.bz2', '.xy', '.zip', ...) will
-     * not be split.
-     */
-    public static final FileEnumerator.Provider DEFAULT_SPLITTABLE_FILE_ENUMERATOR =
-            BlockSplittingRecursiveEnumerator::new;
-
-    /**
-     * The default file enumerator used for non-splittable formats. The enumerator recursively
-     * enumerates files, creates one split for the file, and filters hidden files (files starting
-     * with '.' or '_').
-     */
-    public static final FileEnumerator.Provider DEFAULT_NON_SPLITTABLE_FILE_ENUMERATOR =
-            NonSplittingRecursiveEnumerator::new;
-
-    private final boolean isStreamingMode;
-    private final ContinuousEnumerationSettings continuousEnumerationSettings;
-
-    // ------------------------------------------------------------------------
-
-    private TestFileSource(
-            final Path[] inputPaths,
-            final FileEnumerator.Provider fileEnumerator,
-            final FileSplitAssigner.Provider splitAssigner,
-            final BulkFormat<T, FileSourceSplit> readerFormat,
-            final boolean isStreamingMode,
-            @Nullable final ContinuousEnumerationSettings continuousEnumerationSettings) {
-
-        super(
-                inputPaths,
-                fileEnumerator,
-                splitAssigner,
-                readerFormat,
-                continuousEnumerationSettings);
-        this.isStreamingMode = isStreamingMode;
-        this.continuousEnumerationSettings = continuousEnumerationSettings;
-    }
-
-    @Override
-    public SimpleVersionedSerializer<FileSourceSplit> getSplitSerializer() {
-        return FileSourceSplitSerializer.INSTANCE;
-    }
-
-    @Override
-    public Boundedness getBoundedness() {
-        return isStreamingMode && continuousEnumerationSettings != null
-                ? Boundedness.CONTINUOUS_UNBOUNDED
-                : Boundedness.BOUNDED;
-    }
-
-    @Override
-    public int inferParallelism(Context dynamicParallelismContext) {
-        FileEnumerator fileEnumerator = getEnumeratorFactory().create();
-
-        Collection<FileSourceSplit> splits;
-        try {
-            splits =
-                    fileEnumerator.enumerateSplits(
-                            inputPaths,
-                            dynamicParallelismContext.getParallelismInferenceUpperBound());
-        } catch (IOException e) {
-            throw new FlinkRuntimeException("Could not enumerate file splits", e);
-        }
-
-        return Math.min(
-                splits.size(), dynamicParallelismContext.getParallelismInferenceUpperBound());
-    }
-
-    // ------------------------------------------------------------------------
-    //  Entry-point Factory Methods
-    // ------------------------------------------------------------------------
-    /**
-     * Builds a new {@code FileSource} using a {@link BulkFormat} to read batches of records from
-     * files.
-     *
-     * <p>Examples for bulk readers are compressed and vectorized formats such as ORC or Parquet.
-     */
-    public static <T> TestFileSource.TestFileSourceBuilder<T> forBulkFileFormat(
-            final BulkFormat<T, FileSourceSplit> bulkFormat, final Path... paths) {
-        Preconditions.checkNotNull(bulkFormat, "reader");
-        Preconditions.checkNotNull(paths, "paths");
-        Preconditions.checkArgument(paths.length > 0, "paths must not be empty");
-
-        return new TestFileSource.TestFileSourceBuilder<>(paths, bulkFormat);
-    }
-
-    // ------------------------------------------------------------------------
-    //  Builder
-    // ------------------------------------------------------------------------
-
-    /**
-     * The builder for the {@code FileSource}, to configure the various behaviors.
-     *
-     * <p>Start building the source via one of the following methods:
-     *
-     * <ul>
-     *   <li>{@link TestFileSource#forBulkFileFormat(BulkFormat, Path...)}
-     * </ul>
-     */
-    public static final class TestFileSourceBuilder<T>
-            extends AbstractFileSourceBuilder<
-                    T, FileSourceSplit, TestFileSource.TestFileSourceBuilder<T>> {
-
-        private boolean isStreamingMode = false;
-
-        TestFileSourceBuilder(Path[] inputPaths, BulkFormat<T, FileSourceSplit> readerFormat) {
-            super(
-                    inputPaths,
-                    readerFormat,
-                    readerFormat.isSplittable()
-                            ? DEFAULT_SPLITTABLE_FILE_ENUMERATOR
-                            : DEFAULT_NON_SPLITTABLE_FILE_ENUMERATOR,
-                    DEFAULT_SPLIT_ASSIGNER);
-        }
-
-        public TestFileSourceBuilder<T> setStreamingMode(boolean streamingMode) {
-            this.isStreamingMode = streamingMode;
-            return this;
-        }
-
-        @Override
-        public TestFileSource<T> build() {
-            return new TestFileSource<>(
-                    inputPaths,
-                    fileEnumerator,
-                    splitAssigner,
-                    readerFormat,
-                    isStreamingMode,
-                    continuousSourceSettings);
-        }
-    }
-}
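
With this wrapper gone, the production FileSource covers the same ground:
boundedness is decided by whether continuous monitoring is configured, not by a
separate streaming flag. A hedged sketch of the equivalent setup; the path is
illustrative:

    import java.time.Duration;

    import org.apache.flink.connector.file.src.FileSource;
    import org.apache.flink.connector.file.src.FileSourceSplit;
    import org.apache.flink.connector.file.src.reader.BulkFormat;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.table.data.RowData;

    public class FileSourceSketch {
        static FileSource<RowData> build(
                BulkFormat<RowData, FileSourceSplit> format, boolean streaming) {
            FileSource.FileSourceBuilder<RowData> builder =
                    FileSource.forBulkFileFormat(format, new Path("/tmp/test-data"));
            if (streaming) {
                // Continuous monitoring makes the source CONTINUOUS_UNBOUNDED;
                // without it, the source stays BOUNDED.
                builder.monitorContinuously(Duration.ofSeconds(5));
            }
            return builder.build();
        }
    }
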
diff --git a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/connector/file/table/TestFileSystemTableSource.java b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/connector/file/table/TestFileSystemTableSource.java
deleted file mode 100644
index 4a71fcdccab..00000000000
--- a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/connector/file/table/TestFileSystemTableSource.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.file.table;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.connector.file.src.FileSourceSplit;
-import org.apache.flink.connector.file.src.TestFileSource;
-import org.apache.flink.connector.file.src.enumerate.BlockSplittingRecursiveAllDirEnumerator;
-import org.apache.flink.connector.file.src.enumerate.NonSplittingRecursiveAllDirEnumerator;
-import org.apache.flink.connector.file.src.reader.BulkFormat;
-import org.apache.flink.table.catalog.ObjectIdentifier;
-import org.apache.flink.table.connector.format.DecodingFormat;
-import org.apache.flink.table.connector.source.SourceProvider;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.types.DataType;
-
-import javax.annotation.Nullable;
-
-import java.util.List;
-
-/** Test file system table source. */
-@Internal
-public class TestFileSystemTableSource extends FileSystemTableSource {
-
-    private final boolean isStreamingMode;
-
-    public TestFileSystemTableSource(
-            ObjectIdentifier tableIdentifier,
-            DataType physicalRowDataType,
-            List<String> partitionKeys,
-            ReadableConfig tableOptions,
-            boolean isStreamingMode,
-            @Nullable DecodingFormat<BulkFormat<RowData, FileSourceSplit>> bulkReaderFormat,
-            @Nullable DecodingFormat<DeserializationSchema<RowData>> deserializationFormat) {
-        super(
-                tableIdentifier,
-                physicalRowDataType,
-                partitionKeys,
-                tableOptions,
-                bulkReaderFormat,
-                deserializationFormat);
-        this.isStreamingMode = isStreamingMode;
-    }
-
-    @Override
-    protected SourceProvider createSourceProvider(BulkFormat<RowData, FileSourceSplit> bulkFormat) {
-        final TestFileSource.TestFileSourceBuilder<RowData> fileSourceBuilder =
-                TestFileSource.forBulkFileFormat(bulkFormat, paths());
-
-        tableOptions
-                .getOptional(FileSystemConnectorOptions.SOURCE_MONITOR_INTERVAL)
-                .ifPresent(fileSourceBuilder::monitorContinuously);
-        tableOptions
-                .getOptional(FileSystemConnectorOptions.SOURCE_PATH_REGEX_PATTERN)
-                .ifPresent(
-                        regex ->
-                                fileSourceBuilder.setFileEnumerator(
-                                        bulkFormat.isSplittable()
-                                                ? () ->
-                                                        new BlockSplittingRecursiveAllDirEnumerator(
-                                                                regex)
-                                                : () ->
-                                                        new NonSplittingRecursiveAllDirEnumerator(
-                                                                regex)));
-
-        fileSourceBuilder.setStreamingMode(isStreamingMode);
-
-        return SourceProvider.of(fileSourceBuilder.build());
-    }
-}
diff --git a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/TestFileSystemTableFactory.java b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/TestFileSystemTableFactory.java
index 98d9caff0aa..3994222db5b 100644
--- a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/TestFileSystemTableFactory.java
+++ b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/TestFileSystemTableFactory.java
@@ -20,10 +20,11 @@ package org.apache.flink.table.file.testutils;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.connector.file.table.FileSystemConnectorOptions;
 import org.apache.flink.connector.file.table.FileSystemTableFactory;
-import org.apache.flink.connector.file.table.TestFileSystemTableSource;
+import org.apache.flink.connector.file.table.FileSystemTableSource;
 import org.apache.flink.connector.file.table.factories.BulkReaderFormatFactory;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.source.DynamicTableSource;
@@ -67,12 +68,16 @@ public class TestFileSystemTableFactory extends FileSystemTableFactory {
         boolean isStreamingMode =
                 context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
                         == RuntimeExecutionMode.STREAMING;
-        return new TestFileSystemTableSource(
+        Configuration tableOptions = Configuration.fromMap(helper.getOptions().toMap());
+        if (!isStreamingMode) {
+            tableOptions.removeConfig(FileSystemConnectorOptions.SOURCE_MONITOR_INTERVAL);
+        }
+
+        return new FileSystemTableSource(
                 context.getObjectIdentifier(),
                 context.getPhysicalRowDataType(),
                 context.getCatalogTable().getPartitionKeys(),
-                helper.getOptions(),
-                isStreamingMode,
+                tableOptions,
                 discoverDecodingFormat(context, BulkReaderFormatFactory.class),
                 discoverDecodingFormat(context, DeserializationFormatFactory.class));
     }
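
Dropping the option is sufficient because the stock source only enables
continuous monitoring when 'source.monitor-interval' is present; a file source
without continuous enumeration settings reports itself as bounded, the same rule
as the deleted TestFileSource#getBoundedness above. A standalone sketch of that
rule; the helper class and method are hypothetical:

    import java.time.Duration;
    import java.util.Optional;

    import org.apache.flink.api.connector.source.Boundedness;

    final class BoundednessRuleSketch {
        // No continuous enumeration settings (no monitor interval) means the
        // source is bounded; otherwise it is continuous and unbounded.
        static Boundedness boundednessFor(Optional<Duration> monitorInterval) {
            return monitorInterval.isPresent()
                    ? Boundedness.CONTINUOUS_UNBOUNDED
                    : Boundedness.BOUNDED;
        }
    }
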
diff --git a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/TestFileSystemTableFactoryTest.java b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/TestFileSystemTableFactoryTest.java
index 009ede7577f..90ca28531f0 100644
--- a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/TestFileSystemTableFactoryTest.java
+++ b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/TestFileSystemTableFactoryTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.table.file.testutils;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.file.table.FileSystemTableSink;
-import org.apache.flink.connector.file.table.TestFileSystemTableSource;
+import org.apache.flink.connector.file.table.FileSystemTableSource;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ResolvedSchema;
@@ -64,7 +64,7 @@ public class TestFileSystemTableFactoryTest {
         options.put("partition.fields.f1.date-formatter", "yyyy-MM-dd");
 
         DynamicTableSource source = createTableSource(SCHEMA, options);
-        assertThat(source).isInstanceOf(TestFileSystemTableSource.class);
+        assertThat(source).isInstanceOf(FileSystemTableSource.class);
 
         DynamicTableSink sink = createTableSink(SCHEMA, options);
         assertThat(sink).isInstanceOf(FileSystemTableSink.class);
@@ -79,11 +79,11 @@ public class TestFileSystemTableFactoryTest {
         options.put("source.monitor-interval", "5S");
 
         DynamicTableSource source = createTableSource(SCHEMA, options);
-        assertThat(source).isInstanceOf(TestFileSystemTableSource.class);
+        assertThat(source).isInstanceOf(FileSystemTableSource.class);
 
         // assert source is unbounded when specify source.monitor-interval
         ScanTableSource.ScanRuntimeProvider scanRuntimeProvider =
-                ((TestFileSystemTableSource) source)
+                ((FileSystemTableSource) source)
                         .getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
         assertThat(scanRuntimeProvider.isBounded()).isFalse();
     }
@@ -100,11 +100,11 @@ public class TestFileSystemTableFactoryTest {
         configuration.set(RUNTIME_MODE, BATCH);
 
         DynamicTableSource source = createTableSource(SCHEMA, options, configuration);
-        assertThat(source).isInstanceOf(TestFileSystemTableSource.class);
+        assertThat(source).isInstanceOf(FileSystemTableSource.class);
 
-        // assert source is unbounded when specify source.monitor-interval
+        // assert source is bounded when source.monitor-interval is specified in batch mode
         ScanTableSource.ScanRuntimeProvider scanRuntimeProvider =
-                ((TestFileSystemTableSource) source)
+                ((FileSystemTableSource) source)
                         .getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
         assertThat(scanRuntimeProvider.isBounded()).isTrue();
     }
