This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c31fbb8ec [format] Introduce text format table (#6879)
4c31fbb8ec is described below

commit 4c31fbb8ecc71979bd7294b0ad09ab95c2cc6202
Author: Zouxxyy <[email protected]>
AuthorDate: Wed Dec 24 17:13:25 2025 +0800

    [format] Introduce text format table (#6879)
---
 docs/content/concepts/spec/fileformat.md           |  27 ++++
 .../main/java/org/apache/paimon/CoreOptions.java   |   2 +
 .../java/org/apache/paimon/table/FormatTable.java  |   1 +
 .../apache/paimon/format/csv/CsvFileReader.java    |   4 +-
 .../apache/paimon/format/csv/CsvFormatWriter.java  |   4 +-
 .../apache/paimon/format/json/JsonFileReader.java  |   4 +-
 .../paimon/format/json/JsonFormatWriter.java       |   4 +-
 ...FileReader.java => AbstractTextFileReader.java} |   4 +-
 ...FileWriter.java => AbstractTextFileWriter.java} |   5 +-
 .../apache/paimon/format/text/TextFileFormat.java  | 122 +++++++++++++++++
 .../paimon/format/text/TextFileFormatFactory.java  |  38 ++++++
 .../apache/paimon/format/text/TextFileReader.java  | 144 +++------------------
 .../paimon/format/text/TextFormatWriter.java       |  49 +++++++
 .../org/apache/paimon/format/text/TextOptions.java |  44 +++++++
 .../org.apache.paimon.format.FileFormatFactory     |   1 +
 .../paimon/format/text/TextFileReaderTest.java     |   2 +-
 .../java/org/apache/paimon/hive/HiveCatalog.java   |   3 +
 .../org/apache/paimon/hive/HiveTableUtils.java     |  17 ++-
 .../paimon/spark/catalog/FormatTableCatalog.java   |  17 +++
 .../spark/sql/execution/SparkFormatTable.scala     |  34 ++++-
 .../paimon/spark/sql/FormatTableTestBase.scala     |  41 ++++++
 21 files changed, 416 insertions(+), 151 deletions(-)

diff --git a/docs/content/concepts/spec/fileformat.md b/docs/content/concepts/spec/fileformat.md
index bd06d99b7b..b520ede409 100644
--- a/docs/content/concepts/spec/fileformat.md
+++ b/docs/content/concepts/spec/fileformat.md
@@ -506,6 +506,33 @@ The following table lists the type mapping from Paimon type to CSV type.
     </tbody>
 </table>
 
+## TEXT
+
+Experimental feature, not recommended for production.
+
+Format Options:
+
+<table class="table table-bordered">
+    <thead>
+      <tr>
+        <th class="text-left" style="width: 25%">Option</th>
+        <th class="text-center" style="width: 7%">Default</th>
+        <th class="text-center" style="width: 10%">Type</th>
+        <th class="text-center" style="width: 42%">Description</th>
+      </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>text.line-delimiter</h5></td>
+      <td style="word-wrap: break-word;"><code>\n</code></td>
+      <td>String</td>
+      <td>The line delimiter for TEXT format</td>
+    </tr>
+    </tbody>
+</table>
+
+A Paimon text table contains exactly one field, and it must be of string type.
+
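+A minimal usage sketch in Spark SQL (mirroring the test added in this commit; the table name `t` is illustrative):
+
+```sql
+-- A text table holds a single string column; each value is written as one line,
+-- separated by the configured line delimiter.
+CREATE TABLE t (v STRING) USING text
+TBLPROPERTIES ('text.line-delimiter' = '?');
+
+INSERT INTO t VALUES ('aaaa'), ('bbbb');
+SELECT * FROM t;
+```
+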
 ## JSON
 
 Experimental feature, not recommended for production.
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 6ec0dff011..ae48355836 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -230,6 +230,7 @@ public class CoreOptions implements Serializable {
     public static final String FILE_FORMAT_AVRO = "avro";
     public static final String FILE_FORMAT_PARQUET = "parquet";
     public static final String FILE_FORMAT_CSV = "csv";
+    public static final String FILE_FORMAT_TEXT = "text";
     public static final String FILE_FORMAT_JSON = "json";
 
     public static final ConfigOption<String> FILE_FORMAT =
@@ -2379,6 +2380,7 @@ public class CoreOptions implements Serializable {
                 case FILE_FORMAT_ORC:
                     return "zstd";
                 case FILE_FORMAT_CSV:
+                case FILE_FORMAT_TEXT:
                 case FILE_FORMAT_JSON:
                     return "none";
                 default:
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java
index 1e481fee45..af6dd9ca26 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java
@@ -76,6 +76,7 @@ public interface FormatTable extends Table {
         ORC,
         PARQUET,
         CSV,
+        TEXT,
         JSON
     }
 
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java
index 2c7477cc0d..5a8f9ac81f 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java
@@ -19,7 +19,7 @@
 package org.apache.paimon.format.csv;
 
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.format.text.TextFileReader;
+import org.apache.paimon.format.text.AbstractTextFileReader;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.types.RowType;
@@ -29,7 +29,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 
 /** CSV file reader implementation. */
-public class CsvFileReader extends TextFileReader {
+public class CsvFileReader extends AbstractTextFileReader {
 
     private final boolean includeHeader;
     private final CsvParser csvParser;
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFormatWriter.java b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFormatWriter.java
index 81d81b874f..2004eece23 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFormatWriter.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFormatWriter.java
@@ -21,7 +21,7 @@ package org.apache.paimon.format.csv;
 import org.apache.paimon.casting.CastExecutor;
 import org.apache.paimon.casting.CastExecutors;
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.format.text.TextFileWriter;
+import org.apache.paimon.format.text.AbstractTextFileWriter;
 import org.apache.paimon.fs.PositionOutputStream;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypeRoot;
@@ -33,7 +33,7 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 /** CSV format writer implementation. */
-public class CsvFormatWriter extends TextFileWriter {
+public class CsvFormatWriter extends AbstractTextFileWriter {
 
     private static final Base64.Encoder BASE64_ENCODER = Base64.getEncoder();
     // Performance optimization: Cache frequently used cast executors
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileReader.java b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileReader.java
index b10dd8f689..7f9b7a1545 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileReader.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileReader.java
@@ -25,7 +25,7 @@ import org.apache.paimon.data.GenericArray;
 import org.apache.paimon.data.GenericMap;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.format.text.TextFileReader;
+import org.apache.paimon.format.text.AbstractTextFileReader;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.types.ArrayType;
@@ -49,7 +49,7 @@ import java.util.List;
 import java.util.Map;
 
 /** JSON file reader. */
-public class JsonFileReader extends TextFileReader {
+public class JsonFileReader extends AbstractTextFileReader {
 
     private static final Base64.Decoder BASE64_DECODER = Base64.getDecoder();
 
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFormatWriter.java b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFormatWriter.java
index 87580b5c87..01a55e9ae7 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFormatWriter.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFormatWriter.java
@@ -24,7 +24,7 @@ import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.InternalArray;
 import org.apache.paimon.data.InternalMap;
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.format.text.TextFileWriter;
+import org.apache.paimon.format.text.AbstractTextFileWriter;
 import org.apache.paimon.fs.PositionOutputStream;
 import org.apache.paimon.types.ArrayType;
 import org.apache.paimon.types.DataField;
@@ -45,7 +45,7 @@ import java.util.List;
 import java.util.Map;
 
 /** Json format writer implementation. */
-public class JsonFormatWriter extends TextFileWriter {
+public class JsonFormatWriter extends AbstractTextFileWriter {
 
     private static final Base64.Encoder BASE64_ENCODER = Base64.getEncoder();
 
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/text/TextFileReader.java b/paimon-format/src/main/java/org/apache/paimon/format/text/AbstractTextFileReader.java
similarity index 97%
copy from paimon-format/src/main/java/org/apache/paimon/format/text/TextFileReader.java
copy to paimon-format/src/main/java/org/apache/paimon/format/text/AbstractTextFileReader.java
index d9cbd0ba0e..1b9e90bc07 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/text/TextFileReader.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/text/AbstractTextFileReader.java
@@ -33,7 +33,7 @@ import java.io.InputStream;
 import static org.apache.paimon.format.text.HadoopCompressionUtils.createDecompressedInputStream;
 
 /** Base class for text-based file readers that provides common functionality. */
-public abstract class TextFileReader implements FileRecordReader<InternalRow> {
+public abstract class AbstractTextFileReader implements FileRecordReader<InternalRow> {
 
     private final Path filePath;
     private final TextRecordIterator reader;
@@ -44,7 +44,7 @@ public abstract class TextFileReader implements FileRecordReader<InternalRow> {
 
     protected boolean readerClosed = false;
 
-    protected TextFileReader(
+    protected AbstractTextFileReader(
             FileIO fileIO,
             Path filePath,
             RowType rowType,
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/text/TextFileWriter.java b/paimon-format/src/main/java/org/apache/paimon/format/text/AbstractTextFileWriter.java
similarity index 94%
rename from paimon-format/src/main/java/org/apache/paimon/format/text/TextFileWriter.java
rename to paimon-format/src/main/java/org/apache/paimon/format/text/AbstractTextFileWriter.java
index 8cf56da017..58df918507 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/text/TextFileWriter.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/text/AbstractTextFileWriter.java
@@ -31,13 +31,14 @@ import java.io.OutputStreamWriter;
 import java.nio.charset.StandardCharsets;
 
 /** Base class for text-based format writers that provides common functionality. */
-public abstract class TextFileWriter implements FormatWriter {
+public abstract class AbstractTextFileWriter implements FormatWriter {
 
     protected final PositionOutputStream outputStream;
     protected final BufferedWriter writer;
     protected final RowType rowType;
 
-    protected TextFileWriter(PositionOutputStream outputStream, RowType rowType, String compression)
+    protected AbstractTextFileWriter(
+            PositionOutputStream outputStream, RowType rowType, String compression)
             throws IOException {
         this.outputStream = outputStream;
         OutputStream compressedStream =
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/text/TextFileFormat.java b/paimon-format/src/main/java/org/apache/paimon/format/text/TextFileFormat.java
new file mode 100644
index 0000000000..dde9b5915e
--- /dev/null
+++ b/paimon-format/src/main/java/org/apache/paimon/format/text/TextFileFormat.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.text;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.format.FileFormatFactory;
+import org.apache.paimon.format.FormatReaderFactory;
+import org.apache.paimon.format.FormatWriter;
+import org.apache.paimon.format.FormatWriterFactory;
+import org.apache.paimon.fs.CloseShieldOutputStream;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.reader.FileRecordReader;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.List;
+
+/** TEXT {@link FileFormat}. */
+public class TextFileFormat extends FileFormat {
+
+    public static final String TEXT_IDENTIFIER = "text";
+
+    private final TextOptions options;
+
+    public TextFileFormat(FileFormatFactory.FormatContext context) {
+        super(TEXT_IDENTIFIER);
+        this.options = new TextOptions(context.options());
+    }
+
+    @Override
+    public FormatReaderFactory createReaderFactory(
+            RowType dataSchemaRowType,
+            RowType projectedRowType,
+            @Nullable List<Predicate> filters) {
+        // For text format, since it only has one string column, the projectedRowType will typically
+        // be a RowType with a single string field or empty (for example, for count(*)).
+        return new TextReaderFactory(projectedRowType, options);
+    }
+
+    @Override
+    public FormatWriterFactory createWriterFactory(RowType type) {
+        return new TextWriterFactory(type, options);
+    }
+
+    @Override
+    public void validateDataFields(RowType rowType) {
+        if (rowType.getFieldCount() != 1
+                || !rowType.getFieldTypes().get(0).equals(DataTypes.STRING())) {
+            throw new IllegalArgumentException("Text format only supports a single string column");
+        }
+    }
+
+    /** TEXT {@link FormatReaderFactory} implementation. */
+    private static class TextReaderFactory implements FormatReaderFactory {
+
+        private final RowType projectedRowType;
+        private final TextOptions options;
+
+        public TextReaderFactory(RowType projectedRowType, TextOptions options) {
+            this.projectedRowType = projectedRowType;
+            this.options = options;
+        }
+
+        @Override
+        public FileRecordReader<InternalRow> createReader(Context context) throws IOException {
+            return new TextFileReader(
+                    context.fileIO(), context.filePath(), projectedRowType, options, 0, null);
+        }
+
+        @Override
+        public FileRecordReader<InternalRow> createReader(Context context, long offset, long length)
+                throws IOException {
+            return new TextFileReader(
+                    context.fileIO(),
+                    context.filePath(),
+                    projectedRowType,
+                    options,
+                    offset,
+                    length);
+        }
+    }
+
+    /** A {@link FormatWriterFactory} to write {@link InternalRow} to TEXT. */
+    private static class TextWriterFactory implements FormatWriterFactory {
+
+        private final RowType rowType;
+        private final TextOptions options;
+
+        public TextWriterFactory(RowType rowType, TextOptions options) {
+            this.rowType = rowType;
+            this.options = options;
+        }
+
+        @Override
+        public FormatWriter create(PositionOutputStream out, String compression)
+                throws IOException {
+            return new TextFormatWriter(
+                    new CloseShieldOutputStream(out), rowType, options, compression);
+        }
+    }
+}
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/text/TextFileFormatFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/text/TextFileFormatFactory.java
new file mode 100644
index 0000000000..a59ed2a96c
--- /dev/null
+++ b/paimon-format/src/main/java/org/apache/paimon/format/text/TextFileFormatFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.text;
+
+import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.format.FileFormatFactory;
+
+/** Factory to create {@link TextFileFormat}. */
+public class TextFileFormatFactory implements FileFormatFactory {
+
+    public static final String IDENTIFIER = "text";
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public FileFormat create(FormatContext formatContext) {
+        return new TextFileFormat(formatContext);
+    }
+}
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/text/TextFileReader.java b/paimon-format/src/main/java/org/apache/paimon/format/text/TextFileReader.java
index d9cbd0ba0e..9b8e65461f 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/text/TextFileReader.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/text/TextFileReader.java
@@ -18,156 +18,42 @@
 
 package org.apache.paimon.format.text;
 
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
-import org.apache.paimon.reader.FileRecordIterator;
-import org.apache.paimon.reader.FileRecordReader;
 import org.apache.paimon.types.RowType;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
-import java.io.InputStream;
 
-import static org.apache.paimon.format.text.HadoopCompressionUtils.createDecompressedInputStream;
+/** TEXT format reader implementation. */
+public class TextFileReader extends AbstractTextFileReader {
 
-/** Base class for text-based file readers that provides common functionality. */
-public abstract class TextFileReader implements FileRecordReader<InternalRow> {
+    private static final GenericRow emptyRow = new GenericRow(0);
 
-    private final Path filePath;
-    private final TextRecordIterator reader;
+    private final boolean isEmptyRow;
 
-    protected final RowType rowType;
-    protected final long offset;
-    protected final TextLineReader lineReader;
-
-    protected boolean readerClosed = false;
-
-    protected TextFileReader(
+    public TextFileReader(
             FileIO fileIO,
             Path filePath,
-            RowType rowType,
-            String delimiter,
+            RowType projectedRowType,
+            TextOptions options,
             long offset,
             @Nullable Long length)
             throws IOException {
-        this.filePath = filePath;
-        this.rowType = rowType;
-        this.offset = offset;
-        InputStream decompressedStream =
-                createDecompressedInputStream(fileIO.newInputStream(filePath), filePath);
-        this.lineReader = TextLineReader.create(decompressedStream, delimiter, offset, length);
-        this.reader = new TextRecordIterator();
-    }
-
-    /**
-     * Parses a single line of text into an InternalRow. Subclasses must implement this method to
-     * handle their specific format.
-     */
-    @Nullable
-    protected abstract InternalRow parseLine(String line) throws IOException;
-
-    /**
-     * Performs any additional setup before reading records. Subclasses can override this method if
-     * they need to perform setup operations like skipping headers.
-     */
-    protected void setupReading() throws IOException {
-        // Default implementation does nothing
+        super(fileIO, filePath, projectedRowType, options.lineDelimiter(), offset, length);
+        this.isEmptyRow = projectedRowType.getFieldCount() == 0;
     }
 
     @Override
-    @Nullable
-    public FileRecordIterator<InternalRow> readBatch() throws IOException {
-        if (readerClosed) {
-            return null;
+    protected @Nullable InternalRow parseLine(String line) {
+        if (isEmptyRow) {
+            return emptyRow;
         }
 
-        // Perform any setup needed before reading
-        setupReading();
-
-        if (reader.end) {
-            return null;
-        }
-        return reader;
-    }
-
-    @Override
-    public void close() throws IOException {
-        if (!readerClosed) {
-            if (lineReader != null) {
-                lineReader.close();
-            }
-            readerClosed = true;
-        }
-    }
-
-    /** Record iterator for text-based file readers. */
-    private class TextRecordIterator implements FileRecordIterator<InternalRow> {
-
-        protected long currentPosition = 0;
-        protected boolean end = false;
-
-        @Override
-        public InternalRow next() throws IOException {
-            while (true) {
-                if (readerClosed) {
-                    return null;
-                }
-                String nextLine = readLine();
-                if (nextLine == null) {
-                    end = true;
-                    return null;
-                }
-
-                currentPosition++;
-                InternalRow row = parseLine(nextLine);
-                if (row != null) {
-                    return row;
-                }
-            }
-        }
-
-        @Override
-        public void releaseBatch() {
-            // Default implementation does nothing
-        }
-
-        @Override
-        public Path filePath() {
-            return filePath;
-        }
-
-        @Override
-        public long returnedPosition() {
-            if (offset > 0) {
-                throw new UnsupportedOperationException(
-                        "Cannot return position with reading offset.");
-            }
-            return Math.max(0, currentPosition - 1);
-        }
-    }
-
-    /**
-     * Reads a single line from the input stream, using either the default line delimiter or a
-     * custom delimiter.
-     *
-     * <p>This method supports multi-character custom delimiters by using a simple pattern matching
-     * algorithm. For standard delimiters (null or empty), it delegates to BufferedReader's
-     * readLine() for optimal performance.
-     *
-     * <p>The algorithm maintains a partial match index and accumulates bytes until:
-     *
-     * <ul>
-     *   <li>A complete delimiter is found (returns line without delimiter)
-     *   <li>End of stream is reached (returns accumulated data or null if empty)
-     *   <li>Maximum line length is exceeded (throws IOException)
-     * </ul>
-     *
-     * @return the next line as a string (without delimiter), or null if end of stream
-     * @throws IOException if an I/O error occurs or line exceeds maximum length
-     */
-    protected String readLine() throws IOException {
-        return lineReader.readLine();
+        return GenericRow.of(BinaryString.fromString(line));
     }
 }
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/text/TextFormatWriter.java b/paimon-format/src/main/java/org/apache/paimon/format/text/TextFormatWriter.java
new file mode 100644
index 0000000000..b2d68b766a
--- /dev/null
+++ b/paimon-format/src/main/java/org/apache/paimon/format/text/TextFormatWriter.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.text;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.types.RowType;
+
+import java.io.IOException;
+
+/** TEXT format writer implementation. */
+public class TextFormatWriter extends AbstractTextFileWriter {
+
+    private final TextOptions textOptions;
+
+    protected TextFormatWriter(
+            PositionOutputStream outputStream,
+            RowType rowType,
+            TextOptions textOptions,
+            String compression)
+            throws IOException {
+        super(outputStream, rowType, compression);
+        this.textOptions = textOptions;
+    }
+
+    @Override
+    public void addElement(InternalRow element) throws IOException {
+        if (!element.isNullAt(0)) {
+            writer.write(element.getString(0).toString());
+        }
+        writer.write(textOptions.lineDelimiter());
+    }
+}
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/text/TextOptions.java b/paimon-format/src/main/java/org/apache/paimon/format/text/TextOptions.java
new file mode 100644
index 0000000000..16fa8f92b6
--- /dev/null
+++ b/paimon-format/src/main/java/org/apache/paimon/format/text/TextOptions.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.text;
+
+import org.apache.paimon.options.ConfigOption;
+import org.apache.paimon.options.ConfigOptions;
+import org.apache.paimon.options.Options;
+
+/** Options for text format. */
+public class TextOptions {
+
+    public static final ConfigOption<String> LINE_DELIMITER =
+            ConfigOptions.key("text.line-delimiter")
+                    .stringType()
+                    .defaultValue("\n")
+                    .withFallbackKeys("lineSep")
+                    .withDescription("The line delimiter for TEXT format");
+
+    private final String lineDelimiter;
+
+    public TextOptions(Options options) {
+        this.lineDelimiter = options.get(LINE_DELIMITER);
+    }
+
+    public String lineDelimiter() {
+        return lineDelimiter;
+    }
+}
diff --git a/paimon-format/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory b/paimon-format/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory
index 09a4216e34..80cfe4b946 100644
--- a/paimon-format/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory
+++ b/paimon-format/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory
@@ -17,5 +17,6 @@ org.apache.paimon.format.avro.AvroFileFormatFactory
 org.apache.paimon.format.orc.OrcFileFormatFactory
 org.apache.paimon.format.parquet.ParquetFileFormatFactory
 org.apache.paimon.format.csv.CsvFileFormatFactory
+org.apache.paimon.format.text.TextFileFormatFactory
 org.apache.paimon.format.json.JsonFileFormatFactory
 org.apache.paimon.format.blob.BlobFileFormatFactory
diff --git a/paimon-format/src/test/java/org/apache/paimon/format/text/TextFileReaderTest.java b/paimon-format/src/test/java/org/apache/paimon/format/text/TextFileReaderTest.java
index c4ca1bf0b5..d78ff1b3f0 100644
--- a/paimon-format/src/test/java/org/apache/paimon/format/text/TextFileReaderTest.java
+++ b/paimon-format/src/test/java/org/apache/paimon/format/text/TextFileReaderTest.java
@@ -199,7 +199,7 @@ public class TextFileReaderTest {
     }
 
     /** Concrete implementation of BaseTextFileReader for testing. */
-    private static class TestTextFileReader extends TextFileReader {
+    private static class TestTextFileReader extends AbstractTextFileReader {
 
         public TestTextFileReader(
                 FileIO fileIO, Path filePath, RowType rowType, String recordDelimiter)
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index 6f2c14a297..50fc949835 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -1468,6 +1468,7 @@ public class HiveCatalog extends AbstractCatalog {
         }
         switch (provider) {
             case CSV:
+            case TEXT:
                 return "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe";
             case PARQUET:
                 return "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe";
@@ -1485,6 +1486,7 @@ public class HiveCatalog extends AbstractCatalog {
         }
         switch (provider) {
             case CSV:
+            case TEXT:
             case JSON:
                 return "org.apache.hadoop.mapred.TextInputFormat";
             case PARQUET:
@@ -1501,6 +1503,7 @@ public class HiveCatalog extends AbstractCatalog {
         }
         switch (provider) {
             case CSV:
+            case TEXT:
             case JSON:
                 return "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat";
             case PARQUET:
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveTableUtils.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveTableUtils.java
index 34975442e3..d352b148b6 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveTableUtils.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveTableUtils.java
@@ -39,6 +39,7 @@ import static org.apache.paimon.TableType.FORMAT_TABLE;
 import static org.apache.paimon.catalog.Catalog.COMMENT_PROP;
 import static org.apache.paimon.format.csv.CsvOptions.FIELD_DELIMITER;
 import static org.apache.paimon.hive.HiveCatalog.HIVE_FIELD_DELIM_DEFAULT;
+import static org.apache.paimon.hive.HiveCatalog.TABLE_TYPE_PROP;
 import static org.apache.paimon.hive.HiveCatalog.isView;
 
 class HiveTableUtils {
@@ -76,12 +77,16 @@ class HiveTableUtils {
             if (serLib.contains("json")) {
                 format = Format.JSON;
             } else {
-                format = Format.CSV;
-                options.set(
-                        FIELD_DELIMITER,
-                        serdeInfo
-                                .getParameters()
-                                .getOrDefault(FIELD_DELIM, HIVE_FIELD_DELIM_DEFAULT));
+                if ("TEXT".equals(options.get(TABLE_TYPE_PROP))) {
+                    format = Format.TEXT;
+                } else {
+                    format = Format.CSV;
+                    options.set(
+                            FIELD_DELIMITER,
+                            serdeInfo
+                                    .getParameters()
+                                    .getOrDefault(FIELD_DELIM, HIVE_FIELD_DELIM_DEFAULT));
+                }
             }
         } else {
             throw new UnsupportedOperationException("Unsupported table: " + 
hiveTable);
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/FormatTableCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/FormatTableCatalog.java
index 2da1b1896c..3893e67068 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/FormatTableCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/FormatTableCatalog.java
@@ -20,6 +20,7 @@ package org.apache.paimon.spark.catalog;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.format.csv.CsvOptions;
+import org.apache.paimon.format.text.TextOptions;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.spark.SparkSource;
 import org.apache.paimon.spark.SparkTypeUtils;
@@ -35,10 +36,12 @@ import org.apache.spark.sql.execution.PartitionedCSVTable;
 import org.apache.spark.sql.execution.PartitionedJsonTable;
 import org.apache.spark.sql.execution.PartitionedOrcTable;
 import org.apache.spark.sql.execution.PartitionedParquetTable;
+import org.apache.spark.sql.execution.PartitionedTextTable;
 import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat;
 import org.apache.spark.sql.execution.datasources.json.JsonFileFormat;
 import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat;
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat;
+import org.apache.spark.sql.execution.datasources.text.TextFileFormat;
 import org.apache.spark.sql.execution.datasources.v2.FileTable;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
@@ -104,6 +107,20 @@ public interface FormatTableCatalog {
                     scala.Option.apply(schema),
                     CSVFileFormat.class,
                     partitionSchema);
+        } else if (formatTable.format() == FormatTable.Format.TEXT) {
+            options.set("lineSep", options.get(TextOptions.LINE_DELIMITER));
+            dsOptions = new CaseInsensitiveStringMap(options.toMap());
+            if (options.contains(CoreOptions.FORMAT_TABLE_FILE_COMPRESSION)) {
+                options.set("compression", options.get(CoreOptions.FORMAT_TABLE_FILE_COMPRESSION));
+            }
+            return new PartitionedTextTable(
+                    ident.name(),
+                    spark,
+                    dsOptions,
+                    scala.collection.JavaConverters.asScalaBuffer(pathList).toSeq(),
+                    scala.Option.apply(schema),
+                    TextFileFormat.class,
+                    partitionSchema);
         } else if (formatTable.format() == FormatTable.Format.ORC) {
             return new PartitionedOrcTable(
                     ident.name(),
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/SparkFormatTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/SparkFormatTable.scala
index 0a9c1fcdab..2cb0101653 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/SparkFormatTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/SparkFormatTable.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.execution.datasources.v2.csv.{CSVScanBuilder, CSVTab
 import org.apache.spark.sql.execution.datasources.v2.json.JsonTable
 import org.apache.spark.sql.execution.datasources.v2.orc.OrcTable
 import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetTable
+import org.apache.spark.sql.execution.datasources.v2.text.{TextScanBuilder, TextTable}
 import org.apache.spark.sql.execution.streaming.{FileStreamSink, MetadataLogFileIndex}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -190,10 +191,37 @@ class PartitionedCSVTable(
       userSpecifiedSchema,
       partitionSchema())
   }
+}
+
+class PartitionedTextTable(
+    name: String,
+    sparkSession: SparkSession,
+    options: CaseInsensitiveStringMap,
+    paths: Seq[String],
+    userSpecifiedSchema: Option[StructType],
+    fallbackFileFormat: Class[_ <: FileFormat],
+    override val partitionSchema_ : StructType)
+  extends TextTable(name, sparkSession, options, paths, userSpecifiedSchema, fallbackFileFormat)
+  with PartitionedFormatTable {
+
+  override def newScanBuilder(options: CaseInsensitiveStringMap): TextScanBuilder = {
+    val mergedOptions =
+      this.options.asCaseSensitiveMap().asScala ++ options.asCaseSensitiveMap().asScala
+    TextScanBuilder(
+      sparkSession,
+      fileIndex,
+      schema,
+      dataSchema,
+      new CaseInsensitiveStringMap(mergedOptions.asJava))
+  }
 
-  override def newWriteBuilder(info: _root_.org.apache.spark.sql.connector.write.LogicalWriteInfo)
-      : _root_.org.apache.spark.sql.connector.write.WriteBuilder = {
-    super.newWriteBuilder(info)
+  override lazy val fileIndex: PartitioningAwareFileIndex = {
+    SparkFormatTable.createFileIndex(
+      options,
+      sparkSession,
+      paths,
+      userSpecifiedSchema,
+      partitionSchema())
   }
 }
 
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala
index 9da753f299..cff8ccd946 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala
@@ -238,4 +238,45 @@ abstract class FormatTableTestBase extends PaimonHiveTestBase {
       }
     }
   }
+
+  test("Paimon format table: text format") {
+    withTable("t") {
+      sql(s"""
+             |CREATE TABLE t (v STRING) USING text
+             |TBLPROPERTIES ('text.line-delimiter'='?')
+             |""".stripMargin)
+      sql("INSERT INTO t VALUES ('aaaa'), ('bbbb'), ('cccc'), (null), 
('dddd')")
+
+      for (impl <- Seq("paimon", "engine")) {
+        withSparkSQLConf("spark.paimon.format-table.implementation" -> impl) {
+          checkAnswer(sql("SELECT COUNT(*) FROM t"), Row(5))
+          // Follow spark, write null as empty string and read as empty string too.
+          checkAnswer(
+            sql("SELECT * FROM t ORDER BY v"),
+            Seq(Row(""), Row("aaaa"), Row("bbbb"), Row("cccc"), Row("dddd"))
+          )
+        }
+      }
+    }
+  }
+
+  test("Paimon format table: write text format with value with 
line-delimiter") {
+    withTable("t") {
+      sql(s"""
+             |CREATE TABLE t (v STRING) USING text
+             |TBLPROPERTIES ('text.line-delimiter'='?')
+             |""".stripMargin)
+      sql("INSERT INTO t VALUES ('aa'), ('bb?cc')")
+
+      for (impl <- Seq("paimon", "engine")) {
+        withSparkSQLConf("spark.paimon.format-table.implementation" -> impl) {
+          checkAnswer(sql("SELECT COUNT(*) FROM t"), Row(3))
+          checkAnswer(
+            sql("SELECT * FROM t ORDER BY v"),
+            Seq(Row("aa"), Row("bb"), Row("cc"))
+          )
+        }
+      }
+    }
+  }
 }

