This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 4c31fbb8ec [format] Introduce text format table (#6879)
4c31fbb8ec is described below
commit 4c31fbb8ecc71979bd7294b0ad09ab95c2cc6202
Author: Zouxxyy <[email protected]>
AuthorDate: Wed Dec 24 17:13:25 2025 +0800
[format] Introduce text format table (#6879)
---
docs/content/concepts/spec/fileformat.md | 27 ++++
.../main/java/org/apache/paimon/CoreOptions.java | 2 +
.../java/org/apache/paimon/table/FormatTable.java | 1 +
.../apache/paimon/format/csv/CsvFileReader.java | 4 +-
.../apache/paimon/format/csv/CsvFormatWriter.java | 4 +-
.../apache/paimon/format/json/JsonFileReader.java | 4 +-
.../paimon/format/json/JsonFormatWriter.java | 4 +-
...FileReader.java => AbstractTextFileReader.java} | 4 +-
...FileWriter.java => AbstractTextFileWriter.java} | 5 +-
.../apache/paimon/format/text/TextFileFormat.java | 122 +++++++++++++++++
.../paimon/format/text/TextFileFormatFactory.java | 38 ++++++
.../apache/paimon/format/text/TextFileReader.java | 144 +++------------------
.../paimon/format/text/TextFormatWriter.java | 49 +++++++
.../org/apache/paimon/format/text/TextOptions.java | 44 +++++++
.../org.apache.paimon.format.FileFormatFactory | 1 +
.../paimon/format/text/TextFileReaderTest.java | 2 +-
.../java/org/apache/paimon/hive/HiveCatalog.java | 3 +
.../org/apache/paimon/hive/HiveTableUtils.java | 17 ++-
.../paimon/spark/catalog/FormatTableCatalog.java | 17 +++
.../spark/sql/execution/SparkFormatTable.scala | 34 ++++-
.../paimon/spark/sql/FormatTableTestBase.scala | 41 ++++++
21 files changed, 416 insertions(+), 151 deletions(-)
diff --git a/docs/content/concepts/spec/fileformat.md b/docs/content/concepts/spec/fileformat.md
index bd06d99b7b..b520ede409 100644
--- a/docs/content/concepts/spec/fileformat.md
+++ b/docs/content/concepts/spec/fileformat.md
@@ -506,6 +506,33 @@ The following table lists the type mapping from Paimon type to CSV type.
</tbody>
</table>
+## TEXT
+
+Experimental feature, not recommended for production.
+
+Format Options:
+
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 25%">Option</th>
+ <th class="text-center" style="width: 7%">Default</th>
+ <th class="text-center" style="width: 10%">Type</th>
+ <th class="text-center" style="width: 42%">Description</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>text.line-delimiter</h5></td>
+ <td style="word-wrap: break-word;"><code>\n</code></td>
+ <td>String</td>
+ <td>The line delimiter for TEXT format</td>
+ </tr>
+ </tbody>
+</table>
+
+A Paimon text table contains exactly one field, and it must be of string type.
+
## JSON
Experimental feature, not recommended for production.
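
A minimal usage sketch of the new format, modeled on the Spark tests added at the end of this commit; it assumes a Spark session against a catalog that supports Paimon format tables, and the table and column names are illustrative:

    -- Create a text format table: a single STRING column, one value per line.
    -- A custom delimiter could be set via TBLPROPERTIES ('text.line-delimiter' = '?'), as in the tests below.
    CREATE TABLE t (v STRING) USING text;

    -- Each non-null value is written as one delimited line; null is written (and read back) as an empty string.
    INSERT INTO t VALUES ('first line'), ('second line');

    -- Reads map each line back to the single string column.
    SELECT * FROM t;
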
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 6ec0dff011..ae48355836 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -230,6 +230,7 @@ public class CoreOptions implements Serializable {
public static final String FILE_FORMAT_AVRO = "avro";
public static final String FILE_FORMAT_PARQUET = "parquet";
public static final String FILE_FORMAT_CSV = "csv";
+ public static final String FILE_FORMAT_TEXT = "text";
public static final String FILE_FORMAT_JSON = "json";
public static final ConfigOption<String> FILE_FORMAT =
@@ -2379,6 +2380,7 @@ public class CoreOptions implements Serializable {
case FILE_FORMAT_ORC:
return "zstd";
case FILE_FORMAT_CSV:
+ case FILE_FORMAT_TEXT:
case FILE_FORMAT_JSON:
return "none";
default:
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java
index 1e481fee45..af6dd9ca26 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java
@@ -76,6 +76,7 @@ public interface FormatTable extends Table {
ORC,
PARQUET,
CSV,
+ TEXT,
JSON
}
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java
index 2c7477cc0d..5a8f9ac81f 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java
@@ -19,7 +19,7 @@
package org.apache.paimon.format.csv;
import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.format.text.TextFileReader;
+import org.apache.paimon.format.text.AbstractTextFileReader;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.types.RowType;
@@ -29,7 +29,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
/** CSV file reader implementation. */
-public class CsvFileReader extends TextFileReader {
+public class CsvFileReader extends AbstractTextFileReader {
private final boolean includeHeader;
private final CsvParser csvParser;
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFormatWriter.java b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFormatWriter.java
index 81d81b874f..2004eece23 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFormatWriter.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFormatWriter.java
@@ -21,7 +21,7 @@ package org.apache.paimon.format.csv;
import org.apache.paimon.casting.CastExecutor;
import org.apache.paimon.casting.CastExecutors;
import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.format.text.TextFileWriter;
+import org.apache.paimon.format.text.AbstractTextFileWriter;
import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypeRoot;
@@ -33,7 +33,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/** CSV format writer implementation. */
-public class CsvFormatWriter extends TextFileWriter {
+public class CsvFormatWriter extends AbstractTextFileWriter {
private static final Base64.Encoder BASE64_ENCODER = Base64.getEncoder();
// Performance optimization: Cache frequently used cast executors
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileReader.java b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileReader.java
index b10dd8f689..7f9b7a1545 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileReader.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileReader.java
@@ -25,7 +25,7 @@ import org.apache.paimon.data.GenericArray;
import org.apache.paimon.data.GenericMap;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.format.text.TextFileReader;
+import org.apache.paimon.format.text.AbstractTextFileReader;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.types.ArrayType;
@@ -49,7 +49,7 @@ import java.util.List;
import java.util.Map;
/** JSON file reader. */
-public class JsonFileReader extends TextFileReader {
+public class JsonFileReader extends AbstractTextFileReader {
private static final Base64.Decoder BASE64_DECODER = Base64.getDecoder();
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFormatWriter.java b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFormatWriter.java
index 87580b5c87..01a55e9ae7 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFormatWriter.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFormatWriter.java
@@ -24,7 +24,7 @@ import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.format.text.TextFileWriter;
+import org.apache.paimon.format.text.AbstractTextFileWriter;
import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.DataField;
@@ -45,7 +45,7 @@ import java.util.List;
import java.util.Map;
/** Json format writer implementation. */
-public class JsonFormatWriter extends TextFileWriter {
+public class JsonFormatWriter extends AbstractTextFileWriter {
private static final Base64.Encoder BASE64_ENCODER = Base64.getEncoder();
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/text/TextFileReader.java b/paimon-format/src/main/java/org/apache/paimon/format/text/AbstractTextFileReader.java
similarity index 97%
copy from paimon-format/src/main/java/org/apache/paimon/format/text/TextFileReader.java
copy to paimon-format/src/main/java/org/apache/paimon/format/text/AbstractTextFileReader.java
index d9cbd0ba0e..1b9e90bc07 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/text/TextFileReader.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/text/AbstractTextFileReader.java
@@ -33,7 +33,7 @@ import java.io.InputStream;
import static org.apache.paimon.format.text.HadoopCompressionUtils.createDecompressedInputStream;
/** Base class for text-based file readers that provides common functionality. */
-public abstract class TextFileReader implements FileRecordReader<InternalRow> {
+public abstract class AbstractTextFileReader implements FileRecordReader<InternalRow> {
private final Path filePath;
private final TextRecordIterator reader;
@@ -44,7 +44,7 @@ public abstract class TextFileReader implements FileRecordReader<InternalRow> {
protected boolean readerClosed = false;
- protected TextFileReader(
+ protected AbstractTextFileReader(
FileIO fileIO,
Path filePath,
RowType rowType,
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/text/TextFileWriter.java b/paimon-format/src/main/java/org/apache/paimon/format/text/AbstractTextFileWriter.java
similarity index 94%
rename from paimon-format/src/main/java/org/apache/paimon/format/text/TextFileWriter.java
rename to paimon-format/src/main/java/org/apache/paimon/format/text/AbstractTextFileWriter.java
index 8cf56da017..58df918507 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/text/TextFileWriter.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/text/AbstractTextFileWriter.java
@@ -31,13 +31,14 @@ import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
/** Base class for text-based format writers that provides common functionality. */
-public abstract class TextFileWriter implements FormatWriter {
+public abstract class AbstractTextFileWriter implements FormatWriter {
protected final PositionOutputStream outputStream;
protected final BufferedWriter writer;
protected final RowType rowType;
- protected TextFileWriter(PositionOutputStream outputStream, RowType rowType, String compression)
+ protected AbstractTextFileWriter(
+ PositionOutputStream outputStream, RowType rowType, String compression)
throws IOException {
this.outputStream = outputStream;
OutputStream compressedStream =
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/text/TextFileFormat.java b/paimon-format/src/main/java/org/apache/paimon/format/text/TextFileFormat.java
new file mode 100644
index 0000000000..dde9b5915e
--- /dev/null
+++ b/paimon-format/src/main/java/org/apache/paimon/format/text/TextFileFormat.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.text;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.format.FileFormatFactory;
+import org.apache.paimon.format.FormatReaderFactory;
+import org.apache.paimon.format.FormatWriter;
+import org.apache.paimon.format.FormatWriterFactory;
+import org.apache.paimon.fs.CloseShieldOutputStream;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.reader.FileRecordReader;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.List;
+
+/** TEXT {@link FileFormat}. */
+public class TextFileFormat extends FileFormat {
+
+ public static final String TEXT_IDENTIFIER = "text";
+
+ private final TextOptions options;
+
+ public TextFileFormat(FileFormatFactory.FormatContext context) {
+ super(TEXT_IDENTIFIER);
+ this.options = new TextOptions(context.options());
+ }
+
+ @Override
+ public FormatReaderFactory createReaderFactory(
+ RowType dataSchemaRowType,
+ RowType projectedRowType,
+ @Nullable List<Predicate> filters) {
+ // For text format, since it only has one string column, the projectedRowType will typically
+ // be a RowType with a single string field or empty (for example, for count(*) queries).
+ return new TextReaderFactory(projectedRowType, options);
+ }
+
+ @Override
+ public FormatWriterFactory createWriterFactory(RowType type) {
+ return new TextWriterFactory(type, options);
+ }
+
+ @Override
+ public void validateDataFields(RowType rowType) {
+ if (rowType.getFieldCount() != 1
+ || !rowType.getFieldTypes().get(0).equals(DataTypes.STRING())) {
+ throw new IllegalArgumentException("Text format only supports a single string column");
+ }
+ }
+
+ /** TEXT {@link FormatReaderFactory} implementation. */
+ private static class TextReaderFactory implements FormatReaderFactory {
+
+ private final RowType projectedRowType;
+ private final TextOptions options;
+
+ public TextReaderFactory(RowType projectedRowType, TextOptions options) {
+ this.projectedRowType = projectedRowType;
+ this.options = options;
+ }
+
+ @Override
+ public FileRecordReader<InternalRow> createReader(Context context) throws IOException {
+ return new TextFileReader(
+ context.fileIO(), context.filePath(), projectedRowType, options, 0, null);
+ }
+
+ @Override
+ public FileRecordReader<InternalRow> createReader(Context context, long offset, long length)
+ throws IOException {
+ return new TextFileReader(
+ context.fileIO(),
+ context.filePath(),
+ projectedRowType,
+ options,
+ offset,
+ length);
+ }
+ }
+
+ /** A {@link FormatWriterFactory} to write {@link InternalRow} to TEXT. */
+ private static class TextWriterFactory implements FormatWriterFactory {
+
+ private final RowType rowType;
+ private final TextOptions options;
+
+ public TextWriterFactory(RowType rowType, TextOptions options) {
+ this.rowType = rowType;
+ this.options = options;
+ }
+
+ @Override
+ public FormatWriter create(PositionOutputStream out, String compression)
+ throws IOException {
+ return new TextFormatWriter(
+ new CloseShieldOutputStream(out), rowType, options, compression);
+ }
+ }
+}
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/text/TextFileFormatFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/text/TextFileFormatFactory.java
new file mode 100644
index 0000000000..a59ed2a96c
--- /dev/null
+++ b/paimon-format/src/main/java/org/apache/paimon/format/text/TextFileFormatFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.text;
+
+import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.format.FileFormatFactory;
+
+/** Factory to create {@link TextFileFormat}. */
+public class TextFileFormatFactory implements FileFormatFactory {
+
+ public static final String IDENTIFIER = "text";
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public FileFormat create(FormatContext formatContext) {
+ return new TextFileFormat(formatContext);
+ }
+}
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/text/TextFileReader.java b/paimon-format/src/main/java/org/apache/paimon/format/text/TextFileReader.java
index d9cbd0ba0e..9b8e65461f 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/text/TextFileReader.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/text/TextFileReader.java
@@ -18,156 +18,42 @@
package org.apache.paimon.format.text;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
-import org.apache.paimon.reader.FileRecordIterator;
-import org.apache.paimon.reader.FileRecordReader;
import org.apache.paimon.types.RowType;
import javax.annotation.Nullable;
import java.io.IOException;
-import java.io.InputStream;
-import static org.apache.paimon.format.text.HadoopCompressionUtils.createDecompressedInputStream;
+/** TEXT format reader implementation. */
+public class TextFileReader extends AbstractTextFileReader {
-/** Base class for text-based file readers that provides common functionality. */
-public abstract class TextFileReader implements FileRecordReader<InternalRow> {
+ private static final GenericRow emptyRow = new GenericRow(0);
- private final Path filePath;
- private final TextRecordIterator reader;
+ private final boolean isEmptyRow;
- protected final RowType rowType;
- protected final long offset;
- protected final TextLineReader lineReader;
-
- protected boolean readerClosed = false;
-
- protected TextFileReader(
+ public TextFileReader(
FileIO fileIO,
Path filePath,
- RowType rowType,
- String delimiter,
+ RowType projectedRowType,
+ TextOptions options,
long offset,
@Nullable Long length)
throws IOException {
- this.filePath = filePath;
- this.rowType = rowType;
- this.offset = offset;
- InputStream decompressedStream =
- createDecompressedInputStream(fileIO.newInputStream(filePath), filePath);
- this.lineReader = TextLineReader.create(decompressedStream, delimiter, offset, length);
- this.reader = new TextRecordIterator();
- }
-
- /**
- * Parses a single line of text into an InternalRow. Subclasses must implement this method to
- * handle their specific format.
- */
- @Nullable
- protected abstract InternalRow parseLine(String line) throws IOException;
-
- /**
- * Performs any additional setup before reading records. Subclasses can override this method if
- * they need to perform setup operations like skipping headers.
- */
- protected void setupReading() throws IOException {
- // Default implementation does nothing
+ super(fileIO, filePath, projectedRowType, options.lineDelimiter(), offset, length);
+ this.isEmptyRow = projectedRowType.getFieldCount() == 0;
}
@Override
- @Nullable
- public FileRecordIterator<InternalRow> readBatch() throws IOException {
- if (readerClosed) {
- return null;
+ protected @Nullable InternalRow parseLine(String line) {
+ if (isEmptyRow) {
+ return emptyRow;
}
- // Perform any setup needed before reading
- setupReading();
-
- if (reader.end) {
- return null;
- }
- return reader;
- }
-
- @Override
- public void close() throws IOException {
- if (!readerClosed) {
- if (lineReader != null) {
- lineReader.close();
- }
- readerClosed = true;
- }
- }
-
- /** Record iterator for text-based file readers. */
- private class TextRecordIterator implements FileRecordIterator<InternalRow> {
-
- protected long currentPosition = 0;
- protected boolean end = false;
-
- @Override
- public InternalRow next() throws IOException {
- while (true) {
- if (readerClosed) {
- return null;
- }
- String nextLine = readLine();
- if (nextLine == null) {
- end = true;
- return null;
- }
-
- currentPosition++;
- InternalRow row = parseLine(nextLine);
- if (row != null) {
- return row;
- }
- }
- }
-
- @Override
- public void releaseBatch() {
- // Default implementation does nothing
- }
-
- @Override
- public Path filePath() {
- return filePath;
- }
-
- @Override
- public long returnedPosition() {
- if (offset > 0) {
- throw new UnsupportedOperationException(
- "Cannot return position with reading offset.");
- }
- return Math.max(0, currentPosition - 1);
- }
- }
-
- /**
- * Reads a single line from the input stream, using either the default line delimiter or a
- * custom delimiter.
- *
- * <p>This method supports multi-character custom delimiters by using a simple pattern matching
- * algorithm. For standard delimiters (null or empty), it delegates to BufferedReader's
- * readLine() for optimal performance.
- *
- * <p>The algorithm maintains a partial match index and accumulates bytes until:
- *
- * <ul>
- * <li>A complete delimiter is found (returns line without delimiter)
- * <li>End of stream is reached (returns accumulated data or null if empty)
- * <li>Maximum line length is exceeded (throws IOException)
- * </ul>
- *
- * @return the next line as a string (without delimiter), or null if end of stream
- * @throws IOException if an I/O error occurs or line exceeds maximum length
- */
- protected String readLine() throws IOException {
- return lineReader.readLine();
+ return GenericRow.of(BinaryString.fromString(line));
}
}
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/text/TextFormatWriter.java b/paimon-format/src/main/java/org/apache/paimon/format/text/TextFormatWriter.java
new file mode 100644
index 0000000000..b2d68b766a
--- /dev/null
+++ b/paimon-format/src/main/java/org/apache/paimon/format/text/TextFormatWriter.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.text;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.types.RowType;
+
+import java.io.IOException;
+
+/** TEXT format writer implementation. */
+public class TextFormatWriter extends AbstractTextFileWriter {
+
+ private final TextOptions textOptions;
+
+ protected TextFormatWriter(
+ PositionOutputStream outputStream,
+ RowType rowType,
+ TextOptions textOptions,
+ String compression)
+ throws IOException {
+ super(outputStream, rowType, compression);
+ this.textOptions = textOptions;
+ }
+
+ @Override
+ public void addElement(InternalRow element) throws IOException {
+ if (!element.isNullAt(0)) {
+ writer.write(element.getString(0).toString());
+ }
+ writer.write(textOptions.lineDelimiter());
+ }
+}
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/text/TextOptions.java b/paimon-format/src/main/java/org/apache/paimon/format/text/TextOptions.java
new file mode 100644
index 0000000000..16fa8f92b6
--- /dev/null
+++ b/paimon-format/src/main/java/org/apache/paimon/format/text/TextOptions.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.text;
+
+import org.apache.paimon.options.ConfigOption;
+import org.apache.paimon.options.ConfigOptions;
+import org.apache.paimon.options.Options;
+
+/** Options for text format. */
+public class TextOptions {
+
+ public static final ConfigOption<String> LINE_DELIMITER =
+ ConfigOptions.key("text.line-delimiter")
+ .stringType()
+ .defaultValue("\n")
+ .withFallbackKeys("lineSep")
+ .withDescription("The line delimiter for TEXT format");
+
+ private final String lineDelimiter;
+
+ public TextOptions(Options options) {
+ this.lineDelimiter = options.get(LINE_DELIMITER);
+ }
+
+ public String lineDelimiter() {
+ return lineDelimiter;
+ }
+}
diff --git a/paimon-format/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory b/paimon-format/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory
index 09a4216e34..80cfe4b946 100644
--- a/paimon-format/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory
+++ b/paimon-format/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory
@@ -17,5 +17,6 @@ org.apache.paimon.format.avro.AvroFileFormatFactory
org.apache.paimon.format.orc.OrcFileFormatFactory
org.apache.paimon.format.parquet.ParquetFileFormatFactory
org.apache.paimon.format.csv.CsvFileFormatFactory
+org.apache.paimon.format.text.TextFileFormatFactory
org.apache.paimon.format.json.JsonFileFormatFactory
org.apache.paimon.format.blob.BlobFileFormatFactory
diff --git a/paimon-format/src/test/java/org/apache/paimon/format/text/TextFileReaderTest.java b/paimon-format/src/test/java/org/apache/paimon/format/text/TextFileReaderTest.java
index c4ca1bf0b5..d78ff1b3f0 100644
--- a/paimon-format/src/test/java/org/apache/paimon/format/text/TextFileReaderTest.java
+++ b/paimon-format/src/test/java/org/apache/paimon/format/text/TextFileReaderTest.java
@@ -199,7 +199,7 @@ public class TextFileReaderTest {
}
/** Concrete implementation of BaseTextFileReader for testing. */
- private static class TestTextFileReader extends TextFileReader {
+ private static class TestTextFileReader extends AbstractTextFileReader {
public TestTextFileReader(
FileIO fileIO, Path filePath, RowType rowType, String recordDelimiter)
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index 6f2c14a297..50fc949835 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -1468,6 +1468,7 @@ public class HiveCatalog extends AbstractCatalog {
}
switch (provider) {
case CSV:
+ case TEXT:
return "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe";
case PARQUET:
return "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe";
@@ -1485,6 +1486,7 @@ public class HiveCatalog extends AbstractCatalog {
}
switch (provider) {
case CSV:
+ case TEXT:
case JSON:
return "org.apache.hadoop.mapred.TextInputFormat";
case PARQUET:
@@ -1501,6 +1503,7 @@ public class HiveCatalog extends AbstractCatalog {
}
switch (provider) {
case CSV:
+ case TEXT:
case JSON:
return "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat";
case PARQUET:
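
For reference, the SerDe, InputFormat, and OutputFormat mapped to TEXT above are the same classes Hive uses for its plain text tables, so the registered table should behave like an ordinary Hive text table. A hypothetical equivalent Hive DDL, for illustration only:

    CREATE TABLE t (v STRING)
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
    STORED AS
      INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
      OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat';
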
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveTableUtils.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveTableUtils.java
index 34975442e3..d352b148b6 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveTableUtils.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveTableUtils.java
@@ -39,6 +39,7 @@ import static org.apache.paimon.TableType.FORMAT_TABLE;
import static org.apache.paimon.catalog.Catalog.COMMENT_PROP;
import static org.apache.paimon.format.csv.CsvOptions.FIELD_DELIMITER;
import static org.apache.paimon.hive.HiveCatalog.HIVE_FIELD_DELIM_DEFAULT;
+import static org.apache.paimon.hive.HiveCatalog.TABLE_TYPE_PROP;
import static org.apache.paimon.hive.HiveCatalog.isView;
class HiveTableUtils {
@@ -76,12 +77,16 @@ class HiveTableUtils {
if (serLib.contains("json")) {
format = Format.JSON;
} else {
- format = Format.CSV;
- options.set(
- FIELD_DELIMITER,
- serdeInfo
- .getParameters()
- .getOrDefault(FIELD_DELIM, HIVE_FIELD_DELIM_DEFAULT));
+ if ("TEXT".equals(options.get(TABLE_TYPE_PROP))) {
+ format = Format.TEXT;
+ } else {
+ format = Format.CSV;
+ options.set(
+ FIELD_DELIMITER,
+ serdeInfo
+ .getParameters()
+ .getOrDefault(FIELD_DELIM, HIVE_FIELD_DELIM_DEFAULT));
+ }
}
} else {
throw new UnsupportedOperationException("Unsupported table: " + hiveTable);
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/FormatTableCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/FormatTableCatalog.java
index 2da1b1896c..3893e67068 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/FormatTableCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/FormatTableCatalog.java
@@ -20,6 +20,7 @@ package org.apache.paimon.spark.catalog;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.format.csv.CsvOptions;
+import org.apache.paimon.format.text.TextOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.spark.SparkSource;
import org.apache.paimon.spark.SparkTypeUtils;
@@ -35,10 +36,12 @@ import org.apache.spark.sql.execution.PartitionedCSVTable;
import org.apache.spark.sql.execution.PartitionedJsonTable;
import org.apache.spark.sql.execution.PartitionedOrcTable;
import org.apache.spark.sql.execution.PartitionedParquetTable;
+import org.apache.spark.sql.execution.PartitionedTextTable;
import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat;
import org.apache.spark.sql.execution.datasources.json.JsonFileFormat;
import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat;
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat;
+import org.apache.spark.sql.execution.datasources.text.TextFileFormat;
import org.apache.spark.sql.execution.datasources.v2.FileTable;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
@@ -104,6 +107,20 @@ public interface FormatTableCatalog {
scala.Option.apply(schema),
CSVFileFormat.class,
partitionSchema);
+ } else if (formatTable.format() == FormatTable.Format.TEXT) {
+ options.set("lineSep", options.get(TextOptions.LINE_DELIMITER));
+ dsOptions = new CaseInsensitiveStringMap(options.toMap());
+ if (options.contains(CoreOptions.FORMAT_TABLE_FILE_COMPRESSION)) {
+ options.set("compression", options.get(CoreOptions.FORMAT_TABLE_FILE_COMPRESSION));
+ }
+ return new PartitionedTextTable(
+ ident.name(),
+ spark,
+ dsOptions,
+ scala.collection.JavaConverters.asScalaBuffer(pathList).toSeq(),
+ scala.Option.apply(schema),
+ TextFileFormat.class,
+ partitionSchema);
} else if (formatTable.format() == FormatTable.Format.ORC) {
return new PartitionedOrcTable(
ident.name(),
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/SparkFormatTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/SparkFormatTable.scala
index 0a9c1fcdab..2cb0101653 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/SparkFormatTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/SparkFormatTable.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.execution.datasources.v2.csv.{CSVScanBuilder, CSVTab
import org.apache.spark.sql.execution.datasources.v2.json.JsonTable
import org.apache.spark.sql.execution.datasources.v2.orc.OrcTable
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetTable
+import org.apache.spark.sql.execution.datasources.v2.text.{TextScanBuilder, TextTable}
import org.apache.spark.sql.execution.streaming.{FileStreamSink, MetadataLogFileIndex}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -190,10 +191,37 @@ class PartitionedCSVTable(
userSpecifiedSchema,
partitionSchema())
}
+}
+
+class PartitionedTextTable(
+ name: String,
+ sparkSession: SparkSession,
+ options: CaseInsensitiveStringMap,
+ paths: Seq[String],
+ userSpecifiedSchema: Option[StructType],
+ fallbackFileFormat: Class[_ <: FileFormat],
+ override val partitionSchema_ : StructType)
+ extends TextTable(name, sparkSession, options, paths, userSpecifiedSchema, fallbackFileFormat)
+ with PartitionedFormatTable {
+
+ override def newScanBuilder(options: CaseInsensitiveStringMap): TextScanBuilder = {
+ val mergedOptions =
+ this.options.asCaseSensitiveMap().asScala ++ options.asCaseSensitiveMap().asScala
+ TextScanBuilder(
+ sparkSession,
+ fileIndex,
+ schema,
+ dataSchema,
+ new CaseInsensitiveStringMap(mergedOptions.asJava))
+ }
- override def newWriteBuilder(info: _root_.org.apache.spark.sql.connector.write.LogicalWriteInfo)
- : _root_.org.apache.spark.sql.connector.write.WriteBuilder = {
- super.newWriteBuilder(info)
+ override lazy val fileIndex: PartitioningAwareFileIndex = {
+ SparkFormatTable.createFileIndex(
+ options,
+ sparkSession,
+ paths,
+ userSpecifiedSchema,
+ partitionSchema())
}
}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala
index 9da753f299..cff8ccd946 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala
@@ -238,4 +238,45 @@ abstract class FormatTableTestBase extends PaimonHiveTestBase {
}
}
}
+
+ test("Paimon format table: text format") {
+ withTable("t") {
+ sql(s"""
+ |CREATE TABLE t (v STRING) USING text
+ |TBLPROPERTIES ('text.line-delimiter'='?')
+ |""".stripMargin)
+ sql("INSERT INTO t VALUES ('aaaa'), ('bbbb'), ('cccc'), (null), ('dddd')")
+
+ for (impl <- Seq("paimon", "engine")) {
+ withSparkSQLConf("spark.paimon.format-table.implementation" -> impl) {
+ checkAnswer(sql("SELECT COUNT(*) FROM t"), Row(5))
+ // Following Spark, write null as empty string and read it back as empty string too.
+ checkAnswer(
+ sql("SELECT * FROM t ORDER BY v"),
+ Seq(Row(""), Row("aaaa"), Row("bbbb"), Row("cccc"), Row("dddd"))
+ )
+ }
+ }
+ }
+ }
+
+ test("Paimon format table: write text format with value with line-delimiter") {
+ withTable("t") {
+ sql(s"""
+ |CREATE TABLE t (v STRING) USING text
+ |TBLPROPERTIES ('text.line-delimiter'='?')
+ |""".stripMargin)
+ sql("INSERT INTO t VALUES ('aa'), ('bb?cc')")
+
+ for (impl <- Seq("paimon", "engine")) {
+ withSparkSQLConf("spark.paimon.format-table.implementation" -> impl) {
+ checkAnswer(sql("SELECT COUNT(*) FROM t"), Row(3))
+ checkAnswer(
+ sql("SELECT * FROM t ORDER BY v"),
+ Seq(Row("aa"), Row("bb"), Row("cc"))
+ )
+ }
+ }
+ }
+ }
}