This is an automated email from the ASF dual-hosted git repository. arvid pushed a commit to branch release-1.14 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3b9aaca3c96a79c241568f4493f6b308be1eb4ce Author: Etienne Chauchot <echauc...@apache.org> AuthorDate: Wed Nov 10 12:19:38 2021 +0100 [FLINK-24859][doc][formats] Make new formats name coherent --- .../docs/connectors/datastream/hybridsource.md | 2 +- .../docs/connectors/datastream/hybridsource.md | 2 +- .../connector/base/source/hybrid/HybridSource.java | 2 +- ...extLineFormat.java => TextLineInputFormat.java} | 8 ++--- .../file/src/FileSourceTextLinesITCase.java | 8 +++-- .../file/src/impl/FileSourceReaderTest.java | 4 +-- .../flink/connectors/hive/HiveSourceBuilder.java | 4 +-- .../hive/read/HiveCompactReaderFactory.java | 4 +-- ...BulkFormatAdapter.java => HiveInputFormat.java} | 12 ++++---- .../nohive/OrcNoHiveColumnarRowInputFormat.java | 10 +++---- ...tFormat.java => OrcColumnarRowInputFormat.java} | 10 +++---- .../org/apache/flink/orc/OrcFileFormatFactory.java | 2 +- ...est.java => OrcColumnarRowInputFormatTest.java} | 34 +++++++++++----------- flink-python/pyflink/datastream/connectors.py | 2 +- .../table/filesystem/LimitableBulkFormatTest.java | 8 +++-- 15 files changed, 58 insertions(+), 54 deletions(-) diff --git a/docs/content.zh/docs/connectors/datastream/hybridsource.md b/docs/content.zh/docs/connectors/datastream/hybridsource.md index 02f5077..7058ac4 100644 --- a/docs/content.zh/docs/connectors/datastream/hybridsource.md +++ b/docs/content.zh/docs/connectors/datastream/hybridsource.md @@ -58,7 +58,7 @@ Each source covers an upfront known range and therefore the contained sources ca ```java long switchTimestamp = ...; // derive from file input paths FileSource<String> fileSource = - FileSource.forRecordStreamFormat(new TextLineFormat(), Path.fromLocalFile(testDir)).build(); + FileSource.forRecordStreamFormat(new TextLineInputFormat(), Path.fromLocalFile(testDir)).build(); KafkaSource<String> kafkaSource = KafkaSource.<String>builder() .setStartingOffsets(OffsetsInitializer.timestamp(switchTimestamp + 1)) diff --git a/docs/content/docs/connectors/datastream/hybridsource.md b/docs/content/docs/connectors/datastream/hybridsource.md index 02f5077..7058ac4 100644 --- a/docs/content/docs/connectors/datastream/hybridsource.md +++ b/docs/content/docs/connectors/datastream/hybridsource.md @@ -58,7 +58,7 @@ Each source covers an upfront known range and therefore the contained sources ca ```java long switchTimestamp = ...; // derive from file input paths FileSource<String> fileSource = - FileSource.forRecordStreamFormat(new TextLineFormat(), Path.fromLocalFile(testDir)).build(); + FileSource.forRecordStreamFormat(new TextLineInputFormat(), Path.fromLocalFile(testDir)).build(); KafkaSource<String> kafkaSource = KafkaSource.<String>builder() .setStartingOffsets(OffsetsInitializer.timestamp(switchTimestamp + 1)) diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java index 24acb6a..8df875b 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java @@ -41,7 +41,7 @@ import java.util.List; * * <pre>{@code * FileSource<String> fileSource = - * FileSource.forRecordStreamFormat(new TextLineFormat(), Path.fromLocalFile(testDir)).build(); + * FileSource.forRecordStreamFormat(new TextLineInputFormat(), Path.fromLocalFile(testDir)).build(); * KafkaSource<String> kafkaSource = * KafkaSource.<String>builder() * .setBootstrapServers("localhost:9092") diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/TextLineFormat.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/TextLineInputFormat.java similarity index 93% rename from flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/TextLineFormat.java rename to flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/TextLineInputFormat.java index c6fa3b1..00d906d 100644 --- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/TextLineFormat.java +++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/TextLineInputFormat.java @@ -42,7 +42,7 @@ import java.io.InputStreamReader; * with their internal buffering of stream input and charset decoder state. */ @PublicEvolving -public class TextLineFormat extends SimpleStreamFormat<String> { +public class TextLineInputFormat extends SimpleStreamFormat<String> { private static final long serialVersionUID = 1L; @@ -50,11 +50,11 @@ public class TextLineFormat extends SimpleStreamFormat<String> { private final String charsetName; - public TextLineFormat() { + public TextLineInputFormat() { this(DEFAULT_CHARSET_NAME); } - public TextLineFormat(String charsetName) { + public TextLineInputFormat(String charsetName) { this.charsetName = charsetName; } @@ -72,7 +72,7 @@ public class TextLineFormat extends SimpleStreamFormat<String> { // ------------------------------------------------------------------------ - /** The actual reader for the {@code TextLineFormat}. */ + /** The actual reader for the {@code TextLineInputFormat}. */ public static final class Reader implements StreamFormat.Reader<String> { private final BufferedReader reader; diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java index eea0fa7..5f2a851 100644 --- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java +++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java @@ -21,7 +21,7 @@ package org.apache.flink.connector.file.src; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.restartstrategy.RestartStrategies; -import org.apache.flink.connector.file.src.reader.TextLineFormat; +import org.apache.flink.connector.file.src.reader.TextLineInputFormat; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl; import org.apache.flink.runtime.minicluster.MiniCluster; @@ -119,7 +119,8 @@ public class FileSourceTextLinesITCase extends TestLogger { writeHiddenJunkFiles(testDir); final FileSource<String> source = - FileSource.forRecordStreamFormat(new TextLineFormat(), Path.fromLocalFile(testDir)) + FileSource.forRecordStreamFormat( + new TextLineInputFormat(), Path.fromLocalFile(testDir)) .build(); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); @@ -184,7 +185,8 @@ public class FileSourceTextLinesITCase extends TestLogger { final File testDir = TMP_FOLDER.newFolder(); final FileSource<String> source = - FileSource.forRecordStreamFormat(new TextLineFormat(), Path.fromLocalFile(testDir)) + FileSource.forRecordStreamFormat( + new TextLineInputFormat(), Path.fromLocalFile(testDir)) .monitorContinuously(Duration.ofMillis(5)) .build(); diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/FileSourceReaderTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/FileSourceReaderTest.java index a25a14c..d70745f 100644 --- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/FileSourceReaderTest.java +++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/FileSourceReaderTest.java @@ -20,7 +20,7 @@ package org.apache.flink.connector.file.src.impl; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.file.src.FileSourceSplit; -import org.apache.flink.connector.file.src.reader.TextLineFormat; +import org.apache.flink.connector.file.src.reader.TextLineInputFormat; import org.apache.flink.connector.testutils.source.reader.TestingReaderContext; import org.apache.flink.core.fs.Path; @@ -64,7 +64,7 @@ public class FileSourceReaderTest { private static FileSourceReader<String, FileSourceSplit> createReader( TestingReaderContext context) { return new FileSourceReader<>( - context, new StreamFormatAdapter<>(new TextLineFormat()), new Configuration()); + context, new StreamFormatAdapter<>(new TextLineInputFormat()), new Configuration()); } private static FileSourceSplit createTestFileSplit() throws IOException { diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java index 7b8442a..4b42073 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java @@ -25,8 +25,8 @@ import org.apache.flink.connector.file.src.ContinuousEnumerationSettings; import org.apache.flink.connector.file.src.assigners.FileSplitAssigner; import org.apache.flink.connector.file.src.assigners.SimpleSplitAssigner; import org.apache.flink.connector.file.src.reader.BulkFormat; -import org.apache.flink.connectors.hive.read.HiveBulkFormatAdapter; import org.apache.flink.connectors.hive.read.HiveContinuousPartitionFetcher; +import org.apache.flink.connectors.hive.read.HiveInputFormat; import org.apache.flink.connectors.hive.read.HiveSourceSplit; import org.apache.flink.connectors.hive.util.HiveConfUtils; import org.apache.flink.connectors.hive.util.HivePartitionUtils; @@ -310,7 +310,7 @@ public class HiveSourceBuilder { private BulkFormat<RowData, HiveSourceSplit> createDefaultBulkFormat() { return LimitableBulkFormat.create( - new HiveBulkFormatAdapter( + new HiveInputFormat( new JobConfWrapper(jobConf), partitionKeys, fullSchema.getFieldNames(), diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveCompactReaderFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveCompactReaderFactory.java index 012ef01..47b36d9 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveCompactReaderFactory.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveCompactReaderFactory.java @@ -88,8 +88,8 @@ public class HiveCompactReaderFactory implements CompactReader.Factory<RowData> @Override public CompactReader<RowData> create(CompactContext context) throws IOException { HiveSourceSplit split = createSplit(context.getPath(), context.getFileSystem()); - HiveBulkFormatAdapter format = - new HiveBulkFormatAdapter( + HiveInputFormat format = + new HiveInputFormat( jobConfWrapper, partitionKeys, fieldNames, diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveBulkFormatAdapter.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveInputFormat.java similarity index 97% rename from flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveBulkFormatAdapter.java rename to flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveInputFormat.java index 99aad05..6ab0dc9 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveBulkFormatAdapter.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveInputFormat.java @@ -27,7 +27,7 @@ import org.apache.flink.connectors.hive.JobConfWrapper; import org.apache.flink.connectors.hive.util.HivePartitionUtils; import org.apache.flink.connectors.hive.util.JobConfUtils; import org.apache.flink.formats.parquet.ParquetColumnarRowInputFormat; -import org.apache.flink.orc.OrcColumnarRowFileInputFormat; +import org.apache.flink.orc.OrcColumnarRowInputFormat; import org.apache.flink.orc.nohive.OrcNoHiveColumnarRowInputFormat; import org.apache.flink.orc.shim.OrcShim; import org.apache.flink.table.catalog.hive.client.HiveShim; @@ -62,11 +62,11 @@ import static org.apache.flink.table.data.vector.VectorizedColumnBatch.DEFAULT_S * BulkFormat instances, because different hive partitions may need different BulkFormat to do the * reading. */ -public class HiveBulkFormatAdapter implements BulkFormat<RowData, HiveSourceSplit> { +public class HiveInputFormat implements BulkFormat<RowData, HiveSourceSplit> { private static final long serialVersionUID = 1L; - private static final Logger LOG = LoggerFactory.getLogger(HiveBulkFormatAdapter.class); + private static final Logger LOG = LoggerFactory.getLogger(HiveInputFormat.class); // schema evolution configs are not available in older versions of IOConstants, let's define // them ourselves @@ -83,7 +83,7 @@ public class HiveBulkFormatAdapter implements BulkFormat<RowData, HiveSourceSpli private final boolean useMapRedReader; private final PartitionFieldExtractor<HiveSourceSplit> partitionFieldExtractor; - public HiveBulkFormatAdapter( + public HiveInputFormat( JobConfWrapper jobConfWrapper, List<String> partitionKeys, String[] fieldNames, @@ -162,7 +162,7 @@ public class HiveBulkFormatAdapter implements BulkFormat<RowData, HiveSourceSpli } } - private OrcColumnarRowFileInputFormat<?, HiveSourceSplit> createOrcFormat() { + private OrcColumnarRowInputFormat<?, HiveSourceSplit> createOrcFormat() { return hiveVersion.startsWith("1.") ? OrcNoHiveColumnarRowInputFormat.createPartitionedFormat( jobConfWrapper.conf(), @@ -172,7 +172,7 @@ public class HiveBulkFormatAdapter implements BulkFormat<RowData, HiveSourceSpli computeSelectedFields(), Collections.emptyList(), DEFAULT_SIZE) - : OrcColumnarRowFileInputFormat.createPartitionedFormat( + : OrcColumnarRowInputFormat.createPartitionedFormat( OrcShim.createShim(hiveVersion), jobConfWrapper.conf(), tableRowType(), diff --git a/flink-formats/flink-orc-nohive/src/main/java/org/apache/flink/orc/nohive/OrcNoHiveColumnarRowInputFormat.java b/flink-formats/flink-orc-nohive/src/main/java/org/apache/flink/orc/nohive/OrcNoHiveColumnarRowInputFormat.java index 47af6ad..b186faf 100644 --- a/flink-formats/flink-orc-nohive/src/main/java/org/apache/flink/orc/nohive/OrcNoHiveColumnarRowInputFormat.java +++ b/flink-formats/flink-orc-nohive/src/main/java/org/apache/flink/orc/nohive/OrcNoHiveColumnarRowInputFormat.java @@ -19,7 +19,7 @@ package org.apache.flink.orc.nohive; import org.apache.flink.connector.file.src.FileSourceSplit; -import org.apache.flink.orc.OrcColumnarRowFileInputFormat; +import org.apache.flink.orc.OrcColumnarRowInputFormat; import org.apache.flink.orc.OrcFilters; import org.apache.flink.orc.nohive.shim.OrcNoHiveShim; import org.apache.flink.orc.vector.ColumnBatchFactory; @@ -42,16 +42,16 @@ import static org.apache.flink.orc.OrcSplitReaderUtil.getSelectedOrcFields; import static org.apache.flink.orc.nohive.vector.AbstractOrcNoHiveVector.createFlinkVector; import static org.apache.flink.orc.vector.AbstractOrcColumnVector.createFlinkVectorFromConstant; -/** Helper class to create {@link OrcColumnarRowFileInputFormat} for no-hive. */ +/** Helper class to create {@link OrcColumnarRowInputFormat} for no-hive. */ public class OrcNoHiveColumnarRowInputFormat { private OrcNoHiveColumnarRowInputFormat() {} /** - * Create a partitioned {@link OrcColumnarRowFileInputFormat}, the partition columns can be + * Create a partitioned {@link OrcColumnarRowInputFormat}, the partition columns can be * generated by split. */ public static <SplitT extends FileSourceSplit> - OrcColumnarRowFileInputFormat<VectorizedRowBatch, SplitT> createPartitionedFormat( + OrcColumnarRowInputFormat<VectorizedRowBatch, SplitT> createPartitionedFormat( Configuration hadoopConfig, RowType tableType, List<String> partitionKeys, @@ -84,7 +84,7 @@ public class OrcNoHiveColumnarRowInputFormat { return new VectorizedColumnBatch(vectors); }; - return new OrcColumnarRowFileInputFormat<>( + return new OrcColumnarRowInputFormat<>( new OrcNoHiveShim(), hadoopConfig, convertToOrcTypeWithPart(tableFieldNames, tableFieldTypes, partitionKeys), diff --git a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcColumnarRowFileInputFormat.java b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcColumnarRowInputFormat.java similarity index 95% rename from flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcColumnarRowFileInputFormat.java rename to flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcColumnarRowInputFormat.java index d88c967..bb12d51 100644 --- a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcColumnarRowFileInputFormat.java +++ b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcColumnarRowInputFormat.java @@ -55,7 +55,7 @@ import static org.apache.flink.orc.vector.AbstractOrcColumnVector.createFlinkVec * fields, which can be extracted from path. Therefore, the {@link #getProducedType()} may be * different and types of extra fields need to be added. */ -public class OrcColumnarRowFileInputFormat<BatchT, SplitT extends FileSourceSplit> +public class OrcColumnarRowInputFormat<BatchT, SplitT extends FileSourceSplit> extends AbstractOrcFileInputFormat<RowData, BatchT, SplitT> { private static final long serialVersionUID = 1L; @@ -63,7 +63,7 @@ public class OrcColumnarRowFileInputFormat<BatchT, SplitT extends FileSourceSpli private final ColumnBatchFactory<BatchT, SplitT> batchFactory; private final RowType projectedOutputType; - public OrcColumnarRowFileInputFormat( + public OrcColumnarRowInputFormat( final OrcShim<BatchT> shim, final Configuration hadoopConfig, final TypeDescription schema, @@ -126,11 +126,11 @@ public class OrcColumnarRowFileInputFormat<BatchT, SplitT extends FileSourceSpli } /** - * Create a partitioned {@link OrcColumnarRowFileInputFormat}, the partition columns can be + * Create a partitioned {@link OrcColumnarRowInputFormat}, the partition columns can be * generated by split. */ public static <SplitT extends FileSourceSplit> - OrcColumnarRowFileInputFormat<VectorizedRowBatch, SplitT> createPartitionedFormat( + OrcColumnarRowInputFormat<VectorizedRowBatch, SplitT> createPartitionedFormat( OrcShim<VectorizedRowBatch> shim, Configuration hadoopConfig, RowType tableType, @@ -164,7 +164,7 @@ public class OrcColumnarRowFileInputFormat<BatchT, SplitT extends FileSourceSpli return new VectorizedColumnBatch(vectors); }; - return new OrcColumnarRowFileInputFormat<>( + return new OrcColumnarRowInputFormat<>( shim, hadoopConfig, convertToOrcTypeWithPart(tableFieldNames, tableFieldTypes, partitionKeys), diff --git a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileFormatFactory.java b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileFormatFactory.java index 7343959..853b3b8 100644 --- a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileFormatFactory.java +++ b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileFormatFactory.java @@ -129,7 +129,7 @@ public class OrcFileFormatFactory implements BulkReaderFormatFactory, BulkWriter FileSystemConnectorOptions.PARTITION_DEFAULT_NAME .defaultValue()); - return OrcColumnarRowFileInputFormat.createPartitionedFormat( + return OrcColumnarRowInputFormat.createPartitionedFormat( OrcShim.defaultShim(), conf, tableType, diff --git a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowFileInputFormatTest.java b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowInputFormatTest.java similarity index 93% rename from flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowFileInputFormatTest.java rename to flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowInputFormatTest.java index 2243d5c..5094866 100644 --- a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowFileInputFormatTest.java +++ b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowInputFormatTest.java @@ -62,8 +62,8 @@ import static org.apache.flink.table.utils.PartitionPathUtils.generatePartitionP import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; -/** Test for {@link OrcColumnarRowFileInputFormat}. */ -public class OrcColumnarRowFileInputFormatTest { +/** Test for {@link OrcColumnarRowInputFormat}. */ +public class OrcColumnarRowInputFormatTest { /** Small batch size for test more boundary conditions. */ protected static final int BATCH_SIZE = 9; @@ -97,7 +97,7 @@ public class OrcColumnarRowFileInputFormatTest { @Test public void testReadFileInSplits() throws IOException { - OrcColumnarRowFileInputFormat<?, FileSourceSplit> format = + OrcColumnarRowInputFormat<?, FileSourceSplit> format = createFormat(FLAT_FILE_TYPE, new int[] {0, 1}); AtomicInteger cnt = new AtomicInteger(0); @@ -124,7 +124,7 @@ public class OrcColumnarRowFileInputFormatTest { @Test public void testReadFileWithSelectFields() throws IOException { - OrcColumnarRowFileInputFormat<?, FileSourceSplit> format = + OrcColumnarRowInputFormat<?, FileSourceSplit> format = createFormat(FLAT_FILE_TYPE, new int[] {2, 0, 1}); AtomicInteger cnt = new AtomicInteger(0); @@ -153,7 +153,7 @@ public class OrcColumnarRowFileInputFormatTest { @Test public void testReadDecimalTypeFile() throws IOException { - OrcColumnarRowFileInputFormat<?, FileSourceSplit> format = + OrcColumnarRowInputFormat<?, FileSourceSplit> format = createFormat(DECIMAL_FILE_TYPE, new int[] {0}); AtomicInteger cnt = new AtomicInteger(0); @@ -217,7 +217,7 @@ public class OrcColumnarRowFileInputFormatTest { int[] projectedFields = {8, 1, 3, 0, 5, 2}; - OrcColumnarRowFileInputFormat<?, FileSourceSplit> format = + OrcColumnarRowInputFormat<?, FileSourceSplit> format = createPartitionFormat( tableType, new ArrayList<>(partSpec.keySet()), projectedFields); @@ -257,7 +257,7 @@ public class OrcColumnarRowFileInputFormatTest { @Test public void testReadFileAndRestore() throws IOException { - OrcColumnarRowFileInputFormat<?, FileSourceSplit> format = + OrcColumnarRowInputFormat<?, FileSourceSplit> format = createFormat(FLAT_FILE_TYPE, new int[] {0, 1}); // pick a middle split @@ -276,7 +276,7 @@ public class OrcColumnarRowFileInputFormatTest { new Between("_col0", PredicateLeaf.Type.LONG, 0L, 975000L), new Equals("_col0", PredicateLeaf.Type.LONG, 980001L), new Between("_col0", PredicateLeaf.Type.LONG, 990000L, 1800000L))); - OrcColumnarRowFileInputFormat<?, FileSourceSplit> format = + OrcColumnarRowInputFormat<?, FileSourceSplit> format = createFormat(FLAT_FILE_TYPE, new int[] {0, 1}, filter); // pick a middle split @@ -290,7 +290,7 @@ public class OrcColumnarRowFileInputFormatTest { } private void innerTestRestore( - OrcColumnarRowFileInputFormat<?, FileSourceSplit> format, + OrcColumnarRowInputFormat<?, FileSourceSplit> format, FileSourceSplit split, int breakCnt, int expectedCnt, @@ -338,14 +338,14 @@ public class OrcColumnarRowFileInputFormatTest { assertEquals(expectedTotalF0, totalF0.get()); } - protected OrcColumnarRowFileInputFormat<?, FileSourceSplit> createFormat( + protected OrcColumnarRowInputFormat<?, FileSourceSplit> createFormat( RowType formatType, int[] selectedFields) { return createFormat(formatType, selectedFields, new ArrayList<>()); } - protected OrcColumnarRowFileInputFormat<?, FileSourceSplit> createFormat( + protected OrcColumnarRowInputFormat<?, FileSourceSplit> createFormat( RowType formatType, int[] selectedFields, List<Predicate> conjunctPredicates) { - return OrcColumnarRowFileInputFormat.createPartitionedFormat( + return OrcColumnarRowInputFormat.createPartitionedFormat( OrcShim.defaultShim(), new Configuration(), formatType, @@ -356,9 +356,9 @@ public class OrcColumnarRowFileInputFormatTest { BATCH_SIZE); } - protected OrcColumnarRowFileInputFormat<?, FileSourceSplit> createPartitionFormat( + protected OrcColumnarRowInputFormat<?, FileSourceSplit> createPartitionFormat( RowType tableType, List<String> partitionKeys, int[] selectedFields) { - return OrcColumnarRowFileInputFormat.createPartitionedFormat( + return OrcColumnarRowInputFormat.createPartitionedFormat( OrcShim.defaultShim(), new Configuration(), tableType, @@ -370,13 +370,13 @@ public class OrcColumnarRowFileInputFormatTest { } private BulkFormat.Reader<RowData> createReader( - OrcColumnarRowFileInputFormat<?, FileSourceSplit> format, FileSourceSplit split) + OrcColumnarRowInputFormat<?, FileSourceSplit> format, FileSourceSplit split) throws IOException { return format.createReader(new org.apache.flink.configuration.Configuration(), split); } private BulkFormat.Reader<RowData> restoreReader( - OrcColumnarRowFileInputFormat<?, FileSourceSplit> format, + OrcColumnarRowInputFormat<?, FileSourceSplit> format, FileSourceSplit split, long offset, long recordSkipCount) @@ -389,7 +389,7 @@ public class OrcColumnarRowFileInputFormatTest { } private void forEach( - OrcColumnarRowFileInputFormat<?, FileSourceSplit> format, + OrcColumnarRowInputFormat<?, FileSourceSplit> format, FileSourceSplit split, Consumer<RowData> action) throws IOException { diff --git a/flink-python/pyflink/datastream/connectors.py b/flink-python/pyflink/datastream/connectors.py index b4b3335..035cd62 100644 --- a/flink-python/pyflink/datastream/connectors.py +++ b/flink-python/pyflink/datastream/connectors.py @@ -757,7 +757,7 @@ class StreamFormat(object): :param charset_name: The charset to decode the byte stream. """ j_stream_format = get_gateway().jvm.org.apache.flink.connector.file.src.reader. \ - TextLineFormat(charset_name) + TextLineInputFormat(charset_name) return StreamFormat(j_stream_format) diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/LimitableBulkFormatTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/LimitableBulkFormatTest.java index 92a1d22..471af36 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/LimitableBulkFormatTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/LimitableBulkFormatTest.java @@ -24,7 +24,7 @@ import org.apache.flink.connector.file.src.FileSourceSplit; import org.apache.flink.connector.file.src.impl.StreamFormatAdapter; import org.apache.flink.connector.file.src.reader.BulkFormat; import org.apache.flink.connector.file.src.reader.StreamFormat; -import org.apache.flink.connector.file.src.reader.TextLineFormat; +import org.apache.flink.connector.file.src.reader.TextLineInputFormat; import org.apache.flink.connector.file.src.util.Utils; import org.apache.flink.core.fs.Path; import org.apache.flink.util.FileUtils; @@ -56,7 +56,8 @@ public class LimitableBulkFormatTest { // read BulkFormat<String, FileSourceSplit> format = - LimitableBulkFormat.create(new StreamFormatAdapter<>(new TextLineFormat()), 22L); + LimitableBulkFormat.create( + new StreamFormatAdapter<>(new TextLineInputFormat()), 22L); BulkFormat.Reader<String> reader = format.createReader( @@ -88,7 +89,8 @@ public class LimitableBulkFormatTest { // read BulkFormat<String, FileSourceSplit> format = - LimitableBulkFormat.create(new StreamFormatAdapter<>(new TextLineFormat()), limit); + LimitableBulkFormat.create( + new StreamFormatAdapter<>(new TextLineInputFormat()), limit); BulkFormat.Reader<String> reader = format.createReader(