This is an automated email from the ASF dual-hosted git repository. ron pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit fc6f288d1bc48c633dbdd0db3a795465b83ce3f3 Author: fengli <ldliu...@163.com> AuthorDate: Tue Jun 25 15:43:55 2024 +0800 [FLINK-35691][testutils] Optimize test-filesystem logic to support streaming and batch read --- .../file/table/FileSystemTableFactory.java | 4 +- .../flink/connector/file/src/TestFileSource.java | 190 --------------------- .../file/table/TestFileSystemTableSource.java | 88 ---------- .../file/testutils/TestFileSystemTableFactory.java | 13 +- .../testutils/TestFileSystemTableFactoryTest.java | 14 +- 5 files changed, 18 insertions(+), 291 deletions(-) diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableFactory.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableFactory.java index 31f6dddbc69..00db848356b 100644 --- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableFactory.java +++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableFactory.java @@ -149,7 +149,7 @@ public class FileSystemTableFactory implements DynamicTableSourceFactory, Dynami .collect(Collectors.toSet()); } - private void validate(FactoryUtil.TableFactoryHelper helper) { + protected void validate(FactoryUtil.TableFactoryHelper helper) { // Except format options, some formats like parquet and orc can not list all supported // options. helper.validateExcept(helper.getOptions().get(FactoryUtil.FORMAT) + "."); @@ -160,7 +160,7 @@ public class FileSystemTableFactory implements DynamicTableSourceFactory, Dynami .get(FileSystemConnectorOptions.SINK_PARTITION_COMMIT_WATERMARK_TIME_ZONE)); } - private <I, F extends DecodingFormatFactory<I>> DecodingFormat<I> discoverDecodingFormat( + protected <I, F extends DecodingFormatFactory<I>> DecodingFormat<I> discoverDecodingFormat( Context context, Class<F> formatFactoryClass) { FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); if (formatFactoryExists(context, formatFactoryClass)) { diff --git a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/connector/file/src/TestFileSource.java b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/connector/file/src/TestFileSource.java deleted file mode 100644 index e939ad51c36..00000000000 --- a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/connector/file/src/TestFileSource.java +++ /dev/null @@ -1,190 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.connector.file.src; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.api.connector.source.Boundedness; -import org.apache.flink.api.connector.source.DynamicParallelismInference; -import org.apache.flink.connector.file.src.assigners.FileSplitAssigner; -import org.apache.flink.connector.file.src.assigners.LocalityAwareSplitAssigner; -import org.apache.flink.connector.file.src.enumerate.BlockSplittingRecursiveEnumerator; -import org.apache.flink.connector.file.src.enumerate.FileEnumerator; -import org.apache.flink.connector.file.src.enumerate.NonSplittingRecursiveEnumerator; -import org.apache.flink.connector.file.src.reader.BulkFormat; -import org.apache.flink.core.fs.Path; -import org.apache.flink.core.io.SimpleVersionedSerializer; -import org.apache.flink.util.FlinkRuntimeException; -import org.apache.flink.util.Preconditions; - -import javax.annotation.Nullable; - -import java.io.IOException; -import java.util.Collection; - -/** - * A unified data source that reads files - both in batch and in streaming mode. This is used only - * for test. Due to {@link FileSource} is a final class, so we can't extend it directly. - * - * @param <T> The type of the events/records produced by this source. - */ -@Internal -public class TestFileSource<T> extends AbstractFileSource<T, FileSourceSplit> - implements DynamicParallelismInference { - - private static final long serialVersionUID = 1L; - - /** The default split assigner, a lazy locality-aware assigner. */ - public static final FileSplitAssigner.Provider DEFAULT_SPLIT_ASSIGNER = - LocalityAwareSplitAssigner::new; - - /** - * The default file enumerator used for splittable formats. The enumerator recursively - * enumerates files, split files that consist of multiple distributed storage blocks into - * multiple splits, and filters hidden files (files starting with '.' or '_'). Files with - * suffixes of common compression formats (for example '.gzip', '.bz2', '.xy', '.zip', ...) will - * not be split. - */ - public static final FileEnumerator.Provider DEFAULT_SPLITTABLE_FILE_ENUMERATOR = - BlockSplittingRecursiveEnumerator::new; - - /** - * The default file enumerator used for non-splittable formats. The enumerator recursively - * enumerates files, creates one split for the file, and filters hidden files (files starting - * with '.' or '_'). - */ - public static final FileEnumerator.Provider DEFAULT_NON_SPLITTABLE_FILE_ENUMERATOR = - NonSplittingRecursiveEnumerator::new; - - private final boolean isStreamingMode; - private final ContinuousEnumerationSettings continuousEnumerationSettings; - - // ------------------------------------------------------------------------ - - private TestFileSource( - final Path[] inputPaths, - final FileEnumerator.Provider fileEnumerator, - final FileSplitAssigner.Provider splitAssigner, - final BulkFormat<T, FileSourceSplit> readerFormat, - final boolean isStreamingMode, - @Nullable final ContinuousEnumerationSettings continuousEnumerationSettings) { - - super( - inputPaths, - fileEnumerator, - splitAssigner, - readerFormat, - continuousEnumerationSettings); - this.isStreamingMode = isStreamingMode; - this.continuousEnumerationSettings = continuousEnumerationSettings; - } - - @Override - public SimpleVersionedSerializer<FileSourceSplit> getSplitSerializer() { - return FileSourceSplitSerializer.INSTANCE; - } - - @Override - public Boundedness getBoundedness() { - return isStreamingMode && continuousEnumerationSettings != null - ? Boundedness.CONTINUOUS_UNBOUNDED - : Boundedness.BOUNDED; - } - - @Override - public int inferParallelism(Context dynamicParallelismContext) { - FileEnumerator fileEnumerator = getEnumeratorFactory().create(); - - Collection<FileSourceSplit> splits; - try { - splits = - fileEnumerator.enumerateSplits( - inputPaths, - dynamicParallelismContext.getParallelismInferenceUpperBound()); - } catch (IOException e) { - throw new FlinkRuntimeException("Could not enumerate file splits", e); - } - - return Math.min( - splits.size(), dynamicParallelismContext.getParallelismInferenceUpperBound()); - } - - // ------------------------------------------------------------------------ - // Entry-point Factory Methods - // ------------------------------------------------------------------------ - /** - * Builds a new {@code FileSource} using a {@link BulkFormat} to read batches of records from - * files. - * - * <p>Examples for bulk readers are compressed and vectorized formats such as ORC or Parquet. - */ - public static <T> TestFileSource.TestFileSourceBuilder<T> forBulkFileFormat( - final BulkFormat<T, FileSourceSplit> bulkFormat, final Path... paths) { - Preconditions.checkNotNull(bulkFormat, "reader"); - Preconditions.checkNotNull(paths, "paths"); - Preconditions.checkArgument(paths.length > 0, "paths must not be empty"); - - return new TestFileSource.TestFileSourceBuilder<>(paths, bulkFormat); - } - - // ------------------------------------------------------------------------ - // Builder - // ------------------------------------------------------------------------ - - /** - * The builder for the {@code FileSource}, to configure the various behaviors. - * - * <p>Start building the source via one of the following methods: - * - * <ul> - * <li>{@link TestFileSource#forBulkFileFormat(BulkFormat, Path...)} - * </ul> - */ - public static final class TestFileSourceBuilder<T> - extends AbstractFileSourceBuilder< - T, FileSourceSplit, TestFileSource.TestFileSourceBuilder<T>> { - - private boolean isStreamingMode = false; - - TestFileSourceBuilder(Path[] inputPaths, BulkFormat<T, FileSourceSplit> readerFormat) { - super( - inputPaths, - readerFormat, - readerFormat.isSplittable() - ? DEFAULT_SPLITTABLE_FILE_ENUMERATOR - : DEFAULT_NON_SPLITTABLE_FILE_ENUMERATOR, - DEFAULT_SPLIT_ASSIGNER); - } - - public TestFileSourceBuilder<T> setStreamingMode(boolean streamingMode) { - this.isStreamingMode = streamingMode; - return this; - } - - @Override - public TestFileSource<T> build() { - return new TestFileSource<>( - inputPaths, - fileEnumerator, - splitAssigner, - readerFormat, - isStreamingMode, - continuousSourceSettings); - } - } -} diff --git a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/connector/file/table/TestFileSystemTableSource.java b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/connector/file/table/TestFileSystemTableSource.java deleted file mode 100644 index 4a71fcdccab..00000000000 --- a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/connector/file/table/TestFileSystemTableSource.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.connector.file.table; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.serialization.DeserializationSchema; -import org.apache.flink.configuration.ReadableConfig; -import org.apache.flink.connector.file.src.FileSourceSplit; -import org.apache.flink.connector.file.src.TestFileSource; -import org.apache.flink.connector.file.src.enumerate.BlockSplittingRecursiveAllDirEnumerator; -import org.apache.flink.connector.file.src.enumerate.NonSplittingRecursiveAllDirEnumerator; -import org.apache.flink.connector.file.src.reader.BulkFormat; -import org.apache.flink.table.catalog.ObjectIdentifier; -import org.apache.flink.table.connector.format.DecodingFormat; -import org.apache.flink.table.connector.source.SourceProvider; -import org.apache.flink.table.data.RowData; -import org.apache.flink.table.types.DataType; - -import javax.annotation.Nullable; - -import java.util.List; - -/** Test file system table source. */ -@Internal -public class TestFileSystemTableSource extends FileSystemTableSource { - - private final boolean isStreamingMode; - - public TestFileSystemTableSource( - ObjectIdentifier tableIdentifier, - DataType physicalRowDataType, - List<String> partitionKeys, - ReadableConfig tableOptions, - boolean isStreamingMode, - @Nullable DecodingFormat<BulkFormat<RowData, FileSourceSplit>> bulkReaderFormat, - @Nullable DecodingFormat<DeserializationSchema<RowData>> deserializationFormat) { - super( - tableIdentifier, - physicalRowDataType, - partitionKeys, - tableOptions, - bulkReaderFormat, - deserializationFormat); - this.isStreamingMode = isStreamingMode; - } - - @Override - protected SourceProvider createSourceProvider(BulkFormat<RowData, FileSourceSplit> bulkFormat) { - final TestFileSource.TestFileSourceBuilder<RowData> fileSourceBuilder = - TestFileSource.forBulkFileFormat(bulkFormat, paths()); - - tableOptions - .getOptional(FileSystemConnectorOptions.SOURCE_MONITOR_INTERVAL) - .ifPresent(fileSourceBuilder::monitorContinuously); - tableOptions - .getOptional(FileSystemConnectorOptions.SOURCE_PATH_REGEX_PATTERN) - .ifPresent( - regex -> - fileSourceBuilder.setFileEnumerator( - bulkFormat.isSplittable() - ? () -> - new BlockSplittingRecursiveAllDirEnumerator( - regex) - : () -> - new NonSplittingRecursiveAllDirEnumerator( - regex))); - - fileSourceBuilder.setStreamingMode(isStreamingMode); - - return SourceProvider.of(fileSourceBuilder.build()); - } -} diff --git a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/TestFileSystemTableFactory.java b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/TestFileSystemTableFactory.java index 98d9caff0aa..3994222db5b 100644 --- a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/TestFileSystemTableFactory.java +++ b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/TestFileSystemTableFactory.java @@ -20,10 +20,11 @@ package org.apache.flink.table.file.testutils; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ExecutionOptions; import org.apache.flink.connector.file.table.FileSystemConnectorOptions; import org.apache.flink.connector.file.table.FileSystemTableFactory; -import org.apache.flink.connector.file.table.TestFileSystemTableSource; +import org.apache.flink.connector.file.table.FileSystemTableSource; import org.apache.flink.connector.file.table.factories.BulkReaderFormatFactory; import org.apache.flink.table.connector.sink.DynamicTableSink; import org.apache.flink.table.connector.source.DynamicTableSource; @@ -67,12 +68,16 @@ public class TestFileSystemTableFactory extends FileSystemTableFactory { boolean isStreamingMode = context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING; - return new TestFileSystemTableSource( + Configuration tableOptions = Configuration.fromMap(helper.getOptions().toMap()); + if (!isStreamingMode) { + tableOptions.removeConfig(FileSystemConnectorOptions.SOURCE_MONITOR_INTERVAL); + } + + return new FileSystemTableSource( context.getObjectIdentifier(), context.getPhysicalRowDataType(), context.getCatalogTable().getPartitionKeys(), - helper.getOptions(), - isStreamingMode, + tableOptions, discoverDecodingFormat(context, BulkReaderFormatFactory.class), discoverDecodingFormat(context, DeserializationFormatFactory.class)); } diff --git a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/TestFileSystemTableFactoryTest.java b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/TestFileSystemTableFactoryTest.java index 009ede7577f..90ca28531f0 100644 --- a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/TestFileSystemTableFactoryTest.java +++ b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/TestFileSystemTableFactoryTest.java @@ -20,7 +20,7 @@ package org.apache.flink.table.file.testutils; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.file.table.FileSystemTableSink; -import org.apache.flink.connector.file.table.TestFileSystemTableSource; +import org.apache.flink.connector.file.table.FileSystemTableSource; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.catalog.Column; import org.apache.flink.table.catalog.ResolvedSchema; @@ -64,7 +64,7 @@ public class TestFileSystemTableFactoryTest { options.put("partition.fields.f1.date-formatter", "yyyy-MM-dd"); DynamicTableSource source = createTableSource(SCHEMA, options); - assertThat(source).isInstanceOf(TestFileSystemTableSource.class); + assertThat(source).isInstanceOf(FileSystemTableSource.class); DynamicTableSink sink = createTableSink(SCHEMA, options); assertThat(sink).isInstanceOf(FileSystemTableSink.class); @@ -79,11 +79,11 @@ public class TestFileSystemTableFactoryTest { options.put("source.monitor-interval", "5S"); DynamicTableSource source = createTableSource(SCHEMA, options); - assertThat(source).isInstanceOf(TestFileSystemTableSource.class); + assertThat(source).isInstanceOf(FileSystemTableSource.class); // assert source is unbounded when specify source.monitor-interval ScanTableSource.ScanRuntimeProvider scanRuntimeProvider = - ((TestFileSystemTableSource) source) + ((FileSystemTableSource) source) .getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE); assertThat(scanRuntimeProvider.isBounded()).isFalse(); } @@ -100,11 +100,11 @@ public class TestFileSystemTableFactoryTest { configuration.set(RUNTIME_MODE, BATCH); DynamicTableSource source = createTableSource(SCHEMA, options, configuration); - assertThat(source).isInstanceOf(TestFileSystemTableSource.class); + assertThat(source).isInstanceOf(FileSystemTableSource.class); - // assert source is unbounded when specify source.monitor-interval + // assert source is bounded when specify source.monitor-interval and in batch mode ScanTableSource.ScanRuntimeProvider scanRuntimeProvider = - ((TestFileSystemTableSource) source) + ((FileSystemTableSource) source) .getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE); assertThat(scanRuntimeProvider.isBounded()).isTrue(); }