This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 148296496a [core] update null-literal default value for csv format (#6104)
148296496a is described below
commit 148296496a857887b44d1261af4e5da20db5a041
Author: jerry <[email protected]>
AuthorDate: Wed Aug 20 17:25:06 2025 +0800
[core] update null-literal default value for csv format (#6104)
---
docs/content/concepts/spec/fileformat.md | 2 +-
.../apache/paimon/format/csv/CsvFileFormat.java | 2 +
.../apache/paimon/format/csv/CsvFileReader.java | 5 +
.../apache/paimon/format/csv/CsvFormatWriter.java | 5 +
.../org/apache/paimon/format/csv/CsvOptions.java | 2 +-
.../paimon/format/csv/CsvFileFormatTest.java | 322 ++++++++++++++++++++-
6 files changed, 324 insertions(+), 14 deletions(-)
diff --git a/docs/content/concepts/spec/fileformat.md b/docs/content/concepts/spec/fileformat.md
index 98798f7da0..d0873de348 100644
--- a/docs/content/concepts/spec/fileformat.md
+++ b/docs/content/concepts/spec/fileformat.md
@@ -414,7 +414,7 @@ Format Options:
</tr>
<tr>
<td><h5>csv.null-literal</h5></td>
- <td style="word-wrap: break-word;">(none)</td>
+ <td style="word-wrap: break-word;"><code>""</code></td>
<td>String</td>
<td>Null literal string that is interpreted as a null value (disabled by default).</td>
</tr>
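For context, a minimal sketch of setting this option explicitly when constructing the format; it is not part of this patch and simply mirrors the Options / CsvOptions.NULL_LITERAL / CsvFileFormatFactory usage in the tests further down:

    // Sketch only (assumes the same imports as CsvFileFormatTest below):
    // restore a textual null marker instead of the new "" default.
    Options options = new Options();
    options.set(CsvOptions.NULL_LITERAL, "NULL");
    FileFormat format =
            new CsvFileFormatFactory().create(new FormatContext(options, 1024, 1024));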
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileFormat.java b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileFormat.java
index b69e49187c..6dce1b470a 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileFormat.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileFormat.java
@@ -86,6 +86,8 @@ public class CsvFileFormat extends FileFormat {
case FLOAT:
case DOUBLE:
case DATE:
+ case BINARY:
+ case VARBINARY:
case TIME_WITHOUT_TIME_ZONE:
case TIMESTAMP_WITHOUT_TIME_ZONE:
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java
index fae32adc59..208ab73118 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java
@@ -42,12 +42,14 @@ import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
+import java.util.Base64;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/** CSV file reader implementation. */
public class CsvFileReader implements FileRecordReader<InternalRow> {
+ private static final Base64.Decoder BASE64_DECODER = Base64.getDecoder();
private static final CsvMapper CSV_MAPPER = new CsvMapper();
// Performance optimization: Cache frequently used cast executors
@@ -203,6 +205,9 @@ public class CsvFileReader implements FileRecordReader<InternalRow> {
case CHAR:
case VARCHAR:
return BinaryString.fromString(field);
+ case BINARY:
+ case VARBINARY:
+ return BASE64_DECODER.decode(field);
default:
return useCachedCastExecutor(field, dataType);
}
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFormatWriter.java b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFormatWriter.java
index a7ffb1a110..e6f5fd167d 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFormatWriter.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFormatWriter.java
@@ -31,12 +31,14 @@ import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
+import java.util.Base64;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/** CSV format writer implementation. */
public class CsvFormatWriter implements FormatWriter {
+ private static final Base64.Encoder BASE64_ENCODER = Base64.getEncoder();
// Performance optimization: Cache frequently used cast executors
private static final Map<String, CastExecutor<?, ?>> CAST_EXECUTOR_CACHE =
new ConcurrentHashMap<>(32);
@@ -154,6 +156,9 @@ public class CsvFormatWriter implements FormatWriter {
case CHAR:
case VARCHAR:
return value.toString();
+ case BINARY:
+ case VARBINARY:
+ return BASE64_ENCODER.encodeToString((byte[]) value);
default:
return useCachedStringCastExecutor(value, dataType);
}
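The writer branch above and the matching reader branch share one convention: BINARY/VARBINARY columns are stored as Base64 text in the CSV cell. A self-contained illustration of that round trip, using only java.util.Base64 (the class name Base64RoundTrip is made up for this sketch):

    import java.util.Arrays;
    import java.util.Base64;

    public class Base64RoundTrip {
        public static void main(String[] args) {
            byte[] original = new byte[] {1, 5, 2};                        // same bytes as the new test row below
            String cell = Base64.getEncoder().encodeToString(original);    // what CsvFormatWriter writes into the cell
            byte[] decoded = Base64.getDecoder().decode(cell);             // what CsvFileReader hands back
            System.out.println(cell + " -> " + Arrays.toString(decoded));  // prints: AQUC -> [1, 5, 2]
        }
    }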
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvOptions.java b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvOptions.java
index e661127a9a..6a212a41cc 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvOptions.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvOptions.java
@@ -59,7 +59,7 @@ public class CsvOptions {
public static final ConfigOption<String> NULL_LITERAL =
ConfigOptions.key("csv.null-literal")
.stringType()
- .defaultValue("null")
+ .defaultValue("")
.withDescription("The literal for null values in CSV format");
private final String fieldDelimiter;
diff --git a/paimon-format/src/test/java/org/apache/paimon/format/csv/CsvFileFormatTest.java b/paimon-format/src/test/java/org/apache/paimon/format/csv/CsvFileFormatTest.java
index c9c5f3dbb5..bed689bd4d 100644
--- a/paimon-format/src/test/java/org/apache/paimon/format/csv/CsvFileFormatTest.java
+++ b/paimon-format/src/test/java/org/apache/paimon/format/csv/CsvFileFormatTest.java
@@ -18,14 +18,22 @@
package org.apache.paimon.format.csv;
+import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.format.FileFormatFactory.FormatContext;
import org.apache.paimon.format.FormatReadWriteTest;
+import org.apache.paimon.format.FormatReaderContext;
+import org.apache.paimon.format.FormatWriter;
+import org.apache.paimon.format.FormatWriterFactory;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.options.Options;
+import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
@@ -35,6 +43,7 @@ import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.math.BigDecimal;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
@@ -55,18 +64,6 @@ public class CsvFileFormatTest extends FormatReadWriteTest {
return new CsvFileFormatFactory().create(new FormatContext(new Options(), 1024, 1024));
}
- @Test
- public void testWhenUseHiveDefaultDelimiter() throws IOException {
- Options options = new Options();
- options.set(CsvOptions.FIELD_DELIMITER, "\001");
- FileFormat format =
- new CsvFileFormatFactory().create(new FormatContext(new Options(), 1024, 1024));
- testSimpleTypesUtil(
- format, new Path(new Path(parent.toUri()), UUID.randomUUID() + "." + formatType));
- testFullTypesUtil(
- format, new Path(new Path(parent.toUri()), UUID.randomUUID() + "." + formatType));
- }
-
@Test
public void testCsvParsingWithEmptyFields() throws IOException {
@@ -157,6 +154,274 @@ public class CsvFileFormatTest extends FormatReadWriteTest {
assertThat(fields[2]).isEqualTo("field3");
}
+ @Test
+ public void testCsvFieldDelimiterWriteRead() throws IOException {
+ RowType rowType =
+ DataTypes.ROW(
+ DataTypes.INT().notNull(),
+ DataTypes.STRING(),
+ DataTypes.DOUBLE().notNull());
+
+ String[] delimiters = {",", ";", "|", "\t", "\001"};
+
+ // Create test data once (reused for all delimiters)
+ List<InternalRow> testData =
+ Arrays.asList(
+ GenericRow.of(1, BinaryString.fromString("Alice"), 100.5),
+ GenericRow.of(2, BinaryString.fromString("Bob"), 200.75),
+ GenericRow.of(3, BinaryString.fromString("Charlie"), 300.25));
+
+ for (String delimiter : delimiters) {
+ Options options = new Options();
+ options.set(CsvOptions.FIELD_DELIMITER, delimiter);
+
+ List<InternalRow> result =
+ writeThenRead(
+ options, rowType, testData, "test_field_delim_" + delimiter.hashCode());
+
+ // Verify results
+ assertThat(result).hasSize(3);
+ assertThat(result.get(0).getInt(0)).isEqualTo(1);
+ assertThat(result.get(0).getString(1).toString()).isEqualTo("Alice");
+ assertThat(result.get(0).getDouble(2)).isEqualTo(100.5);
+ assertThat(result.get(1).getInt(0)).isEqualTo(2);
+ assertThat(result.get(1).getString(1).toString()).isEqualTo("Bob");
+ assertThat(result.get(1).getDouble(2)).isEqualTo(200.75);
+ assertThat(result.get(2).getInt(0)).isEqualTo(3);
+ assertThat(result.get(2).getString(1).toString()).isEqualTo("Charlie");
+ assertThat(result.get(2).getDouble(2)).isEqualTo(300.25);
+ }
+ }
+
+ @Test
+ public void testCsvLineDelimiterWriteRead() throws IOException {
+ RowType rowType = DataTypes.ROW(DataTypes.INT().notNull(), DataTypes.STRING());
+
+ String[] delimiters = {"\n", "\r", "\r\n"};
+
+ // Create test data once (reused for all delimiters)
+ List<InternalRow> testData =
+ Arrays.asList(
+ GenericRow.of(1, BinaryString.fromString("First")),
+ GenericRow.of(2, BinaryString.fromString("Second")),
+ GenericRow.of(3, BinaryString.fromString("Third")));
+
+ for (String delimiter : delimiters) {
+ Options options = new Options();
+ options.set(CsvOptions.LINE_DELIMITER, delimiter);
+
+ List<InternalRow> result =
+ writeThenRead(
+ options, rowType, testData, "test_line_delim_" + delimiter.hashCode());
+
+ // Verify results
+ assertThat(result).hasSize(3);
+ assertThat(result.get(0).getInt(0)).isEqualTo(1);
+ assertThat(result.get(0).getString(1).toString()).isEqualTo("First");
+ assertThat(result.get(1).getInt(0)).isEqualTo(2);
+ assertThat(result.get(1).getString(1).toString()).isEqualTo("Second");
+ assertThat(result.get(2).getInt(0)).isEqualTo(3);
+ assertThat(result.get(2).getString(1).toString()).isEqualTo("Third");
+ }
+ }
+
+ @Test
+ public void testCsvQuoteCharacterWriteRead() throws IOException {
+ RowType rowType = DataTypes.ROW(DataTypes.INT().notNull(), DataTypes.STRING());
+
+ String[] quoteChars = {"\"", "'", "`"};
+
+ // Create test data with values that need quoting (contain spaces/commas)
+ List<InternalRow> testData =
+ Arrays.asList(
+ GenericRow.of(1, BinaryString.fromString("Hello, World")),
+ GenericRow.of(2, BinaryString.fromString("Test Value")),
+ GenericRow.of(3, BinaryString.fromString("Another, Test")));
+
+ for (String quoteChar : quoteChars) {
+ Options options = new Options();
+ options.set(CsvOptions.QUOTE_CHARACTER, quoteChar);
+
+ List<InternalRow> result =
+ writeThenRead(
+ options, rowType, testData, "test_quote_char_" + quoteChar.hashCode());
+
+ // Verify results
+ assertThat(result).hasSize(3);
+ assertThat(result.get(0).getInt(0)).isEqualTo(1);
+ assertThat(result.get(0).getString(1).toString()).isEqualTo("Hello, World");
+ assertThat(result.get(1).getInt(0)).isEqualTo(2);
+ assertThat(result.get(1).getString(1).toString()).isEqualTo("Test Value");
+ assertThat(result.get(2).getInt(0)).isEqualTo(3);
+ assertThat(result.get(2).getString(1).toString()).isEqualTo("Another, Test");
+ }
+ }
+
+ @Test
+ public void testCsvEscapeCharacterWriteRead() throws IOException {
+ RowType rowType = DataTypes.ROW(DataTypes.INT().notNull(), DataTypes.STRING());
+
+ String[] escapeChars = {"\\", "/"};
+
+ // Create test data with values that might need escaping
+ List<InternalRow> testData =
+ Arrays.asList(
+ GenericRow.of(1, BinaryString.fromString("Value\"With\"Quotes")),
+ GenericRow.of(2, BinaryString.fromString("Normal Value")),
+ GenericRow.of(3, BinaryString.fromString("Special\\Characters")));
+
+ for (String escapeChar : escapeChars) {
+ Options options = new Options();
+ options.set(CsvOptions.ESCAPE_CHARACTER, escapeChar);
+
+ List<InternalRow> result =
+ writeThenRead(
+ options,
+ rowType,
+ testData,
+ "test_escape_char_" + escapeChar.hashCode());
+
+ // Verify results
+ assertThat(result).hasSize(3);
+ assertThat(result.get(0).getInt(0)).isEqualTo(1);
+ assertThat(result.get(1).getInt(0)).isEqualTo(2);
+ assertThat(result.get(1).getString(1).toString()).isEqualTo("Normal Value");
+ assertThat(result.get(2).getInt(0)).isEqualTo(3);
+ }
+ }
+
+ @Test
+ public void testCsvIncludeHeaderWriteRead() throws IOException {
+ RowType rowType =
+ DataTypes.ROW(DataTypes.INT().notNull(), DataTypes.STRING(), DataTypes.BOOLEAN());
+
+ boolean[] includeHeaderOptions = {false, true};
+
+ // Create test data
+ List<InternalRow> testData =
+ Arrays.asList(
+ GenericRow.of(1, BinaryString.fromString("Alice"), true),
+ GenericRow.of(2, BinaryString.fromString("Bob"), false),
+ GenericRow.of(3, BinaryString.fromString("Charlie"), true));
+
+ for (boolean includeHeader : includeHeaderOptions) {
+ Options options = new Options();
+ options.set(CsvOptions.INCLUDE_HEADER, includeHeader);
+
+ List<InternalRow> result =
+ writeThenRead(
+ options, rowType, testData, "test_include_header_" + includeHeader);
+
+ // Verify results
+ assertThat(result).hasSize(3);
+ assertThat(result.get(0).getInt(0)).isEqualTo(1);
+ assertThat(result.get(0).getString(1).toString()).isEqualTo("Alice");
+ assertThat(result.get(0).getBoolean(2)).isEqualTo(true);
+ assertThat(result.get(1).getInt(0)).isEqualTo(2);
+ assertThat(result.get(1).getString(1).toString()).isEqualTo("Bob");
+ assertThat(result.get(1).getBoolean(2)).isEqualTo(false);
+ assertThat(result.get(2).getInt(0)).isEqualTo(3);
+ assertThat(result.get(2).getString(1).toString()).isEqualTo("Charlie");
+ assertThat(result.get(2).getBoolean(2)).isEqualTo(true);
+ }
+ }
+
+ @Test
+ public void testCsvNullLiteralWriteRead() throws IOException {
+ RowType rowType =
+ DataTypes.ROW(DataTypes.INT().notNull(), DataTypes.STRING(), DataTypes.INT());
+
+ String[] nullLiterals = {"", "NULL", "null"};
+
+ // Create test data with null values
+ List<InternalRow> testData =
+ Arrays.asList(
+ GenericRow.of(1, BinaryString.fromString("Alice"), null),
+ GenericRow.of(2, null, 100),
+ GenericRow.of(3, BinaryString.fromString("Charlie"), 300));
+
+ for (String nullLiteral : nullLiterals) {
+ Options options = new Options();
+ options.set(CsvOptions.NULL_LITERAL, nullLiteral);
+
+ List<InternalRow> result =
+ writeThenRead(
+ options,
+ rowType,
+ testData,
+ "test_null_literal_" + nullLiteral.hashCode());
+
+ // Verify results
+ assertThat(result).hasSize(3);
+ assertThat(result.get(0).getInt(0)).isEqualTo(1);
+ assertThat(result.get(0).getString(1).toString()).isEqualTo("Alice");
+ assertThat(result.get(0).isNullAt(2)).isTrue();
+ assertThat(result.get(1).getInt(0)).isEqualTo(2);
+ assertThat(result.get(1).isNullAt(1)).isTrue();
+ assertThat(result.get(1).getInt(2)).isEqualTo(100);
+ assertThat(result.get(2).getInt(0)).isEqualTo(3);
+ assertThat(result.get(2).getString(1).toString()).isEqualTo("Charlie");
+ assertThat(result.get(2).getInt(2)).isEqualTo(300);
+ }
+ }
+
+ @Test
+ public void testCsvOptionsCombinationWriteRead() throws IOException {
+ RowType rowType =
+ DataTypes.ROW(
+ DataTypes.INT().notNull(),
+ DataTypes.STRING(),
+ DataTypes.DOUBLE(),
+ DataTypes.BOOLEAN());
+
+ // Test multiple CSV options together
+ Options options = new Options();
+ options.set(CsvOptions.FIELD_DELIMITER, ";");
+ options.set(CsvOptions.LINE_DELIMITER, "\r\n");
+ options.set(CsvOptions.QUOTE_CHARACTER, "'");
+ options.set(CsvOptions.ESCAPE_CHARACTER, "/");
+ options.set(CsvOptions.INCLUDE_HEADER, true);
+ options.set(CsvOptions.NULL_LITERAL, "NULL");
+
+ // Create test data with various scenarios
+ List<InternalRow> testData =
+ Arrays.asList(
+ GenericRow.of(1, BinaryString.fromString("Alice; Test"), 100.5, true),
+ GenericRow.of(2, null, 200.75, false),
+ GenericRow.of(3, BinaryString.fromString("Charlie's Data"), null, true),
+ GenericRow.of(4, BinaryString.fromString("Normal"), 400.0, null));
+
+ List<InternalRow> result =
+ writeThenRead(options, rowType, testData, "test_csv_combination");
+
+ // Verify results
+ assertThat(result).hasSize(4);
+
+ // Verify first row
+ assertThat(result.get(0).getInt(0)).isEqualTo(1);
+ assertThat(result.get(0).getString(1).toString()).isEqualTo("Alice; Test");
+ assertThat(result.get(0).getDouble(2)).isEqualTo(100.5);
+ assertThat(result.get(0).getBoolean(3)).isEqualTo(true);
+
+ // Verify second row (with null string)
+ assertThat(result.get(1).getInt(0)).isEqualTo(2);
+ assertThat(result.get(1).isNullAt(1)).isTrue();
+ assertThat(result.get(1).getDouble(2)).isEqualTo(200.75);
+ assertThat(result.get(1).getBoolean(3)).isEqualTo(false);
+
+ // Verify third row (with null double)
+ assertThat(result.get(2).getInt(0)).isEqualTo(3);
+ assertThat(result.get(2).getString(1).toString()).isEqualTo("Charlie's Data");
+ assertThat(result.get(2).isNullAt(2)).isTrue();
+ assertThat(result.get(2).getBoolean(3)).isEqualTo(true);
+
+ // Verify fourth row (with null boolean)
+ assertThat(result.get(3).getInt(0)).isEqualTo(4);
+ assertThat(result.get(3).getString(1).toString()).isEqualTo("Normal");
+ assertThat(result.get(3).getDouble(2)).isEqualTo(400.0);
+ assertThat(result.get(3).isNullAt(3)).isTrue();
+ }
+
@Override
protected RowType rowTypeForFullTypesTest() {
RowType.Builder builder =
@@ -168,6 +433,7 @@ public class CsvFileFormatTest extends FormatReadWriteTest {
.field("tinyint", DataTypes.TINYINT())
.field("smallint", DataTypes.SMALLINT())
.field("bigint", DataTypes.BIGINT())
+ .field("bytes", DataTypes.BYTES())
.field("timestamp", DataTypes.TIMESTAMP())
.field("timestamp_3", DataTypes.TIMESTAMP(3))
.field("timestamp_ltz",
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())
@@ -197,6 +463,7 @@ public class CsvFileFormatTest extends FormatReadWriteTest {
(byte) 3,
(short) 6,
12304L,
+ new byte[] {1, 5, 2},
Timestamp.fromMicros(123123123),
Timestamp.fromEpochMillis(123123123),
Timestamp.fromMicros(123123123),
@@ -227,4 +494,35 @@ public class CsvFileFormatTest extends FormatReadWriteTest {
.withNullValue("null");
return CsvFileReader.parseCsvLineToArray(csvLine, schema);
}
+
+ /**
+ * Performs a complete write-read test with the given options and test data. Returns the data
+ * that was read back for further verification.
+ */
+ private List<InternalRow> writeThenRead(
+ Options options, RowType rowType, List<InternalRow> testData, String testPrefix)
+ throws IOException {
+ FileFormat format =
+ new CsvFileFormatFactory().create(new FormatContext(options, 1024, 1024));
+ Path testFile = new Path(parent, testPrefix + "_" + UUID.randomUUID() + ".csv");
+
+ FormatWriterFactory writerFactory = format.createWriterFactory(rowType);
+ try (PositionOutputStream out = fileIO.newOutputStream(testFile, false);
+ FormatWriter writer = writerFactory.create(out, "none")) {
+ for (InternalRow row : testData) {
+ writer.addElement(row);
+ }
+ }
+ try (RecordReader<InternalRow> reader =
+ format.createReaderFactory(rowType)
+ .createReader(
+ new FormatReaderContext(
+ fileIO, testFile, fileIO.getFileSize(testFile)))) {
+
+ InternalRowSerializer serializer = new InternalRowSerializer(rowType);
+ List<InternalRow> result = new ArrayList<>();
+ reader.forEachRemaining(row -> result.add(serializer.copy(row)));
+ return result;
+ }
+ }
}