This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new a266520ad6 [core] Introduce csv format (#6093)
a266520ad6 is described below
commit a266520ad61f250e8a76068207473644acfb3d3d
Author: jerry <[email protected]>
AuthorDate: Tue Aug 19 22:17:02 2025 +0800
[core] Introduce csv format (#6093)
---
.../generated/format_table_configuration.html | 36 ----
.../apache/paimon/format/SupportsDirectWrite.java | 2 +-
.../apache/paimon/format/FormatReadWriteTest.java | 130 +++++++++++-
.../apache/paimon/flink/FormatCatalogTable.java | 7 +-
.../apache/paimon/format/csv/CsvFileFormat.java | 115 +++++++++++
.../paimon/format/csv/CsvFileFormatFactory.java | 27 ++-
.../apache/paimon/format/csv/CsvFileReader.java | 228 ++++++++++++++++++++
.../apache/paimon/format/csv/CsvFormatWriter.java | 178 ++++++++++++++++
.../org/apache/paimon/format/csv/CsvOptions.java | 103 +++++++++
.../apache/paimon/format/csv/CsvReaderFactory.java | 25 ++-
.../org.apache.paimon.format.FileFormatFactory | 1 +
.../paimon/format/csv/CsvFileFormatTest.java | 230 +++++++++++++++++++++
.../java/org/apache/paimon/hive/HiveCatalog.java | 4 +-
.../org/apache/paimon/hive/HiveTableUtils.java | 3 +-
.../java/org/apache/paimon/spark/SparkCatalog.java | 4 +-
15 files changed, 1021 insertions(+), 72 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/format_table_configuration.html
b/docs/layouts/shortcodes/generated/format_table_configuration.html
deleted file mode 100644
index 71133d52d8..0000000000
--- a/docs/layouts/shortcodes/generated/format_table_configuration.html
+++ /dev/null
@@ -1,36 +0,0 @@
-{{/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/}}
-<table class="configuration table table-bordered">
- <thead>
- <tr>
- <th class="text-left" style="width: 20%">Key</th>
- <th class="text-left" style="width: 15%">Default</th>
- <th class="text-left" style="width: 10%">Type</th>
- <th class="text-left" style="width: 55%">Description</th>
- </tr>
- </thead>
- <tbody>
- <tr>
- <td><h5>field-delimiter</h5></td>
- <td style="word-wrap: break-word;">","</td>
- <td>String</td>
- <td>Optional field delimiter character for CSV (',' by
default).</td>
- </tr>
- </tbody>
-</table>
diff --git
a/paimon-common/src/main/java/org/apache/paimon/format/SupportsDirectWrite.java
b/paimon-common/src/main/java/org/apache/paimon/format/SupportsDirectWrite.java
index 76ac9a8ba1..6f513330f4 100644
---
a/paimon-common/src/main/java/org/apache/paimon/format/SupportsDirectWrite.java
+++
b/paimon-common/src/main/java/org/apache/paimon/format/SupportsDirectWrite.java
@@ -23,7 +23,7 @@ import org.apache.paimon.fs.Path;
import java.io.IOException;
-/** Creaet a FormatWriter which has full control abort file io. */
+/** Create a FormatWriter which has full control over file io. */
public interface SupportsDirectWrite {
FormatWriter create(FileIO fileIO, Path path, String compression) throws
IOException;
diff --git
a/paimon-common/src/test/java/org/apache/paimon/format/FormatReadWriteTest.java
b/paimon-common/src/test/java/org/apache/paimon/format/FormatReadWriteTest.java
index 4a7325f410..40dcd975fc 100644
---
a/paimon-common/src/test/java/org/apache/paimon/format/FormatReadWriteTest.java
+++
b/paimon-common/src/test/java/org/apache/paimon/format/FormatReadWriteTest.java
@@ -19,6 +19,7 @@
package org.apache.paimon.format;
import org.apache.paimon.data.BinaryArray;
+import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.GenericArray;
import org.apache.paimon.data.GenericMap;
@@ -33,6 +34,7 @@ import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.reader.FileRecordReader;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.DataType;
@@ -62,10 +64,11 @@ public abstract class FormatReadWriteTest {
@TempDir java.nio.file.Path tempPath;
- private final String formatType;
+ protected final String formatType;
protected FileIO fileIO;
protected Path file;
+ protected Path parent;
protected FormatReadWriteTest(String formatType) {
this.formatType = formatType;
@@ -74,6 +77,7 @@ public abstract class FormatReadWriteTest {
@BeforeEach
public void beforeEach() {
this.fileIO = LocalFileIO.create();
+ this.parent = new Path(tempPath.toUri());
this.file = new Path(new Path(tempPath.toUri()), UUID.randomUUID() +
"." + formatType);
}
@@ -81,6 +85,11 @@ public abstract class FormatReadWriteTest {
@Test
public void testSimpleTypes() throws IOException {
+ FileFormat format = fileFormat();
+ testSimpleTypesUtil(format, file);
+ }
+
+ protected void testSimpleTypesUtil(FileFormat format, Path file) throws
IOException {
RowType rowType = DataTypes.ROW(DataTypes.INT().notNull(),
DataTypes.BIGINT());
if (ThreadLocalRandom.current().nextBoolean()) {
@@ -88,9 +97,8 @@ public abstract class FormatReadWriteTest {
}
InternalRowSerializer serializer = new InternalRowSerializer(rowType);
- FileFormat format = fileFormat();
FormatWriterFactory factory = format.createWriterFactory(rowType);
- write(factory, GenericRow.of(1, 1L), GenericRow.of(2, 2L),
GenericRow.of(3, null));
+ write(factory, file, GenericRow.of(1, 1L), GenericRow.of(2, 2L),
GenericRow.of(3, null));
RecordReader<InternalRow> reader =
format.createReaderFactory(rowType)
.createReader(
@@ -106,12 +114,16 @@ public abstract class FormatReadWriteTest {
@Test
public void testFullTypes() throws IOException {
+ FileFormat format = fileFormat();
+ testFullTypesUtil(format, file);
+ }
+
+ protected void testFullTypesUtil(FileFormat format, Path file) throws
IOException {
RowType rowType = rowTypeForFullTypesTest();
InternalRow expected = expectedRowForFullTypesTest();
- FileFormat format = fileFormat();
FormatWriterFactory factory = format.createWriterFactory(rowType);
- write(factory, expected);
+ write(factory, file, expected);
RecordReader<InternalRow> reader =
format.createReaderFactory(rowType)
.createReader(
@@ -124,8 +136,15 @@ public abstract class FormatReadWriteTest {
validateFullTypesResult(result.get(0), expected);
}
+ public boolean supportNestedReadPruning() {
+ return true;
+ }
+
@Test
public void testNestedReadPruning() throws Exception {
+ if (!supportNestedReadPruning()) {
+ return;
+ }
FileFormat format = fileFormat();
RowType writeType =
@@ -140,7 +159,7 @@ public abstract class FormatReadWriteTest {
DataTypes.FIELD(4, "f2",
DataTypes.INT()))));
FormatWriterFactory factory = format.createWriterFactory(writeType);
- write(factory, GenericRow.of(0, GenericRow.of(10, 11, 12)));
+ write(factory, file, GenericRow.of(0, GenericRow.of(10, 11, 12)));
// skip read f0, f1.f1
RowType readType =
@@ -176,7 +195,10 @@ public abstract class FormatReadWriteTest {
RowType writeType = DataTypes.ROW(DataTypes.FIELD(0, "v",
DataTypes.VARIANT()));
FormatWriterFactory factory = format.createWriterFactory(writeType);
- write(factory,
GenericRow.of(GenericVariant.fromJson("{\"age\":35,\"city\":\"Chicago\"}")));
+ write(
+ factory,
+ file,
+
GenericRow.of(GenericVariant.fromJson("{\"age\":35,\"city\":\"Chicago\"}")));
List<InternalRow> result = new ArrayList<>();
try (RecordReader<InternalRow> reader =
format.createReaderFactory(writeType)
@@ -201,6 +223,7 @@ public abstract class FormatReadWriteTest {
RowType writeType = DataTypes.ROW(new ArrayType(true, new
VariantType()));
write(
format.createWriterFactory(writeType),
+ file,
GenericRow.of(
new GenericArray(
new Object[] {
@@ -222,7 +245,8 @@ public abstract class FormatReadWriteTest {
assertThat(array.getVariant(1).toJson()).isEqualTo("{\"age\":45,\"city\":\"Beijing\"}");
}
- private void write(FormatWriterFactory factory, InternalRow... rows)
throws IOException {
+ protected void write(FormatWriterFactory factory, Path file,
InternalRow... rows)
+ throws IOException {
FormatWriter writer;
PositionOutputStream out = null;
if (factory instanceof SupportsDirectWrite) {
@@ -332,6 +356,94 @@ public abstract class FormatReadWriteTest {
return GenericRow.of(values.toArray());
}
+ public boolean supportDataFileWithoutExtension() {
+ return false;
+ }
+
+ @Test
+ public void testWriteAndReadFileWithoutExtension() throws IOException {
+ if (!supportDataFileWithoutExtension()) {
+ return;
+ }
+ RowType rowType =
+ RowType.of(DataTypes.INT().notNull(), DataTypes.STRING(),
DataTypes.BOOLEAN());
+
+ // Create test data
+ List<InternalRow> testData = new ArrayList<>();
+ testData.add(GenericRow.of(1, BinaryString.fromString("Alice"), true));
+ testData.add(GenericRow.of(2, BinaryString.fromString("Bob"), false));
+ testData.add(GenericRow.of(3, BinaryString.fromString("Charlie"),
true));
+
+ // Create file format
+ FileFormat jsonFormat = fileFormat();
+
+ // Write data
+ Path filePath = new Path(parent, UUID.randomUUID().toString());
+ FormatWriterFactory writerFactory =
jsonFormat.createWriterFactory(rowType);
+ try (FormatWriter writer =
+ writerFactory.create(fileIO.newOutputStream(filePath, false),
"none")) {
+ for (InternalRow row : testData) {
+ writer.addElement(row);
+ }
+ }
+
+ // Read data
+ FormatReaderFactory readerFactory =
jsonFormat.createReaderFactory(rowType, null);
+ FileRecordReader<InternalRow> reader =
+ readerFactory.createReader(
+ new FormatReaderFactory.Context() {
+ @Override
+ public FileIO fileIO() {
+ return fileIO;
+ }
+
+ @Override
+ public Path filePath() {
+ return filePath;
+ }
+
+ @Override
+ public long fileSize() {
+ try {
+ return fileIO.getFileSize(filePath);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public org.apache.paimon.utils.RoaringBitmap32
selection() {
+ return null;
+ }
+ });
+
+ List<InternalRow> readData = new ArrayList<>();
+ RecordReader.RecordIterator<InternalRow> iterator = reader.readBatch();
+ while (iterator != null) {
+ InternalRow row;
+ while ((row = iterator.next()) != null) {
+ readData.add(GenericRow.of(row.getInt(0), row.getString(1),
row.getBoolean(2)));
+ }
+ iterator.releaseBatch();
+ iterator = reader.readBatch();
+ }
+ reader.close();
+
+ // Verify data
+ assertThat(readData).hasSize(3);
+ assertThat(readData.get(0).getInt(0)).isEqualTo(1);
+ assertThat(readData.get(0).getString(1).toString()).isEqualTo("Alice");
+ assertThat(readData.get(0).getBoolean(2)).isTrue();
+
+ assertThat(readData.get(1).getInt(0)).isEqualTo(2);
+ assertThat(readData.get(1).getString(1).toString()).isEqualTo("Bob");
+ assertThat(readData.get(1).getBoolean(2)).isFalse();
+
+ assertThat(readData.get(2).getInt(0)).isEqualTo(3);
+
assertThat(readData.get(2).getString(1).toString()).isEqualTo("Charlie");
+ assertThat(readData.get(2).getBoolean(2)).isTrue();
+ }
+
private DataType getMapValueType() {
if (formatType.equals("avro") || formatType.equals("orc")) {
return DataTypes.ROW(
@@ -351,7 +463,7 @@ public abstract class FormatReadWriteTest {
}
}
- private void validateFullTypesResult(InternalRow actual, InternalRow
expected) {
+ protected void validateFullTypesResult(InternalRow actual, InternalRow
expected) {
RowType rowType = rowTypeForFullTypesTest();
InternalRow.FieldGetter[] fieldGetters =
IntStream.range(0, rowType.getFieldCount())
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FormatCatalogTable.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FormatCatalogTable.java
index cb69cce258..3d1b4d1c6b 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FormatCatalogTable.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FormatCatalogTable.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink;
+import org.apache.paimon.format.csv.CsvOptions;
import org.apache.paimon.table.FormatTable;
import org.apache.flink.table.api.Schema;
@@ -39,7 +40,6 @@ import static
org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
import static
org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
-import static org.apache.paimon.table.FormatTableOptions.FIELD_DELIMITER;
/** A {@link CatalogTable} to represent format table. */
public class FormatCatalogTable implements CatalogTable {
@@ -99,8 +99,9 @@ public class FormatCatalogTable implements CatalogTable {
cachedOptions.put(k, v);
}
});
- if (options.containsKey(FIELD_DELIMITER.key())) {
- cachedOptions.put("csv.field-delimiter",
options.get(FIELD_DELIMITER.key()));
+ if (options.containsKey(CsvOptions.FIELD_DELIMITER.key())) {
+ cachedOptions.put(
+ "csv.field-delimiter",
options.get(CsvOptions.FIELD_DELIMITER.key()));
}
cachedOptions.put(CONNECTOR.key(), "filesystem");
cachedOptions.put(PATH.key(), table.location());
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileFormat.java
b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileFormat.java
new file mode 100644
index 0000000000..04e003c5b3
--- /dev/null
+++
b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileFormat.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.csv;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.format.FileFormatFactory.FormatContext;
+import org.apache.paimon.format.FormatReaderFactory;
+import org.apache.paimon.format.FormatWriter;
+import org.apache.paimon.format.FormatWriterFactory;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeRoot;
+import org.apache.paimon.types.RowType;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+/** CSV {@link FileFormat}. */
+public class CsvFileFormat extends FileFormat {
+
+ public static final String CSV_IDENTIFIER = "csv";
+
+ private final Options options;
+
+ public CsvFileFormat(FormatContext context) {
+ this(context, CSV_IDENTIFIER);
+ }
+
+ public CsvFileFormat(FormatContext context, String identifier) {
+ super(identifier);
+ this.options = context.options();
+ }
+
+ @Override
+ public FormatReaderFactory createReaderFactory(
+ RowType projectedRowType, @Nullable List<Predicate> filters) {
+ return new CsvReaderFactory(projectedRowType, new CsvOptions(options));
+ }
+
+ @Override
+ public FormatWriterFactory createWriterFactory(RowType type) {
+ return new CsvWriterFactory(type, new CsvOptions(options));
+ }
+
+ @Override
+ public void validateDataFields(RowType rowType) {
+ List<DataType> fieldTypes = rowType.getFieldTypes();
+ for (DataType dataType : fieldTypes) {
+ validateDataType(dataType);
+ }
+ }
+
+ private void validateDataType(DataType dataType) {
+        // CSV format supports only primitive-like types; complex types are
rejected below
+ DataTypeRoot typeRoot = dataType.getTypeRoot();
+ switch (typeRoot) {
+ case CHAR:
+ case VARCHAR:
+ case BOOLEAN:
+ case DECIMAL:
+ case TINYINT:
+ case SMALLINT:
+ case INTEGER:
+ case BIGINT:
+ case FLOAT:
+ case DOUBLE:
+ case DATE:
+ case TIME_WITHOUT_TIME_ZONE:
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ // These are directly supported
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ "Unsupported data type for CSV format: " + dataType);
+ }
+ }
+
+ /** A {@link FormatWriterFactory} to write {@link InternalRow} to CSV. */
+ private static class CsvWriterFactory implements FormatWriterFactory {
+
+ private final RowType rowType;
+ private final CsvOptions options;
+
+ public CsvWriterFactory(RowType rowType, CsvOptions options) {
+ this.rowType = rowType;
+ this.options = options;
+ }
+
+ @Override
+ public FormatWriter create(PositionOutputStream out, String
compression) {
+ return new CsvFormatWriter(out, rowType, options);
+ }
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/FormatTableOptions.java
b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileFormatFactory.java
similarity index 61%
rename from
paimon-core/src/main/java/org/apache/paimon/table/FormatTableOptions.java
rename to
paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileFormatFactory.java
index b4010209c3..d565b831cb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FormatTableOptions.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileFormatFactory.java
@@ -16,18 +16,23 @@
* limitations under the License.
*/
-package org.apache.paimon.table;
+package org.apache.paimon.format.csv;
-import org.apache.paimon.options.ConfigOption;
-import org.apache.paimon.options.ConfigOptions;
+import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.format.FileFormatFactory;
-/** Options of {@link FormatTable}. */
-public class FormatTableOptions {
+/** Factory to create {@link CsvFileFormat}. */
+public class CsvFileFormatFactory implements FileFormatFactory {
- public static final ConfigOption<String> FIELD_DELIMITER =
- ConfigOptions.key("field-delimiter")
- .stringType()
- .defaultValue(",")
- .withDescription(
- "Optional field delimiter character for CSV (','
by default).");
+ public static final String IDENTIFIER = "csv";
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public FileFormat create(FormatContext formatContext) {
+ return new CsvFileFormat(formatContext);
+ }
}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java
b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java
new file mode 100644
index 0000000000..80eb34de16
--- /dev/null
+++
b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.csv;
+
+import org.apache.paimon.casting.CastExecutor;
+import org.apache.paimon.casting.CastExecutors;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.format.FormatReaderFactory;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.reader.FileRecordIterator;
+import org.apache.paimon.reader.FileRecordReader;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeRoot;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import javax.annotation.Nullable;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** CSV file reader implementation. */
+public class CsvFileReader implements FileRecordReader<InternalRow> {
+
+ private static final CsvMapper CSV_MAPPER = new CsvMapper();
+ private static final Base64.Decoder BASE64_DECODER = Base64.getDecoder();
+
+ // Performance optimization: Cache frequently used cast executors
+ private static final Map<String, CastExecutor<?, ?>> CAST_EXECUTOR_CACHE =
+ new ConcurrentHashMap<>(32);
+
+ private final RowType rowType;
+ private final CsvOptions options;
+ private final Path filePath;
+ private final CsvSchema schema;
+
+ private BufferedReader bufferedReader;
+ private boolean headerSkipped = false;
+ private boolean readerClosed = false;
+ private CsvRecordIterator reader;
+
+ public CsvFileReader(FormatReaderFactory.Context context, RowType rowType,
CsvOptions options)
+ throws IOException {
+ this.rowType = rowType;
+ this.filePath = context.filePath();
+ this.options = options;
+ this.schema =
+ CsvSchema.emptySchema()
+ .withQuoteChar(options.quoteCharacter().charAt(0))
+
.withColumnSeparator(options.fieldDelimiter().charAt(0))
+ .withEscapeChar(options.escapeCharacter().charAt(0));
+ if (!options.includeHeader()) {
+ this.schema.withoutHeader();
+ }
+ FileIO fileIO = context.fileIO();
+ SeekableInputStream inputStream =
fileIO.newInputStream(context.filePath());
+ reader = new CsvRecordIterator();
+ InputStreamReader inputStreamReader =
+ new InputStreamReader(inputStream, StandardCharsets.UTF_8);
+ this.bufferedReader = new BufferedReader(inputStreamReader);
+ }
+
+ @Override
+ @Nullable
+ public FileRecordIterator<InternalRow> readBatch() throws IOException {
+ if (readerClosed) {
+ return null;
+ }
+
+ // Skip header if needed
+ if (options.includeHeader() && !headerSkipped) {
+ bufferedReader.readLine();
+ headerSkipped = true;
+ }
+ if (reader.end) {
+ return null;
+ }
+ return reader;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (!readerClosed && bufferedReader != null) {
+ bufferedReader.close();
+ readerClosed = true;
+ }
+ }
+
+ private class CsvRecordIterator implements FileRecordIterator<InternalRow>
{
+ private boolean batchRead = false;
+ private long currentPosition = 0;
+ private String nextLine = null;
+ boolean end = false;
+
+ @Override
+ @Nullable
+ public InternalRow next() throws IOException {
+ if (batchRead || readerClosed) {
+ return null;
+ }
+ nextLine = bufferedReader.readLine();
+ if (nextLine == null) {
+ batchRead = true;
+ end = true;
+ return null;
+ }
+
+ currentPosition++;
+ return parseCsvLine(nextLine, schema);
+ }
+
+ @Override
+ public void releaseBatch() {
+ // No resources to release for CSV
+ }
+
+ @Override
+ public long returnedPosition() {
+ return currentPosition - 1; // Return position of last returned row
+ }
+
+ @Override
+ public Path filePath() {
+ return filePath;
+ }
+ }
+
+ protected static String[] parseCsvLineToArray(String line, CsvSchema
schema)
+ throws IOException {
+ if (line == null || line.isEmpty()) {
+ return new String[] {};
+ }
+ return
CSV_MAPPER.readerFor(String[].class).with(schema).readValue(line);
+ }
+
+ private InternalRow parseCsvLine(String line, CsvSchema schema) throws
IOException {
+ String[] fields = parseCsvLineToArray(line, schema);
+ int fieldCount = Math.min(fields.length, rowType.getFieldCount());
+ Object[] values = new Object[fieldCount]; // Pre-allocated array
+
+ for (int i = 0; i < fieldCount; i++) {
+ String field = fields[i];
+
+ // Fast path for null values
+ if (field == null || field.equals(options.nullLiteral()) ||
field.isEmpty()) {
+ values[i] = null;
+ continue;
+ }
+
+ // Optimized field parsing with cached cast executors
+ values[i] = parseFieldOptimized(field.trim(),
rowType.getTypeAt(i));
+ }
+
+ return GenericRow.of(values);
+ }
+
+ /** Optimized field parsing with caching and fast paths for common types.
*/
+ private Object parseFieldOptimized(String field, DataType dataType) {
+ if (field == null || field.equals(options.nullLiteral())) {
+ return null;
+ }
+
+ DataTypeRoot typeRoot = dataType.getTypeRoot();
+ switch (typeRoot) {
+ case TINYINT:
+ return Byte.parseByte(field);
+ case SMALLINT:
+ return Short.parseShort(field);
+ case INTEGER:
+ return Integer.parseInt(field);
+ case BIGINT:
+ return Long.parseLong(field);
+ case FLOAT:
+ return Float.parseFloat(field);
+ case DOUBLE:
+ return Double.parseDouble(field);
+ case BOOLEAN:
+ return Boolean.parseBoolean(field);
+ case CHAR:
+ case VARCHAR:
+ return BinaryString.fromString(field);
+ default:
+ return useCachedCastExecutor(field, dataType);
+ }
+ }
+
+ private Object useCachedCastExecutor(String field, DataType dataType) {
+ String cacheKey = dataType.toString();
+ @SuppressWarnings("unchecked")
+ CastExecutor<BinaryString, Object> cast =
+ (CastExecutor<BinaryString, Object>)
+ CAST_EXECUTOR_CACHE.computeIfAbsent(
+ cacheKey, k ->
CastExecutors.resolve(DataTypes.STRING(), dataType));
+
+ if (cast != null) {
+ return cast.cast(BinaryString.fromString(field));
+ }
+ return BinaryString.fromString(field);
+ }
+}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFormatWriter.java
b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFormatWriter.java
new file mode 100644
index 0000000000..5012696dea
--- /dev/null
+++
b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFormatWriter.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.csv;
+
+import org.apache.paimon.casting.CastExecutor;
+import org.apache.paimon.casting.CastExecutors;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.format.FormatWriter;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeRoot;
+import org.apache.paimon.types.RowType;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** CSV format writer implementation. */
+public class CsvFormatWriter implements FormatWriter {
+
+ // Performance optimization: Cache frequently used cast executors
+ private static final Map<String, CastExecutor<?, ?>> CAST_EXECUTOR_CACHE =
+ new ConcurrentHashMap<>(32);
+
+ private final RowType rowType;
+ private final CsvOptions options;
+
+ private final BufferedWriter writer;
+ private final PositionOutputStream outputStream;
+ private boolean headerWritten = false;
+
+ private final StringBuilder stringBuilder;
+
+ public CsvFormatWriter(PositionOutputStream out, RowType rowType,
CsvOptions options) {
+ this.rowType = rowType;
+ this.options = options;
+ this.outputStream = out;
+ OutputStreamWriter outputStreamWriter = new OutputStreamWriter(out,
StandardCharsets.UTF_8);
+ this.writer = new BufferedWriter(outputStreamWriter);
+ this.stringBuilder = new StringBuilder();
+ }
+
+ @Override
+ public void addElement(InternalRow element) throws IOException {
+ // Write header if needed
+ if (options.includeHeader() && !headerWritten) {
+ writeHeader();
+ headerWritten = true;
+ }
+
+ // Reuse StringBuilder for better performance
+ stringBuilder.setLength(0); // Reset without reallocating
+
+ int fieldCount = rowType.getFieldCount();
+ for (int i = 0; i < fieldCount; i++) {
+ if (i > 0) {
+ stringBuilder.append(options.fieldDelimiter());
+ }
+
+ Object value =
+ InternalRow.createFieldGetter(rowType.getTypeAt(i),
i).getFieldOrNull(element);
+ String fieldValue = escapeField(castToStringOptimized(value,
rowType.getTypeAt(i)));
+ stringBuilder.append(fieldValue);
+ }
+ stringBuilder.append(options.lineDelimiter());
+
+ writer.write(stringBuilder.toString());
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (writer != null) {
+ writer.flush();
+ writer.close();
+ }
+ }
+
+ @Override
+ public boolean reachTargetSize(boolean suggestedCheck, long targetSize)
throws IOException {
+ if (outputStream != null && suggestedCheck) {
+ return outputStream.getPos() >= targetSize;
+ }
+ return false;
+ }
+
+ private void writeHeader() throws IOException {
+ stringBuilder.setLength(0); // Reuse StringBuilder
+
+ int fieldCount = rowType.getFieldCount();
+ for (int i = 0; i < fieldCount; i++) {
+ if (i > 0) {
+ stringBuilder.append(options.fieldDelimiter());
+ }
+ stringBuilder.append(escapeField(rowType.getFieldNames().get(i)));
+ }
+ stringBuilder.append(options.lineDelimiter());
+ writer.write(stringBuilder.toString());
+ }
+
+ private String escapeField(String field) {
+ if (field == null) {
+ return options.nullLiteral();
+ }
+
+ // Optimized escaping with early exit checks
+ boolean needsQuoting =
+ field.indexOf(options.fieldDelimiter().charAt(0)) >= 0
+ || field.indexOf(options.lineDelimiter().charAt(0)) >= 0
+ || field.indexOf(options.quoteCharacter().charAt(0))
>= 0;
+
+ if (!needsQuoting) {
+ return field;
+ }
+
+ // Only escape if needed
+ String escaped =
+ field.replace(
+ options.quoteCharacter(),
+ options.escapeCharacter() + options.quoteCharacter());
+ return options.quoteCharacter() + escaped + options.quoteCharacter();
+ }
+
+ /** Optimized string casting with caching and fast paths for common types.
*/
+ private String castToStringOptimized(Object value, DataType dataType) {
+ if (value == null) {
+ return null;
+ }
+
+ DataTypeRoot typeRoot = dataType.getTypeRoot();
+ switch (typeRoot) {
+ case INTEGER:
+ case BIGINT:
+ case FLOAT:
+ case DOUBLE:
+ case BOOLEAN:
+ case TINYINT:
+ case SMALLINT:
+ case CHAR:
+ case VARCHAR:
+ return value.toString();
+ default:
+ return useCachedStringCastExecutor(value, dataType);
+ }
+ }
+
+ private String useCachedStringCastExecutor(Object value, DataType
dataType) {
+ String cacheKey = dataType.toString();
+ CastExecutor<Object, ?> cast =
+ (CastExecutor<Object, ?>)
+ CAST_EXECUTOR_CACHE.computeIfAbsent(
+ cacheKey, k ->
CastExecutors.resolveToString(dataType));
+
+ if (cast != null) {
+ Object result = cast.cast(value);
+ return result != null ? result.toString() : null;
+ }
+ return value.toString();
+ }
+}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvOptions.java
b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvOptions.java
new file mode 100644
index 0000000000..956f530b3b
--- /dev/null
+++ b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvOptions.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.csv;
+
+import org.apache.paimon.options.ConfigOption;
+import org.apache.paimon.options.ConfigOptions;
+import org.apache.paimon.options.Options;
+
+/** Options for csv format. */
+public class CsvOptions {
+
+ public static final ConfigOption<String> FIELD_DELIMITER =
+ ConfigOptions.key("field-delimiter")
+ .stringType()
+ .defaultValue(",")
+ .withDescription("The field delimiter for CSV or TXT
format");
+
+ public static final ConfigOption<String> LINE_DELIMITER =
+ ConfigOptions.key("line-delimiter")
+ .stringType()
+ .defaultValue("\n")
+ .withDescription("The line delimiter for CSV format");
+
+ public static final ConfigOption<String> QUOTE_CHARACTER =
+ ConfigOptions.key("quote-character")
+ .stringType()
+ .defaultValue("\"")
+ .withDescription("The quote character for CSV format");
+
+ public static final ConfigOption<String> ESCAPE_CHARACTER =
+ ConfigOptions.key("escape-character")
+ .stringType()
+ .defaultValue("\\")
+ .withDescription("The escape character for CSV format");
+
+ public static final ConfigOption<Boolean> INCLUDE_HEADER =
+ ConfigOptions.key("include-header")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Whether to include header in CSV files");
+
+ public static final ConfigOption<String> NULL_LITERAL =
+ ConfigOptions.key("null-literal")
+ .stringType()
+ .defaultValue("null")
+ .withDescription("The literal for null values in CSV
format");
+
+ private final String fieldDelimiter;
+ private final String lineDelimiter;
+ private final String nullLiteral;
+ private final boolean includeHeader;
+ private final String quoteCharacter;
+ private final String escapeCharacter;
+
+ public CsvOptions(Options options) {
+ this.fieldDelimiter = options.get(FIELD_DELIMITER);
+ this.lineDelimiter = options.get(LINE_DELIMITER);
+ this.nullLiteral = options.get(NULL_LITERAL);
+ this.includeHeader = options.get(INCLUDE_HEADER);
+ this.quoteCharacter = options.get(QUOTE_CHARACTER);
+ this.escapeCharacter = options.get(ESCAPE_CHARACTER);
+ }
+
+ public String fieldDelimiter() {
+ return fieldDelimiter;
+ }
+
+ public String lineDelimiter() {
+ return lineDelimiter;
+ }
+
+ public String nullLiteral() {
+ return nullLiteral;
+ }
+
+ public boolean includeHeader() {
+ return includeHeader;
+ }
+
+ public String quoteCharacter() {
+ return quoteCharacter;
+ }
+
+ public String escapeCharacter() {
+ return escapeCharacter;
+ }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/format/SupportsDirectWrite.java
b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvReaderFactory.java
similarity index 53%
copy from
paimon-common/src/main/java/org/apache/paimon/format/SupportsDirectWrite.java
copy to
paimon-format/src/main/java/org/apache/paimon/format/csv/CsvReaderFactory.java
index 76ac9a8ba1..cfc67f576c 100644
---
a/paimon-common/src/main/java/org/apache/paimon/format/SupportsDirectWrite.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvReaderFactory.java
@@ -16,15 +16,28 @@
* limitations under the License.
*/
-package org.apache.paimon.format;
+package org.apache.paimon.format.csv;
-import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.Path;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.format.FormatReaderFactory;
+import org.apache.paimon.reader.FileRecordReader;
+import org.apache.paimon.types.RowType;
import java.io.IOException;
-/** Creaet a FormatWriter which has full control abort file io. */
-public interface SupportsDirectWrite {
+/** CSV {@link FormatReaderFactory} implementation. */
+public class CsvReaderFactory implements FormatReaderFactory {
- FormatWriter create(FileIO fileIO, Path path, String compression) throws
IOException;
+ private final RowType rowType;
+ private final CsvOptions options;
+
+ public CsvReaderFactory(RowType rowType, CsvOptions options) {
+ this.rowType = rowType;
+ this.options = options;
+ }
+
+ @Override
+ public FileRecordReader<InternalRow> createReader(Context context) throws
IOException {
+ return new CsvFileReader(context, rowType, options);
+ }
}
diff --git
a/paimon-format/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory
b/paimon-format/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory
index 7af6f79b34..c35f5544ca 100644
---
a/paimon-format/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory
+++
b/paimon-format/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory
@@ -16,3 +16,4 @@
org.apache.paimon.format.avro.AvroFileFormatFactory
org.apache.paimon.format.orc.OrcFileFormatFactory
org.apache.paimon.format.parquet.ParquetFileFormatFactory
+org.apache.paimon.format.csv.CsvFileFormatFactory
diff --git
a/paimon-format/src/test/java/org/apache/paimon/format/csv/CsvFileFormatTest.java
b/paimon-format/src/test/java/org/apache/paimon/format/csv/CsvFileFormatTest.java
new file mode 100644
index 0000000000..c9c5f3dbb5
--- /dev/null
+++
b/paimon-format/src/test/java/org/apache/paimon/format/csv/CsvFileFormatTest.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.csv;
+
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.format.FileFormatFactory.FormatContext;
+import org.apache.paimon.format.FormatReadWriteTest;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.apache.paimon.data.BinaryString.fromString;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link CsvFileFormat}. */
+public class CsvFileFormatTest extends FormatReadWriteTest {
+
+ protected CsvFileFormatTest() {
+ super("csv");
+ }
+
+ @Override
+ protected FileFormat fileFormat() {
+ return new CsvFileFormatFactory().create(new FormatContext(new
Options(), 1024, 1024));
+ }
+
+ @Test
+ public void testWhenUseHiveDefaultDelimiter() throws IOException {
+ Options options = new Options();
+ options.set(CsvOptions.FIELD_DELIMITER, "\001");
+ FileFormat format =
+ new CsvFileFormatFactory().create(new FormatContext(new
Options(), 1024, 1024));
+ testSimpleTypesUtil(
+ format, new Path(new Path(parent.toUri()), UUID.randomUUID() +
"." + formatType));
+ testFullTypesUtil(
+ format, new Path(new Path(parent.toUri()), UUID.randomUUID() +
"." + formatType));
+ }
+
+ @Test
+ public void testCsvParsingWithEmptyFields() throws IOException {
+
+ // First row: ,25,"Software Engineer" (empty first field)
+ String csvLine = ",25,\"Software Engineer\"\n";
+ String[] fields = parse(csvLine);
+ assertThat(fields).isNotNull();
+ assertThat(fields[0] == null); // empty field becomes null
+ assertThat(fields[1]).isEqualTo("25");
+ assertThat(fields[2]).isEqualTo("Software Engineer");
+
+ // Second row: "John Doe",,"Developer" (empty middle field)
+ csvLine = "\"John Doe\",,\"Developer\"\n";
+ fields = parse(csvLine);
+ assertThat(fields).isNotNull();
+ assertThat(fields[0]).isEqualTo("John Doe");
+ assertThat(fields[1] == null); // empty field becomes null
+ assertThat(fields[2]).isEqualTo("Developer");
+
+ // Third row: "Jane Smith",30, (empty last field)
+ csvLine = "\"Jane Smith\",30,\n";
+ fields = parse(csvLine);
+ assertThat(fields).isNotNull();
+ assertThat(fields[0]).isEqualTo("Jane Smith");
+ assertThat(fields[1]).isEqualTo("30");
+ assertThat(fields[2] == null); // empty field becomes null
+ }
+
+ @Test
+ public void testJsonArrayQuotePreservation() throws Exception {
+ // Test that JSON arrays preserve quotes
+ String csvLine = "name,\"[1,2,3]\",age";
+ String[] fields = parse(csvLine);
+
+ assertThat(fields).hasSize(3);
+ assertThat(fields[0]).isEqualTo("name");
+ assertThat(fields[1]).isEqualTo("[1,2,3]"); // Quotes should be
preserved
+ assertThat(fields[2]).isEqualTo("age");
+ }
+
+ @Test
+ public void testJsonObjectQuotePreservation() throws Exception {
+ // Test that JSON objects preserve quotes
+ String csvLine = "id,{\"key\":\"value\"},status";
+ String[] fields = parse(csvLine);
+
+ assertThat(fields).hasSize(3);
+ assertThat(fields[0]).isEqualTo("id");
+ assertThat(fields[1]).isEqualTo("{\"key\":\"value\"}"); // Quotes
should be preserved
+ assertThat(fields[2]).isEqualTo("status");
+ }
+
+ @Test
+ public void testComplexJsonArrayQuotePreservation() throws Exception {
+ // Test complex JSON array with nested objects
+ String csvLine =
+
"field1,\"[{\"\"name\"\":\"\"John\"\"},{\"\"name\"\":\"\"Jane\"\"}]\",field3";
+ String[] fields = parse(csvLine);
+
+ assertThat(fields).hasSize(3);
+ assertThat(fields[0]).isEqualTo("field1");
+
assertThat(fields[1]).isEqualTo("[{\"name\":\"John\"},{\"name\":\"Jane\"}]");
+ assertThat(fields[2]).isEqualTo("field3");
+ }
+
+ @Test
+ public void testRegularQuotedFieldsRemoveQuotes() throws Exception {
+ // Test that regular quoted fields (not JSON) still remove quotes
+ String csvLine = "\"John,Doe\",\"25\",\"Engineer\"";
+ String[] fields = parse(csvLine);
+
+ assertThat(fields).hasSize(3);
+ assertThat(fields[0]).isEqualTo("John,Doe"); // Quotes removed for
regular field
+ assertThat(fields[1]).isEqualTo("25"); // Quotes removed
+ assertThat(fields[2]).isEqualTo("Engineer"); // Quotes removed
+ }
+
+ @Test
+ public void testJsonWithWhitespace() throws Exception {
+ // Test JSON with leading whitespace after quote
+ String csvLine = "field1,\" [1,2,3]\",field3";
+ String[] fields = parse(csvLine);
+
+ assertThat(fields).hasSize(3);
+ assertThat(fields[0]).isEqualTo("field1");
+ assertThat(fields[1])
+ .isEqualTo(" [1,2,3]"); // Should preserve quotes due to [
after whitespace
+ assertThat(fields[2]).isEqualTo("field3");
+ }
+
+ @Override
+ protected RowType rowTypeForFullTypesTest() {
+ RowType.Builder builder =
+ RowType.builder()
+ .field("id", DataTypes.INT().notNull())
+ .field("name", DataTypes.STRING()) /* optional by
default */
+ .field("salary", DataTypes.DOUBLE().notNull())
+ .field("boolean", DataTypes.BOOLEAN().nullable())
+ .field("tinyint", DataTypes.TINYINT())
+ .field("smallint", DataTypes.SMALLINT())
+ .field("bigint", DataTypes.BIGINT())
+ .field("timestamp", DataTypes.TIMESTAMP())
+ .field("timestamp_3", DataTypes.TIMESTAMP(3))
+ .field("timestamp_ltz",
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())
+ .field("timestamp_ltz_3",
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3))
+ .field("date", DataTypes.DATE())
+ .field("decimal", DataTypes.DECIMAL(2, 2))
+ .field("decimal2", DataTypes.DECIMAL(38, 2))
+ .field("decimal3", DataTypes.DECIMAL(10, 1));
+
+ RowType rowType = builder.build();
+
+ if (ThreadLocalRandom.current().nextBoolean()) {
+ rowType = (RowType) rowType.notNull();
+ }
+
+ return rowType;
+ }
+
+ @Override
+ protected GenericRow expectedRowForFullTypesTest() {
+ List<Object> values =
+ Arrays.asList(
+ 1,
+ fromString("name"),
+ 5.26D,
+ true,
+ (byte) 3,
+ (short) 6,
+ 12304L,
+ Timestamp.fromMicros(123123123),
+ Timestamp.fromEpochMillis(123123123),
+ Timestamp.fromMicros(123123123),
+ Timestamp.fromEpochMillis(123123123),
+ 2456,
+ Decimal.fromBigDecimal(new BigDecimal("0.22"), 2, 2),
+ Decimal.fromBigDecimal(new BigDecimal("12312455.22"),
38, 2),
+ Decimal.fromBigDecimal(new BigDecimal("12455.1"), 10,
1));
+ return GenericRow.of(values.toArray());
+ }
+
+ @Override
+ public boolean supportNestedReadPruning() {
+ return false;
+ }
+
+ @Override
+ public boolean supportDataFileWithoutExtension() {
+ return true;
+ }
+
+ private String[] parse(String csvLine) throws IOException {
+ CsvSchema schema =
+ CsvSchema.emptySchema()
+ .withQuoteChar('\"')
+ .withColumnSeparator(',')
+ .withoutHeader()
+ .withNullValue("null");
+ return CsvFileReader.parseCsvLineToArray(csvLine, schema);
+ }
+}
diff --git
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index 199714d2fa..f78780b629 100644
---
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -124,7 +124,6 @@ import static
org.apache.paimon.options.CatalogOptions.FORMAT_TABLE_ENABLED;
import static org.apache.paimon.options.CatalogOptions.SYNC_ALL_PROPERTIES;
import static org.apache.paimon.options.CatalogOptions.TABLE_TYPE;
import static
org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
-import static org.apache.paimon.table.FormatTableOptions.FIELD_DELIMITER;
import static org.apache.paimon.utils.HadoopUtils.addHadoopConfIfFound;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.StringUtils.isNullOrWhitespaceOnly;
@@ -1488,10 +1487,11 @@ public class HiveCatalog extends AbstractCatalog {
@Nullable FormatTable.Format provider, Map<String, String>
tableParameters) {
Map<String, String> param = new HashMap<>();
if (provider == FormatTable.Format.CSV) {
+ String delimiterKey = "field-delimiter";
param.put(
FIELD_DELIM,
tableParameters.getOrDefault(
- FIELD_DELIMITER.key(),
options.get(FIELD_DELIMITER)));
+ delimiterKey, options.getString(delimiterKey,
",")));
}
return param;
}
diff --git
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveTableUtils.java
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveTableUtils.java
index fdf520689b..cf04b32d90 100644
---
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveTableUtils.java
+++
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveTableUtils.java
@@ -39,7 +39,6 @@ import static org.apache.paimon.TableType.FORMAT_TABLE;
import static org.apache.paimon.catalog.Catalog.COMMENT_PROP;
import static org.apache.paimon.hive.HiveCatalog.HIVE_FIELD_DELIM_DEFAULT;
import static org.apache.paimon.hive.HiveCatalog.isView;
-import static org.apache.paimon.table.FormatTableOptions.FIELD_DELIMITER;
class HiveTableUtils {
@@ -78,7 +77,7 @@ class HiveTableUtils {
} else {
format = Format.CSV;
options.set(
- FIELD_DELIMITER,
+ "field-delimiter",
serdeInfo
.getParameters()
.getOrDefault(FIELD_DELIM,
HIVE_FIELD_DELIM_DEFAULT));
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index 53dc85eb44..c172f4d60c 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -24,6 +24,7 @@ import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.DelegateCatalog;
import org.apache.paimon.catalog.PropertyChange;
+import org.apache.paimon.format.csv.CsvOptions;
import org.apache.paimon.function.Function;
import org.apache.paimon.function.FunctionDefinition;
import org.apache.paimon.options.Options;
@@ -39,7 +40,6 @@ import
org.apache.paimon.spark.catalog.functions.V1FunctionConverter;
import org.apache.paimon.spark.catalog.functions.V1FunctionRegistry;
import org.apache.paimon.spark.utils.CatalogUtils;
import org.apache.paimon.table.FormatTable;
-import org.apache.paimon.table.FormatTableOptions;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.utils.ExceptionUtils;
@@ -530,7 +530,7 @@ public class SparkCatalog extends SparkBaseCatalog
Options options = Options.fromMap(formatTable.options());
CaseInsensitiveStringMap dsOptions = new
CaseInsensitiveStringMap(options.toMap());
if (formatTable.format() == FormatTable.Format.CSV) {
- options.set("sep",
options.get(FormatTableOptions.FIELD_DELIMITER));
+ options.set("sep", options.get(CsvOptions.FIELD_DELIMITER));
dsOptions = new CaseInsensitiveStringMap(options.toMap());
return new PartitionedCSVTable(
ident.name(),