This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3b9aaca3c96a79c241568f4493f6b308be1eb4ce
Author: Etienne Chauchot <echauc...@apache.org>
AuthorDate: Wed Nov 10 12:19:38 2021 +0100

    [FLINK-24859][doc][formats] Make new formats name coherent
---
 .../docs/connectors/datastream/hybridsource.md     |  2 +-
 .../docs/connectors/datastream/hybridsource.md     |  2 +-
 .../connector/base/source/hybrid/HybridSource.java |  2 +-
 ...extLineFormat.java => TextLineInputFormat.java} |  8 ++---
 .../file/src/FileSourceTextLinesITCase.java        |  8 +++--
 .../file/src/impl/FileSourceReaderTest.java        |  4 +--
 .../flink/connectors/hive/HiveSourceBuilder.java   |  4 +--
 .../hive/read/HiveCompactReaderFactory.java        |  4 +--
 ...BulkFormatAdapter.java => HiveInputFormat.java} | 12 ++++----
 .../nohive/OrcNoHiveColumnarRowInputFormat.java    | 10 +++----
 ...tFormat.java => OrcColumnarRowInputFormat.java} | 10 +++----
 .../org/apache/flink/orc/OrcFileFormatFactory.java |  2 +-
 ...est.java => OrcColumnarRowInputFormatTest.java} | 34 +++++++++++-----------
 flink-python/pyflink/datastream/connectors.py      |  2 +-
 .../table/filesystem/LimitableBulkFormatTest.java  |  8 +++--
 15 files changed, 58 insertions(+), 54 deletions(-)

diff --git a/docs/content.zh/docs/connectors/datastream/hybridsource.md b/docs/content.zh/docs/connectors/datastream/hybridsource.md
index 02f5077..7058ac4 100644
--- a/docs/content.zh/docs/connectors/datastream/hybridsource.md
+++ b/docs/content.zh/docs/connectors/datastream/hybridsource.md
@@ -58,7 +58,7 @@ Each source covers an upfront known range and therefore the contained sources ca
 ```java
 long switchTimestamp = ...; // derive from file input paths
 FileSource<String> fileSource =
-  FileSource.forRecordStreamFormat(new TextLineFormat(), Path.fromLocalFile(testDir)).build();
+  FileSource.forRecordStreamFormat(new TextLineInputFormat(), Path.fromLocalFile(testDir)).build();
 KafkaSource<String> kafkaSource =
           KafkaSource.<String>builder()
                   .setStartingOffsets(OffsetsInitializer.timestamp(switchTimestamp + 1))
diff --git a/docs/content/docs/connectors/datastream/hybridsource.md b/docs/content/docs/connectors/datastream/hybridsource.md
index 02f5077..7058ac4 100644
--- a/docs/content/docs/connectors/datastream/hybridsource.md
+++ b/docs/content/docs/connectors/datastream/hybridsource.md
@@ -58,7 +58,7 @@ Each source covers an upfront known range and therefore the contained sources ca
 ```java
 long switchTimestamp = ...; // derive from file input paths
 FileSource<String> fileSource =
-  FileSource.forRecordStreamFormat(new TextLineFormat(), Path.fromLocalFile(testDir)).build();
+  FileSource.forRecordStreamFormat(new TextLineInputFormat(), Path.fromLocalFile(testDir)).build();
 KafkaSource<String> kafkaSource =
           KafkaSource.<String>builder()
                   .setStartingOffsets(OffsetsInitializer.timestamp(switchTimestamp + 1))
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java
index 24acb6a..8df875b 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java
@@ -41,7 +41,7 @@ import java.util.List;
  *
  * <pre>{@code
  * FileSource<String> fileSource =
- *   FileSource.forRecordStreamFormat(new TextLineFormat(), Path.fromLocalFile(testDir)).build();
+ *   FileSource.forRecordStreamFormat(new TextLineInputFormat(), Path.fromLocalFile(testDir)).build();
  * KafkaSource<String> kafkaSource =
  *           KafkaSource.<String>builder()
  *                   .setBootstrapServers("localhost:9092")
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/TextLineFormat.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/TextLineInputFormat.java
similarity index 93%
rename from flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/TextLineFormat.java
rename to flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/TextLineInputFormat.java
index c6fa3b1..00d906d 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/TextLineFormat.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/TextLineInputFormat.java
@@ -42,7 +42,7 @@ import java.io.InputStreamReader;
  * with their internal buffering of stream input and charset decoder state.
  */
 @PublicEvolving
-public class TextLineFormat extends SimpleStreamFormat<String> {
+public class TextLineInputFormat extends SimpleStreamFormat<String> {
 
     private static final long serialVersionUID = 1L;
 
@@ -50,11 +50,11 @@ public class TextLineFormat extends SimpleStreamFormat<String> {
 
     private final String charsetName;
 
-    public TextLineFormat() {
+    public TextLineInputFormat() {
         this(DEFAULT_CHARSET_NAME);
     }
 
-    public TextLineFormat(String charsetName) {
+    public TextLineInputFormat(String charsetName) {
         this.charsetName = charsetName;
     }
 
@@ -72,7 +72,7 @@ public class TextLineFormat extends SimpleStreamFormat<String> {
 
     // ------------------------------------------------------------------------
 
-    /** The actual reader for the {@code TextLineFormat}. */
+    /** The actual reader for the {@code TextLineInputFormat}. */
     public static final class Reader implements StreamFormat.Reader<String> {
 
         private final BufferedReader reader;
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
index eea0fa7..5f2a851 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
@@ -21,7 +21,7 @@ package org.apache.flink.connector.file.src;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.connector.file.src.reader.TextLineFormat;
+import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
 import org.apache.flink.runtime.minicluster.MiniCluster;
@@ -119,7 +119,8 @@ public class FileSourceTextLinesITCase extends TestLogger {
         writeHiddenJunkFiles(testDir);
 
         final FileSource<String> source =
-                FileSource.forRecordStreamFormat(new TextLineFormat(), Path.fromLocalFile(testDir))
+                FileSource.forRecordStreamFormat(
+                                new TextLineInputFormat(), Path.fromLocalFile(testDir))
                         .build();
 
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -184,7 +185,8 @@ public class FileSourceTextLinesITCase extends TestLogger {
         final File testDir = TMP_FOLDER.newFolder();
 
         final FileSource<String> source =
-                FileSource.forRecordStreamFormat(new TextLineFormat(), Path.fromLocalFile(testDir))
+                FileSource.forRecordStreamFormat(
+                                new TextLineInputFormat(), Path.fromLocalFile(testDir))
                         .monitorContinuously(Duration.ofMillis(5))
                         .build();
 
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/FileSourceReaderTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/FileSourceReaderTest.java
index a25a14c..d70745f 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/FileSourceReaderTest.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/FileSourceReaderTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.connector.file.src.impl;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.file.src.FileSourceSplit;
-import org.apache.flink.connector.file.src.reader.TextLineFormat;
+import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
 import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
 import org.apache.flink.core.fs.Path;
 
@@ -64,7 +64,7 @@ public class FileSourceReaderTest {
     private static FileSourceReader<String, FileSourceSplit> createReader(
             TestingReaderContext context) {
         return new FileSourceReader<>(
-                context, new StreamFormatAdapter<>(new TextLineFormat()), new Configuration());
+                context, new StreamFormatAdapter<>(new TextLineInputFormat()), new Configuration());
     }
 
     private static FileSourceSplit createTestFileSplit() throws IOException {
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java
index 7b8442a..4b42073 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java
@@ -25,8 +25,8 @@ import org.apache.flink.connector.file.src.ContinuousEnumerationSettings;
 import org.apache.flink.connector.file.src.assigners.FileSplitAssigner;
 import org.apache.flink.connector.file.src.assigners.SimpleSplitAssigner;
 import org.apache.flink.connector.file.src.reader.BulkFormat;
-import org.apache.flink.connectors.hive.read.HiveBulkFormatAdapter;
 import org.apache.flink.connectors.hive.read.HiveContinuousPartitionFetcher;
+import org.apache.flink.connectors.hive.read.HiveInputFormat;
 import org.apache.flink.connectors.hive.read.HiveSourceSplit;
 import org.apache.flink.connectors.hive.util.HiveConfUtils;
 import org.apache.flink.connectors.hive.util.HivePartitionUtils;
@@ -310,7 +310,7 @@ public class HiveSourceBuilder {
 
     private BulkFormat<RowData, HiveSourceSplit> createDefaultBulkFormat() {
         return LimitableBulkFormat.create(
-                new HiveBulkFormatAdapter(
+                new HiveInputFormat(
                         new JobConfWrapper(jobConf),
                         partitionKeys,
                         fullSchema.getFieldNames(),
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveCompactReaderFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveCompactReaderFactory.java
index 012ef01..47b36d9 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveCompactReaderFactory.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveCompactReaderFactory.java
@@ -88,8 +88,8 @@ public class HiveCompactReaderFactory implements CompactReader.Factory<RowData>
     @Override
     public CompactReader<RowData> create(CompactContext context) throws IOException {
         HiveSourceSplit split = createSplit(context.getPath(), context.getFileSystem());
-        HiveBulkFormatAdapter format =
-                new HiveBulkFormatAdapter(
+        HiveInputFormat format =
+                new HiveInputFormat(
                         jobConfWrapper,
                         partitionKeys,
                         fieldNames,
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveBulkFormatAdapter.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveInputFormat.java
similarity index 97%
rename from flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveBulkFormatAdapter.java
rename to flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveInputFormat.java
index 99aad05..6ab0dc9 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveBulkFormatAdapter.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveInputFormat.java
@@ -27,7 +27,7 @@ import org.apache.flink.connectors.hive.JobConfWrapper;
 import org.apache.flink.connectors.hive.util.HivePartitionUtils;
 import org.apache.flink.connectors.hive.util.JobConfUtils;
 import org.apache.flink.formats.parquet.ParquetColumnarRowInputFormat;
-import org.apache.flink.orc.OrcColumnarRowFileInputFormat;
+import org.apache.flink.orc.OrcColumnarRowInputFormat;
 import org.apache.flink.orc.nohive.OrcNoHiveColumnarRowInputFormat;
 import org.apache.flink.orc.shim.OrcShim;
 import org.apache.flink.table.catalog.hive.client.HiveShim;
@@ -62,11 +62,11 @@ import static org.apache.flink.table.data.vector.VectorizedColumnBatch.DEFAULT_S
  * BulkFormat instances, because different hive partitions may need different BulkFormat to do the
  * reading.
  */
-public class HiveBulkFormatAdapter implements BulkFormat<RowData, HiveSourceSplit> {
+public class HiveInputFormat implements BulkFormat<RowData, HiveSourceSplit> {
 
     private static final long serialVersionUID = 1L;
 
-    private static final Logger LOG = LoggerFactory.getLogger(HiveBulkFormatAdapter.class);
+    private static final Logger LOG = LoggerFactory.getLogger(HiveInputFormat.class);
 
     // schema evolution configs are not available in older versions of IOConstants, let's define
     // them ourselves
@@ -83,7 +83,7 @@ public class HiveBulkFormatAdapter implements BulkFormat<RowData, HiveSourceSpli
     private final boolean useMapRedReader;
     private final PartitionFieldExtractor<HiveSourceSplit> partitionFieldExtractor;
 
-    public HiveBulkFormatAdapter(
+    public HiveInputFormat(
             JobConfWrapper jobConfWrapper,
             List<String> partitionKeys,
             String[] fieldNames,
@@ -162,7 +162,7 @@ public class HiveBulkFormatAdapter implements BulkFormat<RowData, HiveSourceSpli
         }
     }
 
-    private OrcColumnarRowFileInputFormat<?, HiveSourceSplit> createOrcFormat() {
+    private OrcColumnarRowInputFormat<?, HiveSourceSplit> createOrcFormat() {
         return hiveVersion.startsWith("1.")
                 ? OrcNoHiveColumnarRowInputFormat.createPartitionedFormat(
                         jobConfWrapper.conf(),
@@ -172,7 +172,7 @@ public class HiveBulkFormatAdapter implements BulkFormat<RowData, HiveSourceSpli
                         computeSelectedFields(),
                         Collections.emptyList(),
                         DEFAULT_SIZE)
-                : OrcColumnarRowFileInputFormat.createPartitionedFormat(
+                : OrcColumnarRowInputFormat.createPartitionedFormat(
                         OrcShim.createShim(hiveVersion),
                         jobConfWrapper.conf(),
                         tableRowType(),
diff --git a/flink-formats/flink-orc-nohive/src/main/java/org/apache/flink/orc/nohive/OrcNoHiveColumnarRowInputFormat.java b/flink-formats/flink-orc-nohive/src/main/java/org/apache/flink/orc/nohive/OrcNoHiveColumnarRowInputFormat.java
index 47af6ad..b186faf 100644
--- a/flink-formats/flink-orc-nohive/src/main/java/org/apache/flink/orc/nohive/OrcNoHiveColumnarRowInputFormat.java
+++ b/flink-formats/flink-orc-nohive/src/main/java/org/apache/flink/orc/nohive/OrcNoHiveColumnarRowInputFormat.java
@@ -19,7 +19,7 @@
 package org.apache.flink.orc.nohive;
 
 import org.apache.flink.connector.file.src.FileSourceSplit;
-import org.apache.flink.orc.OrcColumnarRowFileInputFormat;
+import org.apache.flink.orc.OrcColumnarRowInputFormat;
 import org.apache.flink.orc.OrcFilters;
 import org.apache.flink.orc.nohive.shim.OrcNoHiveShim;
 import org.apache.flink.orc.vector.ColumnBatchFactory;
@@ -42,16 +42,16 @@ import static org.apache.flink.orc.OrcSplitReaderUtil.getSelectedOrcFields;
 import static org.apache.flink.orc.nohive.vector.AbstractOrcNoHiveVector.createFlinkVector;
 import static org.apache.flink.orc.vector.AbstractOrcColumnVector.createFlinkVectorFromConstant;
 
-/** Helper class to create {@link OrcColumnarRowFileInputFormat} for no-hive. */
+/** Helper class to create {@link OrcColumnarRowInputFormat} for no-hive. */
 public class OrcNoHiveColumnarRowInputFormat {
     private OrcNoHiveColumnarRowInputFormat() {}
 
     /**
-     * Create a partitioned {@link OrcColumnarRowFileInputFormat}, the partition columns can be
+     * Create a partitioned {@link OrcColumnarRowInputFormat}, the partition columns can be
      * generated by split.
      */
     public static <SplitT extends FileSourceSplit>
-            OrcColumnarRowFileInputFormat<VectorizedRowBatch, SplitT> createPartitionedFormat(
+            OrcColumnarRowInputFormat<VectorizedRowBatch, SplitT> createPartitionedFormat(
                     Configuration hadoopConfig,
                     RowType tableType,
                     List<String> partitionKeys,
@@ -84,7 +84,7 @@ public class OrcNoHiveColumnarRowInputFormat {
                     return new VectorizedColumnBatch(vectors);
                 };
 
-        return new OrcColumnarRowFileInputFormat<>(
+        return new OrcColumnarRowInputFormat<>(
                 new OrcNoHiveShim(),
                 hadoopConfig,
                 convertToOrcTypeWithPart(tableFieldNames, tableFieldTypes, partitionKeys),
diff --git a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcColumnarRowFileInputFormat.java b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcColumnarRowInputFormat.java
similarity index 95%
rename from flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcColumnarRowFileInputFormat.java
rename to flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcColumnarRowInputFormat.java
index d88c967..bb12d51 100644
--- a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcColumnarRowFileInputFormat.java
+++ b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcColumnarRowInputFormat.java
@@ -55,7 +55,7 @@ import static org.apache.flink.orc.vector.AbstractOrcColumnVector.createFlinkVec
  * fields, which can be extracted from path. Therefore, the {@link #getProducedType()} may be
  * different and types of extra fields need to be added.
  */
-public class OrcColumnarRowFileInputFormat<BatchT, SplitT extends FileSourceSplit>
+public class OrcColumnarRowInputFormat<BatchT, SplitT extends FileSourceSplit>
         extends AbstractOrcFileInputFormat<RowData, BatchT, SplitT> {
 
     private static final long serialVersionUID = 1L;
@@ -63,7 +63,7 @@ public class OrcColumnarRowFileInputFormat<BatchT, SplitT extends FileSourceSpli
     private final ColumnBatchFactory<BatchT, SplitT> batchFactory;
     private final RowType projectedOutputType;
 
-    public OrcColumnarRowFileInputFormat(
+    public OrcColumnarRowInputFormat(
             final OrcShim<BatchT> shim,
             final Configuration hadoopConfig,
             final TypeDescription schema,
@@ -126,11 +126,11 @@ public class OrcColumnarRowFileInputFormat<BatchT, SplitT extends FileSourceSpli
     }
 
     /**
-     * Create a partitioned {@link OrcColumnarRowFileInputFormat}, the partition columns can be
+     * Create a partitioned {@link OrcColumnarRowInputFormat}, the partition columns can be
      * generated by split.
      */
     public static <SplitT extends FileSourceSplit>
-            OrcColumnarRowFileInputFormat<VectorizedRowBatch, SplitT> createPartitionedFormat(
+            OrcColumnarRowInputFormat<VectorizedRowBatch, SplitT> createPartitionedFormat(
                     OrcShim<VectorizedRowBatch> shim,
                     Configuration hadoopConfig,
                     RowType tableType,
@@ -164,7 +164,7 @@ public class OrcColumnarRowFileInputFormat<BatchT, SplitT extends FileSourceSpli
                     return new VectorizedColumnBatch(vectors);
                 };
 
-        return new OrcColumnarRowFileInputFormat<>(
+        return new OrcColumnarRowInputFormat<>(
                 shim,
                 hadoopConfig,
                 convertToOrcTypeWithPart(tableFieldNames, tableFieldTypes, partitionKeys),
diff --git a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileFormatFactory.java b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileFormatFactory.java
index 7343959..853b3b8 100644
--- a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileFormatFactory.java
+++ b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcFileFormatFactory.java
@@ -129,7 +129,7 @@ public class OrcFileFormatFactory implements BulkReaderFormatFactory, BulkWriter
                                         FileSystemConnectorOptions.PARTITION_DEFAULT_NAME
                                                 .defaultValue());
 
-                return OrcColumnarRowFileInputFormat.createPartitionedFormat(
+                return OrcColumnarRowInputFormat.createPartitionedFormat(
                         OrcShim.defaultShim(),
                         conf,
                         tableType,
diff --git a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowFileInputFormatTest.java b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowInputFormatTest.java
similarity index 93%
rename from flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowFileInputFormatTest.java
rename to flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowInputFormatTest.java
index 2243d5c..5094866 100644
--- a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowFileInputFormatTest.java
+++ b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowInputFormatTest.java
@@ -62,8 +62,8 @@ import static org.apache.flink.table.utils.PartitionPathUtils.generatePartitionP
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 
-/** Test for {@link OrcColumnarRowFileInputFormat}. */
-public class OrcColumnarRowFileInputFormatTest {
+/** Test for {@link OrcColumnarRowInputFormat}. */
+public class OrcColumnarRowInputFormatTest {
 
     /** Small batch size for test more boundary conditions. */
     protected static final int BATCH_SIZE = 9;
@@ -97,7 +97,7 @@ public class OrcColumnarRowFileInputFormatTest {
 
     @Test
     public void testReadFileInSplits() throws IOException {
-        OrcColumnarRowFileInputFormat<?, FileSourceSplit> format =
+        OrcColumnarRowInputFormat<?, FileSourceSplit> format =
                 createFormat(FLAT_FILE_TYPE, new int[] {0, 1});
 
         AtomicInteger cnt = new AtomicInteger(0);
@@ -124,7 +124,7 @@ public class OrcColumnarRowFileInputFormatTest {
 
     @Test
     public void testReadFileWithSelectFields() throws IOException {
-        OrcColumnarRowFileInputFormat<?, FileSourceSplit> format =
+        OrcColumnarRowInputFormat<?, FileSourceSplit> format =
                 createFormat(FLAT_FILE_TYPE, new int[] {2, 0, 1});
 
         AtomicInteger cnt = new AtomicInteger(0);
@@ -153,7 +153,7 @@ public class OrcColumnarRowFileInputFormatTest {
 
     @Test
     public void testReadDecimalTypeFile() throws IOException {
-        OrcColumnarRowFileInputFormat<?, FileSourceSplit> format =
+        OrcColumnarRowInputFormat<?, FileSourceSplit> format =
                 createFormat(DECIMAL_FILE_TYPE, new int[] {0});
 
         AtomicInteger cnt = new AtomicInteger(0);
@@ -217,7 +217,7 @@ public class OrcColumnarRowFileInputFormatTest {
 
         int[] projectedFields = {8, 1, 3, 0, 5, 2};
 
-        OrcColumnarRowFileInputFormat<?, FileSourceSplit> format =
+        OrcColumnarRowInputFormat<?, FileSourceSplit> format =
                 createPartitionFormat(
                         tableType, new ArrayList<>(partSpec.keySet()), projectedFields);
 
@@ -257,7 +257,7 @@ public class OrcColumnarRowFileInputFormatTest {
 
     @Test
     public void testReadFileAndRestore() throws IOException {
-        OrcColumnarRowFileInputFormat<?, FileSourceSplit> format =
+        OrcColumnarRowInputFormat<?, FileSourceSplit> format =
                 createFormat(FLAT_FILE_TYPE, new int[] {0, 1});
 
         // pick a middle split
@@ -276,7 +276,7 @@ public class OrcColumnarRowFileInputFormatTest {
                                 new Between("_col0", PredicateLeaf.Type.LONG, 
0L, 975000L),
                                 new Equals("_col0", PredicateLeaf.Type.LONG, 
980001L),
                                 new Between("_col0", PredicateLeaf.Type.LONG, 
990000L, 1800000L)));
-        OrcColumnarRowFileInputFormat<?, FileSourceSplit> format =
+        OrcColumnarRowInputFormat<?, FileSourceSplit> format =
                 createFormat(FLAT_FILE_TYPE, new int[] {0, 1}, filter);
 
         // pick a middle split
@@ -290,7 +290,7 @@ public class OrcColumnarRowFileInputFormatTest {
     }
 
     private void innerTestRestore(
-            OrcColumnarRowFileInputFormat<?, FileSourceSplit> format,
+            OrcColumnarRowInputFormat<?, FileSourceSplit> format,
             FileSourceSplit split,
             int breakCnt,
             int expectedCnt,
@@ -338,14 +338,14 @@ public class OrcColumnarRowFileInputFormatTest {
         assertEquals(expectedTotalF0, totalF0.get());
     }
 
-    protected OrcColumnarRowFileInputFormat<?, FileSourceSplit> createFormat(
+    protected OrcColumnarRowInputFormat<?, FileSourceSplit> createFormat(
             RowType formatType, int[] selectedFields) {
         return createFormat(formatType, selectedFields, new ArrayList<>());
     }
 
-    protected OrcColumnarRowFileInputFormat<?, FileSourceSplit> createFormat(
+    protected OrcColumnarRowInputFormat<?, FileSourceSplit> createFormat(
             RowType formatType, int[] selectedFields, List<Predicate> conjunctPredicates) {
-        return OrcColumnarRowFileInputFormat.createPartitionedFormat(
+        return OrcColumnarRowInputFormat.createPartitionedFormat(
                 OrcShim.defaultShim(),
                 new Configuration(),
                 formatType,
@@ -356,9 +356,9 @@ public class OrcColumnarRowFileInputFormatTest {
                 BATCH_SIZE);
     }
 
-    protected OrcColumnarRowFileInputFormat<?, FileSourceSplit> createPartitionFormat(
+    protected OrcColumnarRowInputFormat<?, FileSourceSplit> createPartitionFormat(
             RowType tableType, List<String> partitionKeys, int[] selectedFields) {
-        return OrcColumnarRowFileInputFormat.createPartitionedFormat(
+        return OrcColumnarRowInputFormat.createPartitionedFormat(
                 OrcShim.defaultShim(),
                 new Configuration(),
                 tableType,
@@ -370,13 +370,13 @@ public class OrcColumnarRowFileInputFormatTest {
     }
 
     private BulkFormat.Reader<RowData> createReader(
-            OrcColumnarRowFileInputFormat<?, FileSourceSplit> format, FileSourceSplit split)
+            OrcColumnarRowInputFormat<?, FileSourceSplit> format, FileSourceSplit split)
             throws IOException {
         return format.createReader(new org.apache.flink.configuration.Configuration(), split);
     }
 
     private BulkFormat.Reader<RowData> restoreReader(
-            OrcColumnarRowFileInputFormat<?, FileSourceSplit> format,
+            OrcColumnarRowInputFormat<?, FileSourceSplit> format,
             FileSourceSplit split,
             long offset,
             long recordSkipCount)
@@ -389,7 +389,7 @@ public class OrcColumnarRowFileInputFormatTest {
     }
 
     private void forEach(
-            OrcColumnarRowFileInputFormat<?, FileSourceSplit> format,
+            OrcColumnarRowInputFormat<?, FileSourceSplit> format,
             FileSourceSplit split,
             Consumer<RowData> action)
             throws IOException {
diff --git a/flink-python/pyflink/datastream/connectors.py b/flink-python/pyflink/datastream/connectors.py
index b4b3335..035cd62 100644
--- a/flink-python/pyflink/datastream/connectors.py
+++ b/flink-python/pyflink/datastream/connectors.py
@@ -757,7 +757,7 @@ class StreamFormat(object):
         :param charset_name: The charset to decode the byte stream.
         """
         j_stream_format = get_gateway().jvm.org.apache.flink.connector.file.src.reader. \
-            TextLineFormat(charset_name)
+            TextLineInputFormat(charset_name)
         return StreamFormat(j_stream_format)
 
 
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/LimitableBulkFormatTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/LimitableBulkFormatTest.java
index 92a1d22..471af36 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/LimitableBulkFormatTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/LimitableBulkFormatTest.java
@@ -24,7 +24,7 @@ import org.apache.flink.connector.file.src.FileSourceSplit;
 import org.apache.flink.connector.file.src.impl.StreamFormatAdapter;
 import org.apache.flink.connector.file.src.reader.BulkFormat;
 import org.apache.flink.connector.file.src.reader.StreamFormat;
-import org.apache.flink.connector.file.src.reader.TextLineFormat;
+import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
 import org.apache.flink.connector.file.src.util.Utils;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.FileUtils;
@@ -56,7 +56,8 @@ public class LimitableBulkFormatTest {
 
         // read
         BulkFormat<String, FileSourceSplit> format =
-                LimitableBulkFormat.create(new StreamFormatAdapter<>(new TextLineFormat()), 22L);
+                LimitableBulkFormat.create(
+                        new StreamFormatAdapter<>(new TextLineInputFormat()), 22L);
 
         BulkFormat.Reader<String> reader =
                 format.createReader(
@@ -88,7 +89,8 @@ public class LimitableBulkFormatTest {
 
         // read
         BulkFormat<String, FileSourceSplit> format =
-                LimitableBulkFormat.create(new StreamFormatAdapter<>(new TextLineFormat()), limit);
+                LimitableBulkFormat.create(
+                        new StreamFormatAdapter<>(new TextLineInputFormat()), limit);
 
         BulkFormat.Reader<String> reader =
                 format.createReader(
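For readers tracking this change: the commit is a mechanical rename (TextLineFormat -> TextLineInputFormat, HiveBulkFormatAdapter -> HiveInputFormat, OrcColumnarRowFileInputFormat -> OrcColumnarRowInputFormat); constructors and factory methods keep their signatures, so user code only needs the new class names. A minimal sketch of the updated FileSource usage from the docs hunks above, assuming the Flink 1.14 APIs shown in this diff; the `/tmp/input` directory is a placeholder:

```java
import java.io.File;

import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;

public class TextLineSourceExample {
    public static void main(String[] args) {
        // Placeholder input directory; substitute any local path.
        File testDir = new File("/tmp/input");

        // Same builder call as before this commit; only the format class name changed
        // (previously: new TextLineFormat()).
        FileSource<String> fileSource =
                FileSource.forRecordStreamFormat(
                                new TextLineInputFormat(), Path.fromLocalFile(testDir))
                        .build();
    }
}
```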
