This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 6b27977b43 [format] support json format (#6096)
6b27977b43 is described below
commit 6b27977b43e041ed0c3d0404e55bcead44e36257
Author: jerry <[email protected]>
AuthorDate: Thu Aug 21 10:13:53 2025 +0800
[format] support json format (#6096)
---
docs/content/concepts/spec/fileformat.md | 132 ++++++-
.../apache/paimon/format/BaseTextFileReader.java | 134 +++++++
.../apache/paimon/format/BaseTextFileWriter.java | 64 ++++
.../apache/paimon/format/csv/CsvFileReader.java | 92 +----
.../apache/paimon/format/csv/CsvFormatWriter.java | 30 +-
.../apache/paimon/format/json/JsonFileFormat.java | 117 ++++++
.../paimon/format/json/JsonFileFormatFactory.java | 38 ++
.../apache/paimon/format/json/JsonFileReader.java | 282 ++++++++++++++
.../paimon/format/json/JsonFormatWriter.java | 159 ++++++++
.../org/apache/paimon/format/json/JsonOptions.java | 113 ++++++
.../paimon/format/json/JsonReaderFactory.java | 48 +++
.../org.apache.paimon.format.FileFormatFactory | 1 +
.../paimon/format/json/JsonFileFormatTest.java | 421 +++++++++++++++++++++
13 files changed, 1527 insertions(+), 104 deletions(-)
diff --git a/docs/content/concepts/spec/fileformat.md b/docs/content/concepts/spec/fileformat.md
index d0873de348..6599e258ab 100644
--- a/docs/content/concepts/spec/fileformat.md
+++ b/docs/content/concepts/spec/fileformat.md
@@ -496,4 +496,134 @@ The following table lists the type mapping from Paimon type to CSV type.
Experimental feature, not recommended for production.
-TODO
+Format Options:
+
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 25%">Option</th>
+ <th class="text-center" style="width: 7%">Default</th>
+ <th class="text-center" style="width: 10%">Type</th>
+ <th class="text-center" style="width: 42%">Description</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>json.ignore-parse-errors</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+    <td>Whether to ignore parse errors for JSON format. Skip fields and rows with parse errors instead of failing. Fields are set to null in case of errors.</td>
+ </tr>
+ <tr>
+ <td><h5>json.map-null-key-mode</h5></td>
+ <td style="word-wrap: break-word;"><code>FAIL</code></td>
+ <td>String</td>
+    <td>How to handle map keys that are null. Currently supported values are <code>'FAIL'</code>, <code>'DROP'</code> and <code>'LITERAL'</code>:
+      <ul>
+        <li>Option <code>'FAIL'</code> will throw an exception when encountering a map with a null key.</li>
+        <li>Option <code>'DROP'</code> will drop null-key entries from the map.</li>
+        <li>Option <code>'LITERAL'</code> will replace null keys with a string literal. The string literal is defined by the <code>json.map-null-key-literal</code> option.</li>
+ </ul>
+ </td>
+ </tr>
+ <tr>
+ <td><h5>json.map-null-key-literal</h5></td>
+ <td style="word-wrap: break-word;"><code>null</code></td>
+ <td>String</td>
+    <td>Literal to use for null map keys when <code>json.map-null-key-mode</code> is LITERAL.</td>
+ </tr>
+ <tr>
+ <td><h5>json.line-delimiter</h5></td>
+ <td style="word-wrap: break-word;"><code>\n</code></td>
+ <td>String</td>
+ <td>The line delimiter for JSON format.</td>
+ </tr>
+ </tbody>
+</table>
+
+Paimon JSON format uses the [Jackson databind API](https://github.com/FasterXML/jackson-databind) to parse and generate JSON strings.
+
+The following table lists the type mapping from Paimon type to JSON type.
+
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left">Paimon type</th>
+ <th class="text-left">JSON type</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><code>CHAR / VARCHAR / STRING</code></td>
+ <td><code>string</code></td>
+ </tr>
+ <tr>
+ <td><code>BOOLEAN</code></td>
+ <td><code>boolean</code></td>
+ </tr>
+ <tr>
+ <td><code>BINARY / VARBINARY</code></td>
+ <td><code>string with encoding: base64</code></td>
+ </tr>
+ <tr>
+ <td><code>DECIMAL</code></td>
+ <td><code>number</code></td>
+ </tr>
+ <tr>
+ <td><code>TINYINT</code></td>
+ <td><code>number</code></td>
+ </tr>
+ <tr>
+ <td><code>SMALLINT</code></td>
+ <td><code>number</code></td>
+ </tr>
+ <tr>
+ <td><code>INT</code></td>
+ <td><code>number</code></td>
+ </tr>
+ <tr>
+ <td><code>BIGINT</code></td>
+ <td><code>number</code></td>
+ </tr>
+ <tr>
+ <td><code>FLOAT</code></td>
+ <td><code>number</code></td>
+ </tr>
+ <tr>
+ <td><code>DOUBLE</code></td>
+ <td><code>number</code></td>
+ </tr>
+ <tr>
+ <td><code>DATE</code></td>
+ <td><code>string with format: date</code></td>
+ </tr>
+ <tr>
+ <td><code>TIME</code></td>
+ <td><code>string with format: time</code></td>
+ </tr>
+ <tr>
+ <td><code>TIMESTAMP</code></td>
+ <td><code>string with format: date-time</code></td>
+ </tr>
+ <tr>
+ <td><code>TIMESTAMP_LOCAL_ZONE</code></td>
+ <td><code>string with format: date-time (with UTC time zone)</code></td>
+ </tr>
+ <tr>
+ <td><code>ARRAY</code></td>
+ <td><code>array</code></td>
+ </tr>
+ <tr>
+ <td><code>MAP</code></td>
+ <td><code>object</code></td>
+ </tr>
+ <tr>
+ <td><code>MULTISET</code></td>
+ <td><code>object</code></td>
+ </tr>
+ <tr>
+ <td><code>ROW</code></td>
+ <td><code>object</code></td>
+ </tr>
+ </tbody>
+</table>
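
For illustration, a minimal write/read round trip with the new format, modeled on the writeThenRead helper in JsonFileFormatTest further down in this patch. This is a sketch, not part of the patch: it assumes the caller supplies a FileIO and a writable Path, and uses only the classes and signatures introduced or exercised below (imports as in the test class).

    // Write two rows as JSON lines, then read them back and print the first column.
    void jsonRoundTrip(FileIO fileIO, Path path) throws IOException {
        Options options = new Options();
        options.set(JsonOptions.JSON_IGNORE_PARSE_ERRORS, true);
        FileFormat format =
                new JsonFileFormat(new FileFormatFactory.FormatContext(options, 1024, 1024));
        RowType rowType = DataTypes.ROW(DataTypes.INT().notNull(), DataTypes.STRING());

        try (PositionOutputStream out = fileIO.newOutputStream(path, false);
                FormatWriter writer = format.createWriterFactory(rowType).create(out, "none")) {
            writer.addElement(GenericRow.of(1, BinaryString.fromString("Alice")));
            writer.addElement(GenericRow.of(2, BinaryString.fromString("Bob")));
        }

        try (RecordReader<InternalRow> reader =
                format.createReaderFactory(rowType)
                        .createReader(new FormatReaderContext(fileIO, path, fileIO.getFileSize(path)))) {
            reader.forEachRemaining(row -> System.out.println(row.getInt(0)));
        }
    }
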
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/BaseTextFileReader.java b/paimon-format/src/main/java/org/apache/paimon/format/BaseTextFileReader.java
new file mode 100644
index 0000000000..3cf2ec92cd
--- /dev/null
+++ b/paimon-format/src/main/java/org/apache/paimon/format/BaseTextFileReader.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.reader.FileRecordIterator;
+import org.apache.paimon.reader.FileRecordReader;
+import org.apache.paimon.types.RowType;
+
+import javax.annotation.Nullable;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+
+/** Base class for text-based file readers that provides common functionality. */
+public abstract class BaseTextFileReader implements FileRecordReader<InternalRow> {
+
+ protected final Path filePath;
+ protected final RowType rowType;
+ protected final BufferedReader bufferedReader;
+ protected boolean readerClosed = false;
+ protected BaseTextRecordIterator reader;
+
+    protected BaseTextFileReader(FileIO fileIO, Path filePath, RowType rowType) throws IOException {
+ this.filePath = filePath;
+ this.rowType = rowType;
+ this.bufferedReader =
+ new BufferedReader(
+ new InputStreamReader(
+                                fileIO.newInputStream(filePath), StandardCharsets.UTF_8));
+ this.reader = createRecordIterator();
+ }
+
+ /**
+     * Creates the specific record iterator for this file reader type. Subclasses should implement
+ * this method to return their specific iterator.
+ */
+ protected abstract BaseTextRecordIterator createRecordIterator();
+
+ /**
+     * Parses a single line of text into an InternalRow. Subclasses must implement this method to
+ * handle their specific format.
+ */
+ protected abstract InternalRow parseLine(String line) throws IOException;
+
+ /**
+     * Performs any additional setup before reading records. Subclasses can override this method if
+ * they need to perform setup operations like skipping headers.
+ */
+ protected void setupReading() throws IOException {
+ // Default implementation does nothing
+ }
+
+ @Override
+ @Nullable
+ public FileRecordIterator<InternalRow> readBatch() throws IOException {
+ if (readerClosed) {
+ return null;
+ }
+
+ // Perform any setup needed before reading
+ setupReading();
+
+ if (reader.end) {
+ return null;
+ }
+ return reader;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (!readerClosed && bufferedReader != null) {
+ bufferedReader.close();
+ readerClosed = true;
+ }
+ }
+
+ /** Base record iterator for text-based file readers. */
+    protected abstract class BaseTextRecordIterator implements FileRecordIterator<InternalRow> {
+
+ protected long currentPosition = 0;
+ protected boolean end = false;
+
+ @Override
+ public InternalRow next() throws IOException {
+ if (readerClosed) {
+ return null;
+ }
+ String nextLine = bufferedReader.readLine();
+ if (nextLine == null) {
+ end = true;
+ return null;
+ }
+
+ currentPosition++;
+ return parseLine(nextLine);
+ }
+
+ @Override
+ public void releaseBatch() {
+ // Default implementation does nothing
+ }
+
+ @Override
+ public Path filePath() {
+ return filePath;
+ }
+
+ @Override
+ public long returnedPosition() {
+ return Math.max(0, currentPosition - 1);
+ }
+ }
+}
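
To show the extension point this base class provides, here is a hypothetical subclass (a sketch, not part of the patch; imports as in CsvFileReader, plus GenericRow and BinaryString): a concrete text format only supplies parseLine and a record iterator, while line reading, position tracking and closing stay in BaseTextFileReader, exactly as CsvFileReader and JsonFileReader do below.

    /** Hypothetical reader for a one-STRING-column, one-value-per-line format. */
    class PlainLineFileReader extends BaseTextFileReader {

        PlainLineFileReader(FileIO fileIO, Path filePath, RowType rowType) throws IOException {
            super(fileIO, filePath, rowType);
        }

        @Override
        protected BaseTextRecordIterator createRecordIterator() {
            // The base iterator already drives the per-line loop; nothing format-specific to add.
            return new BaseTextRecordIterator() {};
        }

        @Override
        protected InternalRow parseLine(String line) {
            // A real format would consult rowType; this sketch assumes a single STRING column.
            return GenericRow.of(BinaryString.fromString(line));
        }
    }
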
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/BaseTextFileWriter.java b/paimon-format/src/main/java/org/apache/paimon/format/BaseTextFileWriter.java
new file mode 100644
index 0000000000..6be2123ed3
--- /dev/null
+++ b/paimon-format/src/main/java/org/apache/paimon/format/BaseTextFileWriter.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.types.RowType;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+
+/** Base class for text-based format writers that provides common functionality. */
+public abstract class BaseTextFileWriter implements FormatWriter {
+
+ protected final PositionOutputStream outputStream;
+ protected final BufferedWriter writer;
+ protected final RowType rowType;
+
+    protected BaseTextFileWriter(PositionOutputStream outputStream, RowType rowType) {
+ this.outputStream = outputStream;
+ this.rowType = rowType;
+ this.writer =
+                new BufferedWriter(new OutputStreamWriter(outputStream, StandardCharsets.UTF_8));
+ }
+
+ /**
+     * Writes a single row element to the output stream. Subclasses must implement this method to
+ * handle their specific format serialization.
+ */
+ @Override
+ public abstract void addElement(InternalRow element) throws IOException;
+
+ @Override
+ public void close() throws IOException {
+ writer.flush();
+ writer.close();
+ }
+
+ @Override
+    public boolean reachTargetSize(boolean suggestedCheck, long targetSize) throws IOException {
+ if (suggestedCheck) {
+ return outputStream.getPos() >= targetSize;
+ }
+ return false;
+ }
+}
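
The writer side mirrors the reader: a hypothetical subclass (again a sketch, not part of the patch) only has to serialize one row per addElement call; buffering, flushing on close and reachTargetSize come from BaseTextFileWriter, as CsvFormatWriter and JsonFormatWriter do below.

    /** Hypothetical writer that emits the first STRING column as one text line per row. */
    class PlainLineFileWriter extends BaseTextFileWriter {

        PlainLineFileWriter(PositionOutputStream out, RowType rowType) {
            super(out, rowType);
        }

        @Override
        public void addElement(InternalRow element) throws IOException {
            // A real format would serialize every column according to rowType.
            writer.write(element.getString(0).toString());
            writer.write('\n');
        }
    }
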
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java
index 208ab73118..c85ab8208e 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java
@@ -23,11 +23,9 @@ import org.apache.paimon.casting.CastExecutors;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.format.BaseTextFileReader;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
-import org.apache.paimon.fs.SeekableInputStream;
-import org.apache.paimon.reader.FileRecordIterator;
-import org.apache.paimon.reader.FileRecordReader;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypeRoot;
import org.apache.paimon.types.DataTypes;
@@ -36,18 +34,13 @@ import org.apache.paimon.types.RowType;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
-import javax.annotation.Nullable;
-
-import java.io.BufferedReader;
import java.io.IOException;
-import java.io.InputStreamReader;
-import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/** CSV file reader implementation. */
-public class CsvFileReader implements FileRecordReader<InternalRow> {
+public class CsvFileReader extends BaseTextFileReader {
private static final Base64.Decoder BASE64_DECODER = Base64.getDecoder();
private static final CsvMapper CSV_MAPPER = new CsvMapper();
@@ -56,20 +49,13 @@ public class CsvFileReader implements FileRecordReader<InternalRow> {
private static final Map<String, CastExecutor<?, ?>> CAST_EXECUTOR_CACHE =
new ConcurrentHashMap<>(32);
- private final RowType rowType;
private final CsvOptions options;
- private final Path filePath;
private final CsvSchema schema;
- private final BufferedReader bufferedReader;
- private final CsvRecordIterator reader;
-
private boolean headerSkipped = false;
- private boolean readerClosed = false;
    public CsvFileReader(FileIO fileIO, Path filePath, RowType rowType, CsvOptions options)
throws IOException {
- this.rowType = rowType;
- this.filePath = filePath;
+ super(fileIO, filePath, rowType);
this.options = options;
this.schema =
CsvSchema.emptySchema()
@@ -79,76 +65,30 @@ public class CsvFileReader implements FileRecordReader<InternalRow> {
if (!options.includeHeader()) {
this.schema.withoutHeader();
}
- SeekableInputStream inputStream = fileIO.newInputStream(filePath);
- reader = new CsvRecordIterator();
- InputStreamReader inputStreamReader =
- new InputStreamReader(inputStream, StandardCharsets.UTF_8);
- this.bufferedReader = new BufferedReader(inputStreamReader);
}
@Override
- @Nullable
- public FileRecordIterator<InternalRow> readBatch() throws IOException {
- if (readerClosed) {
- return null;
- }
+ protected BaseTextRecordIterator createRecordIterator() {
+ return new CsvRecordIterator();
+ }
+
+ @Override
+ protected InternalRow parseLine(String line) throws IOException {
+ return parseCsvLine(line, schema);
+ }
+ @Override
+ protected void setupReading() throws IOException {
// Skip header if needed
if (options.includeHeader() && !headerSkipped) {
bufferedReader.readLine();
headerSkipped = true;
}
- if (reader.end) {
- return null;
- }
- return reader;
}
- @Override
- public void close() throws IOException {
- if (!readerClosed && bufferedReader != null) {
- bufferedReader.close();
- readerClosed = true;
- }
- }
-
-    private class CsvRecordIterator implements FileRecordIterator<InternalRow> {
-
- private boolean batchRead = false;
- private long currentPosition = 0;
- boolean end = false;
-
- @Override
- @Nullable
- public InternalRow next() throws IOException {
- if (batchRead || readerClosed) {
- return null;
- }
- String nextLine = bufferedReader.readLine();
- if (nextLine == null) {
- batchRead = true;
- end = true;
- return null;
- }
-
- currentPosition++;
- return parseCsvLine(nextLine, schema);
- }
-
- @Override
- public void releaseBatch() {
- // No resources to release for CSV
- }
-
- @Override
- public long returnedPosition() {
- return currentPosition - 1; // Return position of last returned row
- }
-
- @Override
- public Path filePath() {
- return filePath;
- }
+ private class CsvRecordIterator extends BaseTextRecordIterator {
+ // Inherits all functionality from BaseTextRecordIterator
+ // No additional CSV-specific iterator logic needed
}
    protected static String[] parseCsvLineToArray(String line, CsvSchema schema)
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFormatWriter.java b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFormatWriter.java
index e6f5fd167d..754bb5a192 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFormatWriter.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFormatWriter.java
@@ -21,42 +21,32 @@ package org.apache.paimon.format.csv;
import org.apache.paimon.casting.CastExecutor;
import org.apache.paimon.casting.CastExecutors;
import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.format.FormatWriter;
+import org.apache.paimon.format.BaseTextFileWriter;
import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypeRoot;
import org.apache.paimon.types.RowType;
-import java.io.BufferedWriter;
import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/** CSV format writer implementation. */
-public class CsvFormatWriter implements FormatWriter {
+public class CsvFormatWriter extends BaseTextFileWriter {
private static final Base64.Encoder BASE64_ENCODER = Base64.getEncoder();
// Performance optimization: Cache frequently used cast executors
private static final Map<String, CastExecutor<?, ?>> CAST_EXECUTOR_CACHE =
new ConcurrentHashMap<>(32);
- private final RowType rowType;
private final CsvOptions options;
-
- private final BufferedWriter writer;
- private final PositionOutputStream outputStream;
private boolean headerWritten = false;
-
private final StringBuilder stringBuilder;
    public CsvFormatWriter(PositionOutputStream out, RowType rowType, CsvOptions options) {
- this.rowType = rowType;
+ super(out, rowType);
this.options = options;
- this.outputStream = out;
-        this.writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8));
this.stringBuilder = new StringBuilder();
}
@@ -87,20 +77,6 @@ public class CsvFormatWriter implements FormatWriter {
writer.write(stringBuilder.toString());
}
- @Override
- public void close() throws IOException {
- writer.flush();
- writer.close();
- }
-
- @Override
-    public boolean reachTargetSize(boolean suggestedCheck, long targetSize) throws IOException {
- if (suggestedCheck) {
- return outputStream.getPos() >= targetSize;
- }
- return false;
- }
-
private void writeHeader() throws IOException {
stringBuilder.setLength(0); // Reuse StringBuilder
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileFormat.java b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileFormat.java
new file mode 100644
index 0000000000..49790fefc2
--- /dev/null
+++ b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileFormat.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.json;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.format.FileFormatFactory.FormatContext;
+import org.apache.paimon.format.FormatReaderFactory;
+import org.apache.paimon.format.FormatWriter;
+import org.apache.paimon.format.FormatWriterFactory;
+import org.apache.paimon.fs.CloseShieldOutputStream;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeRoot;
+import org.apache.paimon.types.RowType;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+/** JSON {@link FileFormat}. */
+public class JsonFileFormat extends FileFormat {
+
+ public static final String IDENTIFIER = "json";
+
+ private final Options options;
+
+ public JsonFileFormat(FormatContext context) {
+ super(IDENTIFIER);
+ this.options = getIdentifierPrefixOptions(context.options());
+ }
+
+ @Override
+ public FormatReaderFactory createReaderFactory(
+ RowType projectedRowType, @Nullable List<Predicate> filters) {
+        return new JsonReaderFactory(projectedRowType, new JsonOptions(options));
+ }
+
+ @Override
+ public FormatWriterFactory createWriterFactory(RowType type) {
+ return new JsonWriterFactory(type, new JsonOptions(options));
+ }
+
+ @Override
+ public void validateDataFields(RowType rowType) {
+ List<DataType> fieldTypes = rowType.getFieldTypes();
+ for (DataType dataType : fieldTypes) {
+ validateDataType(dataType);
+ }
+ }
+
+ private void validateDataType(DataType dataType) {
+        // JSON format supports all data types since they can be represented as JSON values
+ DataTypeRoot typeRoot = dataType.getTypeRoot();
+ switch (typeRoot) {
+ case CHAR:
+ case VARCHAR:
+ case BOOLEAN:
+ case BINARY:
+ case VARBINARY:
+ case DECIMAL:
+ case TINYINT:
+ case SMALLINT:
+ case INTEGER:
+ case BIGINT:
+ case FLOAT:
+ case DOUBLE:
+ case DATE:
+ case TIME_WITHOUT_TIME_ZONE:
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ case ARRAY:
+ case MAP:
+ case ROW:
+ // All types are supported in JSON
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ "Unsupported data type for JSON format: " + dataType);
+ }
+ }
+
+ /** A {@link FormatWriterFactory} to write {@link InternalRow} to JSON. */
+ private static class JsonWriterFactory implements FormatWriterFactory {
+
+ private final RowType rowType;
+ private final JsonOptions options;
+
+ public JsonWriterFactory(RowType rowType, JsonOptions options) {
+ this.rowType = rowType;
+ this.options = options;
+ }
+
+ @Override
+        public FormatWriter create(PositionOutputStream out, String compression) {
+            return new JsonFormatWriter(new CloseShieldOutputStream(out), rowType, options);
+ }
+ }
+}
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileFormatFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileFormatFactory.java
new file mode 100644
index 0000000000..74dbae14f8
--- /dev/null
+++ b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileFormatFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.json;
+
+import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.format.FileFormatFactory;
+
+/** Factory to create {@link JsonFileFormat}. */
+public class JsonFileFormatFactory implements FileFormatFactory {
+
+ public static final String IDENTIFIER = "json";
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public FileFormat create(FormatContext formatContext) {
+ return new JsonFileFormat(formatContext);
+ }
+}
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileReader.java b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileReader.java
new file mode 100644
index 0000000000..96d5ecfa5d
--- /dev/null
+++ b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileReader.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.json;
+
+import org.apache.paimon.casting.CastExecutor;
+import org.apache.paimon.casting.CastExecutors;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericArray;
+import org.apache.paimon.data.GenericMap;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.format.BaseTextFileReader;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.JsonSerdeUtil;
+
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/** JSON file reader. */
+public class JsonFileReader extends BaseTextFileReader {
+
+ private static final Base64.Decoder BASE64_DECODER = Base64.getDecoder();
+
+ private final JsonOptions options;
+
+    public JsonFileReader(FileIO fileIO, Path filePath, RowType rowType, JsonOptions options)
+ throws IOException {
+ super(fileIO, filePath, rowType);
+ this.options = options;
+ }
+
+ @Override
+ protected BaseTextRecordIterator createRecordIterator() {
+ return new JsonRecordIterator();
+ }
+
+ @Override
+ protected InternalRow parseLine(String line) throws IOException {
+ try {
+ return convertJsonStringToRow(line, rowType, options);
+ } catch (JsonProcessingException e) {
+ if (options.ignoreParseErrors()) {
+ return null;
+ } else {
+ throw new IOException("Failed to parse JSON line: " + line, e);
+ }
+ } catch (RuntimeException e) {
+ if (options.ignoreParseErrors()) {
+ return null;
+ } else {
+                throw new IOException("Failed to convert JSON line: " + line, e);
+ }
+ }
+ }
+
+ private class JsonRecordIterator extends BaseTextRecordIterator {
+ // Inherits all functionality from BaseTextRecordIterator
+ // No additional JSON-specific iterator logic needed
+ }
+
+    private InternalRow convertJsonStringToRow(String line, RowType rowType, JsonOptions options)
+            throws JsonProcessingException {
+        JsonNode jsonNode = JsonSerdeUtil.OBJECT_MAPPER_INSTANCE.readTree(line);
+        return (InternalRow) convertJsonValue(jsonNode, rowType, options);
+    }
+
+    private Object convertJsonValue(JsonNode node, DataType dataType, JsonOptions options) {
+ if (node == null || node.isNull()) {
+ return null;
+ }
+
+ switch (dataType.getTypeRoot()) {
+ case BINARY:
+ case VARBINARY:
+ try {
+ return BASE64_DECODER.decode(node.asText());
+ } catch (Exception e) {
+ return handleParseError(e);
+ }
+ case ARRAY:
+ return convertJsonArray(node, (ArrayType) dataType, options);
+ case MAP:
+ return convertJsonMap(node, (MapType) dataType, options);
+ case ROW:
+ return convertJsonRow(node, (RowType) dataType, options);
+ default:
+                return convertPrimitiveStringToType(node.asText(), dataType, options);
+ }
+ }
+
+ private GenericArray convertJsonArray(
+ JsonNode arrayNode, ArrayType arrayType, JsonOptions options) {
+ if (!arrayNode.isArray()) {
+ return handleParseError(
+ new RuntimeException(
+                            "Expected array node but got: " + arrayNode.getNodeType()));
+ }
+
+ int size = arrayNode.size();
+ List<Object> elements = new ArrayList<>(size); // Pre-allocate capacity
+ DataType elementType = arrayType.getElementType();
+
+ for (int i = 0; i < size; i++) {
+ Object element;
+ try {
+                element = convertJsonValue(arrayNode.get(i), elementType, options);
+ elements.add(element);
+ } catch (Exception e) {
+ Object elementValue = handleParseError(e);
+ elements.add(elementValue);
+ }
+ }
+ return new GenericArray(elements.toArray());
+ }
+
+    private GenericMap convertJsonMap(JsonNode objectNode, MapType mapType, JsonOptions options) {
+        if (!objectNode.isObject()) {
+            return handleParseError(
+                    new IllegalArgumentException(
+                            "Expected object node but got: " + objectNode.getNodeType()));
+        }
+
+        Map<Object, Object> map = new LinkedHashMap<>(objectNode.size());
+        JsonOptions.MapNullKeyMode mapNullKeyMode = options.getMapNullKeyMode();
+ String mapNullKeyLiteral = options.getMapNullKeyLiteral();
+ DataType keyType = mapType.getKeyType();
+ DataType valueType = mapType.getValueType();
+
+ objectNode
+ .fields()
+ .forEachRemaining(
+ field -> {
+ try {
+ String keyStr = field.getKey();
+
+                            // Handle null keys based on the configured mode
+ if (keyStr == null) {
+ switch (mapNullKeyMode) {
+ case DROP:
+ return; // Skip this entry
+ case FAIL:
+ if (options.ignoreParseErrors()) {
+                                            return; // Skip null keys when ignoring errors
+                                        }
+                                        throw new RuntimeException(
+                                                "Null map key encountered and map-null-key-mode is set to FAIL.");
+                                    case LITERAL:
+                                        // Will be handled below in key conversion
+                                        break;
+                                    default:
+                                        throw new IllegalStateException(
+                                                "Unknown MapNullKeyMode: " + mapNullKeyMode);
+ }
+ }
+
+ // Convert the key
+ Object key;
+ if (keyStr == null) {
+                                // Only LITERAL mode reaches here for null keys
+                                key =
+                                        convertPrimitiveStringToType(
+                                                mapNullKeyLiteral, keyType, options);
+                            } else {
+                                key = convertPrimitiveStringToType(keyStr, keyType, options);
+ }
+
+ // Add the entry to the map if key is not null
+ if (key != null) {
+ Object value =
+                                        convertJsonValue(field.getValue(), valueType, options);
+                                map.put(key, value);
+                            }
+                        } catch (Exception e) {
+                            // Use the error handling method for consistency
+ handleParseError(e);
+ }
+ });
+ return new GenericMap(map);
+ }
+
+ private Object convertPrimitiveStringToType(
+ String str, DataType dataType, JsonOptions options) {
+ try {
+ switch (dataType.getTypeRoot()) {
+ case TINYINT:
+ return Byte.parseByte(str);
+ case SMALLINT:
+ return Short.parseShort(str);
+ case INTEGER:
+ return Integer.parseInt(str);
+ case BIGINT:
+ return Long.parseLong(str);
+ case FLOAT:
+ return Float.parseFloat(str);
+ case DOUBLE:
+ return Double.parseDouble(str);
+ case BOOLEAN:
+ return Boolean.parseBoolean(str);
+ case CHAR:
+ case VARCHAR:
+ return BinaryString.fromString(str);
+ default:
+ BinaryString binaryString = BinaryString.fromString(str);
+                    CastExecutor cast = CastExecutors.resolve(DataTypes.STRING(), dataType);
+ return cast.cast(binaryString);
+ }
+ } catch (Exception e) {
+ return handleParseError(e);
+ }
+ }
+
+    private GenericRow convertJsonRow(JsonNode objectNode, RowType rowType, JsonOptions options) {
+        if (!objectNode.isObject()) {
+            return handleParseError(
+                    new IllegalArgumentException(
+                            "Expected object node but got: " + objectNode.getNodeType()));
+ }
+
+ List<DataField> fields = rowType.getFields();
+ int fieldCount = fields.size();
+ Object[] values = new Object[fieldCount];
+
+ for (int i = 0; i < fieldCount; i++) {
+ DataField field = fields.get(i);
+ try {
+                values[i] = convertJsonValue(objectNode.get(field.name()), field.type(), options);
+ } catch (Exception e) {
+ values[i] = handleParseError(e);
+ }
+ }
+ return GenericRow.of(values);
+ }
+
+ /**
+ * Handles parse errors based on the ignoreParseErrors option.
+ *
+ * @param exception The exception that occurred
+ * @return null if ignoring errors, otherwise re-throws the exception
+ * @throws RuntimeException if not ignoring errors
+ */
+ private <T> T handleParseError(Exception exception) {
+ if (options.ignoreParseErrors()) {
+ return null;
+ } else {
+ if (exception instanceof RuntimeException) {
+ throw (RuntimeException) exception;
+ } else {
+ throw new RuntimeException(exception);
+ }
+ }
+ }
+}
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFormatWriter.java b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFormatWriter.java
new file mode 100644
index 0000000000..de6b1a70f9
--- /dev/null
+++ b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFormatWriter.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.json;
+
+import org.apache.paimon.casting.CastExecutor;
+import org.apache.paimon.casting.CastExecutors;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalMap;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.format.BaseTextFileWriter;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeRoot;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.InternalRowUtils;
+import org.apache.paimon.utils.JsonSerdeUtil;
+
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Json format writer implementation. */
+public class JsonFormatWriter extends BaseTextFileWriter {
+
+ private static final Base64.Encoder BASE64_ENCODER = Base64.getEncoder();
+
+ private final char lineDelimiter;
+
+ public JsonFormatWriter(
+            PositionOutputStream outputStream, RowType rowType, JsonOptions options) {
+ super(outputStream, rowType);
+ this.lineDelimiter = options.getLineDelimiter().charAt(0);
+ }
+
+ @Override
+ public void addElement(InternalRow element) throws IOException {
+ try {
+ String jsonString = convertRowToJsonString(element, rowType);
+ writer.write(jsonString);
+ writer.write(lineDelimiter);
+ } catch (JsonProcessingException e) {
+ throw new IOException("Failed to convert row to JSON string", e);
+ }
+ }
+
+ private String convertRowToJsonString(InternalRow row, RowType rowType)
+ throws JsonProcessingException {
+ Map<String, Object> result = convertRowToMap(row, rowType);
+ return JsonSerdeUtil.writeValueAsString(result);
+ }
+
+    private Map<String, Object> convertRowToMap(InternalRow row, RowType rowType) {
+        List<DataField> fields = rowType.getFields();
+        int fieldCount = fields.size();
+        Map<String, Object> result = new LinkedHashMap<>(fieldCount); // Pre-allocate capacity
+
+ for (int i = 0; i < fieldCount; i++) {
+ DataField field = fields.get(i);
+ Object value = InternalRowUtils.get(row, i, field.type());
+ result.put(field.name(), convertRowValue(value, field.type()));
+ }
+ return result;
+ }
+
+ private Object convertRowValue(Object value, DataType dataType) {
+ if (value == null) {
+ return null;
+ }
+
+ switch (dataType.getTypeRoot()) {
+ case INTEGER:
+ case BIGINT:
+ case FLOAT:
+ case DOUBLE:
+ case BOOLEAN:
+ case TINYINT:
+ case SMALLINT:
+ case CHAR:
+ case VARCHAR:
+ return value.toString();
+ case BINARY:
+ case VARBINARY:
+ return BASE64_ENCODER.encodeToString((byte[]) value);
+ case ARRAY:
+                return convertRowArray((InternalArray) value, (ArrayType) dataType);
+            case MAP:
+                return convertRowMap((InternalMap) value, (MapType) dataType);
+            case ROW:
+                return convertRowToMap((InternalRow) value, (RowType) dataType);
+ default:
+ CastExecutor cast = CastExecutors.resolveToString(dataType);
+ return cast.cast(value).toString();
+ }
+ }
+
+    private List<Object> convertRowArray(InternalArray array, ArrayType arrayType) {
+        int size = array.size();
+        List<Object> result = new ArrayList<>(size); // Pre-allocate capacity
+        DataType elementType = arrayType.getElementType();
+
+        for (int i = 0; i < size; i++) {
+            result.add(convertRowValue(InternalRowUtils.get(array, i, elementType), elementType));
+ }
+ return result;
+ }
+
+    private Map<String, Object> convertRowMap(InternalMap map, MapType mapType) {
+        int size = map.size();
+        Map<String, Object> result = new LinkedHashMap<>(size); // Pre-allocate capacity
+ InternalArray keyArray = map.keyArray();
+ InternalArray valueArray = map.valueArray();
+ DataType keyType = mapType.getKeyType();
+ DataType valueType = mapType.getValueType();
+
+ for (int i = 0; i < size; i++) {
+ Object key = InternalRowUtils.get(keyArray, i, keyType);
+ Object value = InternalRowUtils.get(valueArray, i, valueType);
+            result.put(convertToString(key, keyType), convertRowValue(value, valueType));
+ }
+ return result;
+ }
+
+ private String convertToString(Object value, DataType dataType) {
+ if (value == null) {
+ return null;
+ }
+
+ DataTypeRoot typeRoot = dataType.getTypeRoot();
+        if (typeRoot == DataTypeRoot.CHAR || typeRoot == DataTypeRoot.VARCHAR) {
+ return ((BinaryString) value).toString();
+ }
+ return value.toString();
+ }
+}
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonOptions.java b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonOptions.java
new file mode 100644
index 0000000000..1b21db3e87
--- /dev/null
+++ b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonOptions.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.json;
+
+import org.apache.paimon.options.ConfigOption;
+import org.apache.paimon.options.ConfigOptions;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.options.description.DescribedEnum;
+import org.apache.paimon.options.description.InlineElement;
+
+import static org.apache.paimon.options.description.TextElement.text;
+
+/** Options for Json format. */
+public class JsonOptions {
+
+ public static final ConfigOption<Boolean> JSON_IGNORE_PARSE_ERRORS =
+ ConfigOptions.key("json.ignore-parse-errors")
+ .booleanType()
+ .defaultValue(false)
+                    .withDescription("Whether to ignore parse errors for JSON format");
+
+ public static final ConfigOption<MapNullKeyMode> JSON_MAP_NULL_KEY_MODE =
+ ConfigOptions.key("json.map-null-key-mode")
+ .enumType(MapNullKeyMode.class)
+ .defaultValue(MapNullKeyMode.FAIL)
+ .withDescription("How to handle map keys that are null.");
+
+ public static final ConfigOption<String> JSON_MAP_NULL_KEY_LITERAL =
+ ConfigOptions.key("json.map-null-key-literal")
+ .stringType()
+ .defaultValue("null")
+ .withDescription(
+                            "Literal to use for null map keys when map-null-key-mode is LITERAL");
+
+ public static final ConfigOption<String> LINE_DELIMITER =
+ ConfigOptions.key("json.line-delimiter")
+ .stringType()
+ .defaultValue("\n")
+ .withDescription("The line delimiter for JSON format");
+
+ /** Enum for handling null keys in JSON maps. */
+ public enum MapNullKeyMode implements DescribedEnum {
+ FAIL("fail", "Throw an exception when encountering null map keys."),
+ DROP("drop", "Drop entries with null keys from the map."),
+ LITERAL("literal", "Replace null keys with a literal string value.");
+
+ private final String value;
+ private final String description;
+
+ MapNullKeyMode(String value, String description) {
+ this.value = value;
+ this.description = description;
+ }
+
+ @Override
+ public String toString() {
+ return value;
+ }
+
+ @Override
+ public InlineElement getDescription() {
+ return text(description);
+ }
+
+ public String getValue() {
+ return value;
+ }
+ }
+
+ private final boolean ignoreParseErrors;
+ private final MapNullKeyMode mapNullKeyMode;
+ private final String mapNullKeyLiteral;
+ private final String lineDelimiter;
+
+ public JsonOptions(Options options) {
+ this.ignoreParseErrors = options.get(JSON_IGNORE_PARSE_ERRORS);
+ this.mapNullKeyMode = options.get(JSON_MAP_NULL_KEY_MODE);
+ this.mapNullKeyLiteral = options.get(JSON_MAP_NULL_KEY_LITERAL);
+ this.lineDelimiter = options.get(LINE_DELIMITER);
+ }
+
+ public boolean ignoreParseErrors() {
+ return ignoreParseErrors;
+ }
+
+ public MapNullKeyMode getMapNullKeyMode() {
+ return mapNullKeyMode;
+ }
+
+ public String getMapNullKeyLiteral() {
+ return mapNullKeyLiteral;
+ }
+
+ public String getLineDelimiter() {
+ return lineDelimiter;
+ }
+}
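
For reference, a short sketch (not part of the patch) of how these options are populated and read back. The keys and defaults are the ones documented in fileformat.md above, and Options.set with a ConfigOption is used the same way in JsonFileFormatTest below.

    Options options = new Options();
    options.set(JsonOptions.JSON_IGNORE_PARSE_ERRORS, true);
    options.set(JsonOptions.JSON_MAP_NULL_KEY_MODE, JsonOptions.MapNullKeyMode.LITERAL);
    options.set(JsonOptions.JSON_MAP_NULL_KEY_LITERAL, "NULL");
    options.set(JsonOptions.LINE_DELIMITER, "\n");

    JsonOptions jsonOptions = new JsonOptions(options);
    // jsonOptions.ignoreParseErrors() == true, jsonOptions.getMapNullKeyLiteral() == "NULL", etc.
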
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonReaderFactory.java
new file mode 100644
index 0000000000..b82d77d948
--- /dev/null
+++ b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonReaderFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.json;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.format.FormatReaderFactory;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.reader.FileRecordReader;
+import org.apache.paimon.types.RowType;
+
+import java.io.IOException;
+
+/** Factory to create {@link JsonFileReader}. */
+public class JsonReaderFactory implements FormatReaderFactory {
+
+ private final RowType projectedRowType;
+ private final JsonOptions options;
+
+ public JsonReaderFactory(RowType projectedRowType, JsonOptions options) {
+ this.projectedRowType = projectedRowType;
+ this.options = options;
+ }
+
+ @Override
+    public FileRecordReader<InternalRow> createReader(Context context) throws IOException {
+ FileIO fileIO = context.fileIO();
+ Path filePath = context.filePath();
+
+ return new JsonFileReader(fileIO, filePath, projectedRowType, options);
+ }
+}
diff --git a/paimon-format/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory b/paimon-format/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory
index c35f5544ca..d05cf3844f 100644
--- a/paimon-format/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory
+++ b/paimon-format/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory
@@ -17,3 +17,4 @@ org.apache.paimon.format.avro.AvroFileFormatFactory
org.apache.paimon.format.orc.OrcFileFormatFactory
org.apache.paimon.format.parquet.ParquetFileFormatFactory
org.apache.paimon.format.csv.CsvFileFormatFactory
+org.apache.paimon.format.json.JsonFileFormatFactory
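
This service entry makes JsonFileFormatFactory discoverable through Java's ServiceLoader next to the avro/orc/parquet/csv factories. Creating the format through the factory directly looks like the following sketch (only classes added above are used; how callers resolve the "json" identifier through the registry is left out here):

    FileFormat json =
            new JsonFileFormatFactory()
                    .create(new FileFormatFactory.FormatContext(new Options(), 1024, 1024));
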
diff --git a/paimon-format/src/test/java/org/apache/paimon/format/json/JsonFileFormatTest.java b/paimon-format/src/test/java/org/apache/paimon/format/json/JsonFileFormatTest.java
new file mode 100644
index 0000000000..dd38623279
--- /dev/null
+++ b/paimon-format/src/test/java/org/apache/paimon/format/json/JsonFileFormatTest.java
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.json;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericMap;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.format.FileFormatFactory;
+import org.apache.paimon.format.FormatReadWriteTest;
+import org.apache.paimon.format.FormatReaderContext;
+import org.apache.paimon.format.FormatWriter;
+import org.apache.paimon.format.FormatWriterFactory;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/** Test for {@link JsonFileFormat}. */
+public class JsonFileFormatTest extends FormatReadWriteTest {
+
+ protected JsonFileFormatTest() {
+ super("json");
+ }
+
+ @Override
+ protected FileFormat fileFormat() {
+        return new JsonFileFormat(new FileFormatFactory.FormatContext(new Options(), 1024, 1024));
+ }
+
+ @Test
+ public void testIgnoreParseErrorsEnabled() throws IOException {
+        RowType rowType = DataTypes.ROW(DataTypes.INT().notNull(), DataTypes.STRING());
+
+ Options options = new Options();
+ options.set(JsonOptions.JSON_IGNORE_PARSE_ERRORS, true);
+
+ FileFormat format =
+                new JsonFileFormat(new FileFormatFactory.FormatContext(options, 1024, 1024));
+
+        Path testFile = new Path(parent, "test_ignore_errors_" + UUID.randomUUID() + ".json");
+
+ // Write test data with some malformed JSON lines
+        try (PositionOutputStream out = fileIO.newOutputStream(testFile, false)) {
+ String validJson1 = "{\"f0\":1,\"f1\":\"Alice\"}\n";
+ String invalidJson =
+                    "{\"f0\":invalid,\"f1\":\"Bob\"\n"; // Missing closing brace and invalid value
+ String validJson2 = "{\"f0\":3,\"f1\":\"Charlie\"}\n";
+ String anotherInvalidJson = "not a json at all\n";
+ String validJson3 = "{\"f0\":4,\"f1\":\"David\"}\n";
+
+ out.write(validJson1.getBytes());
+ out.write(invalidJson.getBytes());
+ out.write(validJson2.getBytes());
+ out.write(anotherInvalidJson.getBytes());
+ out.write(validJson3.getBytes());
+ }
+
+ // Read data - should skip malformed lines and return only valid ones
+ try (RecordReader<InternalRow> reader =
+ format.createReaderFactory(rowType)
+ .createReader(
+ new FormatReaderContext(
+                                        fileIO, testFile, fileIO.getFileSize(testFile)))) {
+
+            InternalRowSerializer serializer = new InternalRowSerializer(rowType);
+            List<InternalRow> result = new ArrayList<>();
+            reader.forEachRemaining(
+                    row -> {
+                        if (row != null) { // ignoreParseErrors returns null for malformed lines
+ result.add(serializer.copy(row));
+ }
+ });
+
+ // Should only have 3 valid rows (Alice, Charlie, David)
+ assertThat(result).hasSize(3);
+ assertThat(result.get(0).getInt(0)).isEqualTo(1);
+            assertThat(result.get(0).getString(1).toString()).isEqualTo("Alice");
+            assertThat(result.get(1).getInt(0)).isEqualTo(3);
+            assertThat(result.get(1).getString(1).toString()).isEqualTo("Charlie");
+            assertThat(result.get(2).getInt(0)).isEqualTo(4);
+            assertThat(result.get(2).getString(1).toString()).isEqualTo("David");
+ }
+ }
+
+ @Test
+ public void testIgnoreParseErrorsDisabled() throws IOException {
+        RowType rowType = DataTypes.ROW(DataTypes.INT().notNull(), DataTypes.STRING());
+
+ Options options = new Options();
+        options.set(JsonOptions.JSON_IGNORE_PARSE_ERRORS, false); // Explicitly disable
+
+ FileFormat format =
+                new JsonFileFormat(new FileFormatFactory.FormatContext(options, 1024, 1024));
+
+        Path testFile = new Path(parent, "test_no_ignore_errors_" + UUID.randomUUID() + ".json");
+
+ // Write test data with some malformed JSON lines
+        try (PositionOutputStream out = fileIO.newOutputStream(testFile, false)) {
+ String validJson1 = "{\"f0\":1,\"f1\":\"Alice\"}\n";
+ String invalidJson =
+                    "{\"f0\":invalid,\"f1\":\"Bob\"\n"; // Missing closing brace and invalid value
+
+ out.write(validJson1.getBytes());
+ out.write(invalidJson.getBytes());
+ }
+
+ // Read data - should throw exception on malformed JSON
+ try (RecordReader<InternalRow> reader =
+ format.createReaderFactory(rowType)
+ .createReader(
+ new FormatReaderContext(
+                                        fileIO, testFile, fileIO.getFileSize(testFile)))) {
+
+            InternalRowSerializer serializer = new InternalRowSerializer(rowType);
+ List<InternalRow> result = new ArrayList<>();
+
+ // Should throw IOException when encountering malformed JSON
+ assertThrows(
+ IOException.class,
+ () -> {
+                        reader.forEachRemaining(row -> result.add(serializer.copy(row)));
+                    });
+
+            // Should have read the first valid row before encountering the error
+            assertThat(result).hasSize(1);
+            assertThat(result.get(0).getInt(0)).isEqualTo(1);
+            assertThat(result.get(0).getString(1).toString()).isEqualTo("Alice");
+ }
+ }
+
+ @Test
+ public void testIgnoreParseErrorsWithComplexTypes() throws IOException {
+ RowType rowType =
+ DataTypes.ROW(
+ DataTypes.INT().notNull(),
+ DataTypes.STRING(),
+ DataTypes.ARRAY(DataTypes.STRING()),
+ DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()));
+
+ Options options = new Options();
+ options.set(JsonOptions.JSON_IGNORE_PARSE_ERRORS, true);
+
+ FileFormat format =
+                new JsonFileFormat(new FileFormatFactory.FormatContext(options, 1024, 1024));
+
+        Path testFile =
+                new Path(parent, "test_complex_ignore_errors_" + UUID.randomUUID() + ".json");
+
+        // Write test data with some malformed JSON lines
+        try (PositionOutputStream out = fileIO.newOutputStream(testFile, false)) {
+            String validJson1 =
+                    "{\"f0\":1,\"f1\":\"Alice\",\"f2\":[\"a\",\"b\"],\"f3\":{\"key1\":1,\"key2\":2}}\n";
+            String invalidArrayJson =
+                    "{\"f0\":2,\"f1\":\"Bob\",\"f2\":\"not_an_array\",\"f3\":{\"key1\":1}}\n"; // Invalid array
+            String validJson2 =
+                    "{\"f0\":3,\"f1\":\"Charlie\",\"f2\":[\"c\",\"d\"],\"f3\":{\"key3\":3}}\n";
+            String invalidMapJson =
+                    "{\"f0\":4,\"f1\":\"David\",\"f2\":[\"e\"],\"f3\":\"not_a_map\"}\n"; // Invalid map
+
+ out.write(validJson1.getBytes());
+ out.write(invalidArrayJson.getBytes());
+ out.write(validJson2.getBytes());
+ out.write(invalidMapJson.getBytes());
+ }
+
+ // Read data - should handle type conversion errors gracefully
+ try (RecordReader<InternalRow> reader =
+ format.createReaderFactory(rowType)
+ .createReader(
+ new FormatReaderContext(
+                                        fileIO, testFile, fileIO.getFileSize(testFile)))) {
+
+            InternalRowSerializer serializer = new InternalRowSerializer(rowType);
+ List<InternalRow> result = new ArrayList<>();
+ reader.forEachRemaining(
+ row -> {
+ if (row != null) {
+ result.add(serializer.copy(row));
+ }
+ });
+
+ // Should have valid rows, with null values for failed conversions
+ assertThat(result).hasSize(4);
+
+ // First row should be completely valid
+ assertThat(result.get(0).getInt(0)).isEqualTo(1);
+            assertThat(result.get(0).getString(1).toString()).isEqualTo("Alice");
+ assertThat(result.get(0).getArray(2)).isNotNull();
+ assertThat(result.get(0).getMap(3)).isNotNull();
+
+ // Second row should have null array due to type mismatch
+ assertThat(result.get(1).getInt(0)).isEqualTo(2);
+ assertThat(result.get(1).getString(1).toString()).isEqualTo("Bob");
+ assertThat(result.get(1).isNullAt(2))
+ .isTrue(); // Array should be null due to conversion error
+
+ // Third row should be completely valid
+ assertThat(result.get(2).getInt(0)).isEqualTo(3);
+            assertThat(result.get(2).getString(1).toString()).isEqualTo("Charlie");
+
+ // Fourth row should have null map due to type mismatch
+ assertThat(result.get(3).getInt(0)).isEqualTo(4);
+            assertThat(result.get(3).getString(1).toString()).isEqualTo("David");
+ assertThat(result.get(3).isNullAt(3))
+ .isTrue(); // Map should be null due to conversion error
+ }
+ }
+
+ @Test
+ public void testMapNullKeyModeFailWithWriteRead() throws IOException {
+ RowType rowType =
+ DataTypes.ROW(
+ DataTypes.INT().notNull(),
+ DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()));
+
+ // Test JSON_MAP_NULL_KEY_MODE = FAIL with actual data
+ Options options = new Options();
+        options.set(JsonOptions.JSON_MAP_NULL_KEY_MODE, JsonOptions.MapNullKeyMode.FAIL);
+
+        // Create test data with valid maps
+        List<InternalRow> testData =
+                Arrays.asList(
+                        GenericRow.of(1, new GenericMap(createTestMap("key1", 1, "key2", 2))),
+                        GenericRow.of(2, new GenericMap(createTestMap("name", 100, "value", 200))));
+
+        List<InternalRow> result = writeThenRead(options, rowType, testData, "test_fail_mode");
+
+ // Verify results
+ assertThat(result).hasSize(2);
+ assertThat(result.get(0).getInt(0)).isEqualTo(1);
+ assertThat(result.get(0).getMap(1).size()).isEqualTo(2);
+ assertThat(result.get(1).getInt(0)).isEqualTo(2);
+ assertThat(result.get(1).getMap(1).size()).isEqualTo(2);
+ }
+
+ @Test
+ public void testMapNullKeyModeDropWithWriteRead() throws IOException {
+ RowType rowType =
+ DataTypes.ROW(
+ DataTypes.INT().notNull(),
+ DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()));
+
+ // Test JSON_MAP_NULL_KEY_MODE = DROP with actual data
+ Options options = new Options();
+        options.set(JsonOptions.JSON_MAP_NULL_KEY_MODE, JsonOptions.MapNullKeyMode.DROP);
+
+        // Create test data
+        List<InternalRow> testData =
+                Arrays.asList(
+                        GenericRow.of(
+                                1, new GenericMap(createTestMap("key1", 1, "key2", 2, "key3", 3))));
+
+        List<InternalRow> result = writeThenRead(options, rowType, testData, "test_drop_mode");
+
+ // Verify results
+ assertThat(result).hasSize(1);
+ assertThat(result.get(0).getInt(0)).isEqualTo(1);
+ assertThat(result.get(0).getMap(1).size()).isEqualTo(3);
+ }
+
+ @Test
+    public void testDifferentMapNullKeyLiteralsWithWriteRead() throws IOException {
+ RowType rowType =
+ DataTypes.ROW(
+ DataTypes.INT().notNull(),
+ DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()));
+
+ String[] literals = {"EMPTY", "MISSING", "UNDEFINED", "NULL_VALUE"};
+
+ // Create test data once (reused for all literals)
+ List<InternalRow> testData =
+ Arrays.asList(
+ GenericRow.of(
+ 1,
+ new GenericMap(
+                                        createTestMap("name", "Alice", "city", "New York"))));
+
+        for (String literal : literals) {
+            Options options = new Options();
+            options.set(JsonOptions.JSON_MAP_NULL_KEY_MODE, JsonOptions.MapNullKeyMode.LITERAL);
+            options.set(JsonOptions.JSON_MAP_NULL_KEY_LITERAL, literal);
+
+            List<InternalRow> result =
+                    writeThenRead(options, rowType, testData, "test_literal_" + literal);
+
+ // Verify results
+ assertThat(result).hasSize(1);
+ assertThat(result.get(0).getInt(0)).isEqualTo(1);
+ assertThat(result.get(0).getMap(1).size()).isEqualTo(2);
+ }
+ }
+
+ @Test
+    public void testJsonWriteReadWithDifferentLineDelimiters() throws IOException {
+ RowType rowType =
+ DataTypes.ROW(
+ DataTypes.INT().notNull(),
+ DataTypes.STRING(),
+ DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()));
+
+ String[] delimiters = {"\n", "\r", "\r\n"};
+
+ // Create test data once (reused for all delimiters)
+ List<InternalRow> testData =
+ Arrays.asList(
+ GenericRow.of(
+ 1,
+ BinaryString.fromString("first"),
+                                new GenericMap(createTestMap("name", "Alice", "role", "admin"))),
+                        GenericRow.of(
+                                2,
+                                BinaryString.fromString("second"),
+                                new GenericMap(createTestMap("name", "Bob", "role", "user"))));
+
+ for (String delimiter : delimiters) {
+ Options options = new Options();
+ options.set(JsonOptions.LINE_DELIMITER, delimiter);
+            options.set(JsonOptions.JSON_MAP_NULL_KEY_MODE, JsonOptions.MapNullKeyMode.LITERAL);
+            options.set(JsonOptions.JSON_MAP_NULL_KEY_LITERAL, "NULL");
+
+            List<InternalRow> result =
+                    writeThenRead(options, rowType, testData, "test_delim_" + delimiter.hashCode());
+
+ // Verify results
+ assertThat(result).hasSize(2);
+
+ // Verify first row
+ assertThat(result.get(0).getInt(0)).isEqualTo(1);
+            assertThat(result.get(0).getString(1).toString()).isEqualTo("first");
+            assertThat(result.get(0).getMap(2).size()).isEqualTo(2);
+
+            // Verify second row
+            assertThat(result.get(1).getInt(0)).isEqualTo(2);
+            assertThat(result.get(1).getString(1).toString()).isEqualTo("second");
+ assertThat(result.get(1).getMap(2).size()).isEqualTo(2);
+ }
+ }
+
+ @Override
+ public boolean supportDataFileWithoutExtension() {
+ return true;
+ }
+
+    /** Creates a test map with BinaryString keys from String key-value pairs. */
+    private java.util.Map<BinaryString, Object> createTestMap(Object... keyValuePairs) {
+        if (keyValuePairs.length % 2 != 0) {
+            throw new IllegalArgumentException("Key-value pairs must be even number of arguments");
+ }
+
+ java.util.Map<BinaryString, Object> map = new java.util.HashMap<>();
+ for (int i = 0; i < keyValuePairs.length; i += 2) {
+ String key = (String) keyValuePairs[i];
+ Object value = keyValuePairs[i + 1];
+ if (value instanceof String) {
+                map.put(BinaryString.fromString(key), BinaryString.fromString((String) value));
+ } else {
+ map.put(BinaryString.fromString(key), value);
+ }
+ }
+ return map;
+ }
+
+ private List<InternalRow> writeThenRead(
+            Options options, RowType rowType, List<InternalRow> testData, String testPrefix)
+ throws IOException {
+ FileFormat format =
+                new JsonFileFormat(new FileFormatFactory.FormatContext(options, 1024, 1024));
+        Path testFile = new Path(parent, testPrefix + "_" + UUID.randomUUID() + ".json");
+        FormatWriterFactory writerFactory = format.createWriterFactory(rowType);
+        try (PositionOutputStream out = fileIO.newOutputStream(testFile, false);
+ FormatWriter writer = writerFactory.create(out, "none")) {
+ for (InternalRow row : testData) {
+ writer.addElement(row);
+ }
+ }
+ try (RecordReader<InternalRow> reader =
+ format.createReaderFactory(rowType)
+ .createReader(
+ new FormatReaderContext(
+                                        fileIO, testFile, fileIO.getFileSize(testFile)))) {
+
+            InternalRowSerializer serializer = new InternalRowSerializer(rowType);
+ List<InternalRow> result = new ArrayList<>();
+ reader.forEachRemaining(row -> result.add(serializer.copy(row)));
+ return result;
+ }
+ }
+}