This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b27977b43 [format] support json format (#6096)
6b27977b43 is described below

commit 6b27977b43e041ed0c3d0404e55bcead44e36257
Author: jerry <[email protected]>
AuthorDate: Thu Aug 21 10:13:53 2025 +0800

    [format] support json format (#6096)
---
 docs/content/concepts/spec/fileformat.md           | 132 ++++++-
 .../apache/paimon/format/BaseTextFileReader.java   | 134 +++++++
 .../apache/paimon/format/BaseTextFileWriter.java   |  64 ++++
 .../apache/paimon/format/csv/CsvFileReader.java    |  92 +----
 .../apache/paimon/format/csv/CsvFormatWriter.java  |  30 +-
 .../apache/paimon/format/json/JsonFileFormat.java  | 117 ++++++
 .../paimon/format/json/JsonFileFormatFactory.java  |  38 ++
 .../apache/paimon/format/json/JsonFileReader.java  | 282 ++++++++++++++
 .../paimon/format/json/JsonFormatWriter.java       | 159 ++++++++
 .../org/apache/paimon/format/json/JsonOptions.java | 113 ++++++
 .../paimon/format/json/JsonReaderFactory.java      |  48 +++
 .../org.apache.paimon.format.FileFormatFactory     |   1 +
 .../paimon/format/json/JsonFileFormatTest.java     | 421 +++++++++++++++++++++
 13 files changed, 1527 insertions(+), 104 deletions(-)

diff --git a/docs/content/concepts/spec/fileformat.md b/docs/content/concepts/spec/fileformat.md
index d0873de348..6599e258ab 100644
--- a/docs/content/concepts/spec/fileformat.md
+++ b/docs/content/concepts/spec/fileformat.md
@@ -496,4 +496,134 @@ The following table lists the type mapping from Paimon type to CSV type.
 
 Experimental feature, not recommended for production.
 
-TODO
+Format Options:
+
+<table class="table table-bordered">
+    <thead>
+      <tr>
+        <th class="text-left" style="width: 25%">Option</th>
+        <th class="text-center" style="width: 7%">Default</th>
+        <th class="text-center" style="width: 10%">Type</th>
+        <th class="text-center" style="width: 42%">Description</th>
+      </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>json.ignore-parse-errors</h5></td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>Whether to ignore parse errors for the JSON format. Fields and rows with parse errors are skipped instead of failing, and such fields are set to null.</td>
+    </tr>
+    <tr>
+      <td><h5>json.map-null-key-mode</h5></td>
+      <td style="word-wrap: break-word;"><code>FAIL</code></td>
+      <td>String</td>
+      <td>How to handle map keys that are null. Currently supported values are <code>'FAIL'</code>, <code>'DROP'</code> and <code>'LITERAL'</code>:
+      <ul>
+      <li>Option <code>'FAIL'</code> will throw an exception when encountering a map with a null key.</li>
+      <li>Option <code>'DROP'</code> will drop null-key entries from the map.</li>
+      <li>Option <code>'LITERAL'</code> will replace the null key with a string literal. The string literal is defined by the <code>json.map-null-key-literal</code> option.</li>
+      </ul>
+      </td>
+    </tr>
+    <tr>
+      <td><h5>json.map-null-key-literal</h5></td>
+      <td style="word-wrap: break-word;"><code>null</code></td>
+      <td>String</td>
+      <td>Literal to use for null map keys when <code>json.map-null-key-mode</code> is LITERAL.</td>
+    </tr>
+    <tr>
+      <td><h5>json.line-delimiter</h5></td>
+      <td style="word-wrap: break-word;"><code>\n</code></td>
+      <td>String</td>
+      <td>The line delimiter for JSON format.</td>
+    </tr>
+    </tbody>
+</table>
+
+Paimon JSON format uses the [jackson databind API](https://github.com/FasterXML/jackson-databind) to parse and generate JSON strings.
+
+The following table lists the type mapping from Paimon type to JSON type.
+
+<table class="table table-bordered">
+    <thead>
+      <tr>
+        <th class="text-left">Paimon type</th>
+        <th class="text-left">JSON type</th>
+      </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><code>CHAR / VARCHAR / STRING</code></td>
+      <td><code>string</code></td>
+    </tr>
+    <tr>
+      <td><code>BOOLEAN</code></td>
+      <td><code>boolean</code></td>
+    </tr>
+    <tr>
+      <td><code>BINARY / VARBINARY</code></td>
+      <td><code>string with encoding: base64</code></td>
+    </tr>
+    <tr>
+      <td><code>DECIMAL</code></td>
+      <td><code>number</code></td>
+    </tr>
+    <tr>
+      <td><code>TINYINT</code></td>
+      <td><code>number</code></td>
+    </tr>
+    <tr>
+      <td><code>SMALLINT</code></td>
+      <td><code>number</code></td>
+    </tr>
+    <tr>
+      <td><code>INT</code></td>
+      <td><code>number</code></td>
+    </tr>
+    <tr>
+      <td><code>BIGINT</code></td>
+      <td><code>number</code></td>
+    </tr>
+    <tr>
+      <td><code>FLOAT</code></td>
+      <td><code>number</code></td>
+    </tr>
+    <tr>
+      <td><code>DOUBLE</code></td>
+      <td><code>number</code></td>
+    </tr>
+    <tr>
+      <td><code>DATE</code></td>
+      <td><code>string with format: date</code></td>
+    </tr>
+    <tr>
+      <td><code>TIME</code></td>
+      <td><code>string with format: time</code></td>
+    </tr>
+    <tr>
+      <td><code>TIMESTAMP</code></td>
+      <td><code>string with format: date-time</code></td>
+    </tr>
+    <tr>
+      <td><code>TIMESTAMP_LOCAL_ZONE</code></td>
+      <td><code>string with format: date-time (with UTC time zone)</code></td>
+    </tr>
+    <tr>
+      <td><code>ARRAY</code></td>
+      <td><code>array</code></td>
+    </tr>
+    <tr>
+      <td><code>MAP</code></td>
+      <td><code>object</code></td>
+    </tr>
+    <tr>
+      <td><code>MULTISET</code></td>
+      <td><code>object</code></td>
+    </tr>
+    <tr>
+      <td><code>ROW</code></td>
+      <td><code>object</code></td>
+    </tr>
+    </tbody>
+</table>
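
For illustration, under the mapping above a row of type ROW<name STRING, dt DATE, data BYTES> (hypothetical field names) is written as one JSON object per line, roughly:

    {"name": "Alice", "dt": "2025-08-21", "data": "aGVsbG8="}

where the binary field is base64-encoded and the date is rendered through Paimon's string cast.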
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/BaseTextFileReader.java b/paimon-format/src/main/java/org/apache/paimon/format/BaseTextFileReader.java
new file mode 100644
index 0000000000..3cf2ec92cd
--- /dev/null
+++ b/paimon-format/src/main/java/org/apache/paimon/format/BaseTextFileReader.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.reader.FileRecordIterator;
+import org.apache.paimon.reader.FileRecordReader;
+import org.apache.paimon.types.RowType;
+
+import javax.annotation.Nullable;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+
+/** Base class for text-based file readers that provides common functionality. */
+public abstract class BaseTextFileReader implements FileRecordReader<InternalRow> {
+
+    protected final Path filePath;
+    protected final RowType rowType;
+    protected final BufferedReader bufferedReader;
+    protected boolean readerClosed = false;
+    protected BaseTextRecordIterator reader;
+
+    protected BaseTextFileReader(FileIO fileIO, Path filePath, RowType rowType) throws IOException {
+        this.filePath = filePath;
+        this.rowType = rowType;
+        this.bufferedReader =
+                new BufferedReader(
+                        new InputStreamReader(
+                                fileIO.newInputStream(filePath), StandardCharsets.UTF_8));
+        this.reader = createRecordIterator();
+    }
+
+    /**
+     * Creates the specific record iterator for this file reader type. Subclasses should implement
+     * this method to return their specific iterator.
+     */
+    protected abstract BaseTextRecordIterator createRecordIterator();
+
+    /**
+     * Parses a single line of text into an InternalRow. Subclasses must implement this method to
+     * handle their specific format.
+     */
+    protected abstract InternalRow parseLine(String line) throws IOException;
+
+    /**
+     * Performs any additional setup before reading records. Subclasses can override this method if
+     * they need to perform setup operations like skipping headers.
+     */
+    protected void setupReading() throws IOException {
+        // Default implementation does nothing
+    }
+
+    @Override
+    @Nullable
+    public FileRecordIterator<InternalRow> readBatch() throws IOException {
+        if (readerClosed) {
+            return null;
+        }
+
+        // Perform any setup needed before reading
+        setupReading();
+
+        if (reader.end) {
+            return null;
+        }
+        return reader;
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (!readerClosed && bufferedReader != null) {
+            bufferedReader.close();
+            readerClosed = true;
+        }
+    }
+
+    /** Base record iterator for text-based file readers. */
+    protected abstract class BaseTextRecordIterator implements FileRecordIterator<InternalRow> {
+
+        protected long currentPosition = 0;
+        protected boolean end = false;
+
+        @Override
+        public InternalRow next() throws IOException {
+            if (readerClosed) {
+                return null;
+            }
+            String nextLine = bufferedReader.readLine();
+            if (nextLine == null) {
+                end = true;
+                return null;
+            }
+
+            currentPosition++;
+            return parseLine(nextLine);
+        }
+
+        @Override
+        public void releaseBatch() {
+            // Default implementation does nothing
+        }
+
+        @Override
+        public Path filePath() {
+            return filePath;
+        }
+
+        @Override
+        public long returnedPosition() {
+            return Math.max(0, currentPosition - 1);
+        }
+    }
+}
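
The two abstract hooks above (createRecordIterator and parseLine) are all a concrete text format has to supply. As a minimal sketch, with a hypothetical class that is not part of this commit, a reader mapping every text line to a single STRING column could look like:

    import org.apache.paimon.data.BinaryString;
    import org.apache.paimon.data.GenericRow;
    import org.apache.paimon.data.InternalRow;
    import org.apache.paimon.format.BaseTextFileReader;
    import org.apache.paimon.fs.FileIO;
    import org.apache.paimon.fs.Path;
    import org.apache.paimon.types.RowType;

    import java.io.IOException;

    /** Hypothetical reader: every text line becomes a single-column STRING row. */
    public class LineFileReader extends BaseTextFileReader {

        public LineFileReader(FileIO fileIO, Path filePath, RowType rowType) throws IOException {
            super(fileIO, filePath, rowType);
        }

        @Override
        protected BaseTextRecordIterator createRecordIterator() {
            return new LineRecordIterator();
        }

        @Override
        protected InternalRow parseLine(String line) {
            return GenericRow.of(BinaryString.fromString(line));
        }

        private class LineRecordIterator extends BaseTextRecordIterator {
            // Position tracking and end-of-file handling come from the base iterator.
        }
    }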
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/BaseTextFileWriter.java b/paimon-format/src/main/java/org/apache/paimon/format/BaseTextFileWriter.java
new file mode 100644
index 0000000000..6be2123ed3
--- /dev/null
+++ b/paimon-format/src/main/java/org/apache/paimon/format/BaseTextFileWriter.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.types.RowType;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+
+/** Base class for text-based format writers that provides common functionality. */
+public abstract class BaseTextFileWriter implements FormatWriter {
+
+    protected final PositionOutputStream outputStream;
+    protected final BufferedWriter writer;
+    protected final RowType rowType;
+
+    protected BaseTextFileWriter(PositionOutputStream outputStream, RowType rowType) {
+        this.outputStream = outputStream;
+        this.rowType = rowType;
+        this.writer =
+                new BufferedWriter(new OutputStreamWriter(outputStream, StandardCharsets.UTF_8));
+    }
+
+    /**
+     * Writes a single row element to the output stream. Subclasses must implement this method to
+     * handle their specific format serialization.
+     */
+    @Override
+    public abstract void addElement(InternalRow element) throws IOException;
+
+    @Override
+    public void close() throws IOException {
+        writer.flush();
+        writer.close();
+    }
+
+    @Override
+    public boolean reachTargetSize(boolean suggestedCheck, long targetSize) throws IOException {
+        if (suggestedCheck) {
+            return outputStream.getPos() >= targetSize;
+        }
+        return false;
+    }
+}
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java
index 208ab73118..c85ab8208e 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java
@@ -23,11 +23,9 @@ import org.apache.paimon.casting.CastExecutors;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.format.BaseTextFileReader;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
-import org.apache.paimon.fs.SeekableInputStream;
-import org.apache.paimon.reader.FileRecordIterator;
-import org.apache.paimon.reader.FileRecordReader;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypeRoot;
 import org.apache.paimon.types.DataTypes;
@@ -36,18 +34,13 @@ import org.apache.paimon.types.RowType;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
 
-import javax.annotation.Nullable;
-
-import java.io.BufferedReader;
 import java.io.IOException;
-import java.io.InputStreamReader;
-import java.nio.charset.StandardCharsets;
 import java.util.Base64;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 /** CSV file reader implementation. */
-public class CsvFileReader implements FileRecordReader<InternalRow> {
+public class CsvFileReader extends BaseTextFileReader {
 
     private static final Base64.Decoder BASE64_DECODER = Base64.getDecoder();
     private static final CsvMapper CSV_MAPPER = new CsvMapper();
@@ -56,20 +49,13 @@ public class CsvFileReader implements FileRecordReader<InternalRow> {
     private static final Map<String, CastExecutor<?, ?>> CAST_EXECUTOR_CACHE =
             new ConcurrentHashMap<>(32);
 
-    private final RowType rowType;
     private final CsvOptions options;
-    private final Path filePath;
     private final CsvSchema schema;
-    private final BufferedReader bufferedReader;
-    private final CsvRecordIterator reader;
-
     private boolean headerSkipped = false;
-    private boolean readerClosed = false;
 
     public CsvFileReader(FileIO fileIO, Path filePath, RowType rowType, CsvOptions options)
             throws IOException {
-        this.rowType = rowType;
-        this.filePath = filePath;
+        super(fileIO, filePath, rowType);
         this.options = options;
         this.schema =
                 CsvSchema.emptySchema()
@@ -79,76 +65,30 @@ public class CsvFileReader implements FileRecordReader<InternalRow> {
         if (!options.includeHeader()) {
             this.schema.withoutHeader();
         }
-        SeekableInputStream inputStream = fileIO.newInputStream(filePath);
-        reader = new CsvRecordIterator();
-        InputStreamReader inputStreamReader =
-                new InputStreamReader(inputStream, StandardCharsets.UTF_8);
-        this.bufferedReader = new BufferedReader(inputStreamReader);
     }
 
     @Override
-    @Nullable
-    public FileRecordIterator<InternalRow> readBatch() throws IOException {
-        if (readerClosed) {
-            return null;
-        }
+    protected BaseTextRecordIterator createRecordIterator() {
+        return new CsvRecordIterator();
+    }
+
+    @Override
+    protected InternalRow parseLine(String line) throws IOException {
+        return parseCsvLine(line, schema);
+    }
 
+    @Override
+    protected void setupReading() throws IOException {
         // Skip header if needed
         if (options.includeHeader() && !headerSkipped) {
             bufferedReader.readLine();
             headerSkipped = true;
         }
-        if (reader.end) {
-            return null;
-        }
-        return reader;
     }
 
-    @Override
-    public void close() throws IOException {
-        if (!readerClosed && bufferedReader != null) {
-            bufferedReader.close();
-            readerClosed = true;
-        }
-    }
-
-    private class CsvRecordIterator implements FileRecordIterator<InternalRow> {
-
-        private boolean batchRead = false;
-        private long currentPosition = 0;
-        boolean end = false;
-
-        @Override
-        @Nullable
-        public InternalRow next() throws IOException {
-            if (batchRead || readerClosed) {
-                return null;
-            }
-            String nextLine = bufferedReader.readLine();
-            if (nextLine == null) {
-                batchRead = true;
-                end = true;
-                return null;
-            }
-
-            currentPosition++;
-            return parseCsvLine(nextLine, schema);
-        }
-
-        @Override
-        public void releaseBatch() {
-            // No resources to release for CSV
-        }
-
-        @Override
-        public long returnedPosition() {
-            return currentPosition - 1; // Return position of last returned row
-        }
-
-        @Override
-        public Path filePath() {
-            return filePath;
-        }
+    private class CsvRecordIterator extends BaseTextRecordIterator {
+        // Inherits all functionality from BaseTextRecordIterator
+        // No additional CSV-specific iterator logic needed
     }
 
     protected static String[] parseCsvLineToArray(String line, CsvSchema schema)
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFormatWriter.java b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFormatWriter.java
index e6f5fd167d..754bb5a192 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFormatWriter.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFormatWriter.java
@@ -21,42 +21,32 @@ package org.apache.paimon.format.csv;
 import org.apache.paimon.casting.CastExecutor;
 import org.apache.paimon.casting.CastExecutors;
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.format.FormatWriter;
+import org.apache.paimon.format.BaseTextFileWriter;
 import org.apache.paimon.fs.PositionOutputStream;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypeRoot;
 import org.apache.paimon.types.RowType;
 
-import java.io.BufferedWriter;
 import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.nio.charset.StandardCharsets;
 import java.util.Base64;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 /** CSV format writer implementation. */
-public class CsvFormatWriter implements FormatWriter {
+public class CsvFormatWriter extends BaseTextFileWriter {
 
     private static final Base64.Encoder BASE64_ENCODER = Base64.getEncoder();
     // Performance optimization: Cache frequently used cast executors
     private static final Map<String, CastExecutor<?, ?>> CAST_EXECUTOR_CACHE =
             new ConcurrentHashMap<>(32);
 
-    private final RowType rowType;
     private final CsvOptions options;
-
-    private final BufferedWriter writer;
-    private final PositionOutputStream outputStream;
     private boolean headerWritten = false;
-
     private final StringBuilder stringBuilder;
 
     public CsvFormatWriter(PositionOutputStream out, RowType rowType, CsvOptions options) {
-        this.rowType = rowType;
+        super(out, rowType);
         this.options = options;
-        this.outputStream = out;
-        this.writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8));
         this.stringBuilder = new StringBuilder();
     }
 
@@ -87,20 +77,6 @@ public class CsvFormatWriter implements FormatWriter {
         writer.write(stringBuilder.toString());
     }
 
-    @Override
-    public void close() throws IOException {
-        writer.flush();
-        writer.close();
-    }
-
-    @Override
-    public boolean reachTargetSize(boolean suggestedCheck, long targetSize) throws IOException {
-        if (suggestedCheck) {
-            return outputStream.getPos() >= targetSize;
-        }
-        return false;
-    }
-
     private void writeHeader() throws IOException {
         stringBuilder.setLength(0); // Reuse StringBuilder
 
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileFormat.java b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileFormat.java
new file mode 100644
index 0000000000..49790fefc2
--- /dev/null
+++ b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileFormat.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.json;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.format.FileFormatFactory.FormatContext;
+import org.apache.paimon.format.FormatReaderFactory;
+import org.apache.paimon.format.FormatWriter;
+import org.apache.paimon.format.FormatWriterFactory;
+import org.apache.paimon.fs.CloseShieldOutputStream;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeRoot;
+import org.apache.paimon.types.RowType;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+/** JSON {@link FileFormat}. */
+public class JsonFileFormat extends FileFormat {
+
+    public static final String IDENTIFIER = "json";
+
+    private final Options options;
+
+    public JsonFileFormat(FormatContext context) {
+        super(IDENTIFIER);
+        this.options = getIdentifierPrefixOptions(context.options());
+    }
+
+    @Override
+    public FormatReaderFactory createReaderFactory(
+            RowType projectedRowType, @Nullable List<Predicate> filters) {
+        return new JsonReaderFactory(projectedRowType, new JsonOptions(options));
+    }
+
+    @Override
+    public FormatWriterFactory createWriterFactory(RowType type) {
+        return new JsonWriterFactory(type, new JsonOptions(options));
+    }
+
+    @Override
+    public void validateDataFields(RowType rowType) {
+        List<DataType> fieldTypes = rowType.getFieldTypes();
+        for (DataType dataType : fieldTypes) {
+            validateDataType(dataType);
+        }
+    }
+
+    private void validateDataType(DataType dataType) {
+        // JSON format supports all data types since they can be represented as JSON values
+        DataTypeRoot typeRoot = dataType.getTypeRoot();
+        switch (typeRoot) {
+            case CHAR:
+            case VARCHAR:
+            case BOOLEAN:
+            case BINARY:
+            case VARBINARY:
+            case DECIMAL:
+            case TINYINT:
+            case SMALLINT:
+            case INTEGER:
+            case BIGINT:
+            case FLOAT:
+            case DOUBLE:
+            case DATE:
+            case TIME_WITHOUT_TIME_ZONE:
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+            case ARRAY:
+            case MAP:
+            case ROW:
+                // All types are supported in JSON
+                break;
+            default:
+                throw new UnsupportedOperationException(
+                        "Unsupported data type for JSON format: " + dataType);
+        }
+    }
+
+    /** A {@link FormatWriterFactory} to write {@link InternalRow} to JSON. */
+    private static class JsonWriterFactory implements FormatWriterFactory {
+
+        private final RowType rowType;
+        private final JsonOptions options;
+
+        public JsonWriterFactory(RowType rowType, JsonOptions options) {
+            this.rowType = rowType;
+            this.options = options;
+        }
+
+        @Override
+        public FormatWriter create(PositionOutputStream out, String compression) {
+            return new JsonFormatWriter(new CloseShieldOutputStream(out), rowType, options);
+        }
+    }
+}
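
A sketch of reading through the new format, following the same calls the test at the end of this patch uses; the helper class and method are hypothetical and only illustrate the wiring:

    import org.apache.paimon.data.InternalRow;
    import org.apache.paimon.format.FileFormat;
    import org.apache.paimon.format.FileFormatFactory;
    import org.apache.paimon.format.FormatReaderContext;
    import org.apache.paimon.format.json.JsonFileFormat;
    import org.apache.paimon.format.json.JsonOptions;
    import org.apache.paimon.fs.FileIO;
    import org.apache.paimon.fs.Path;
    import org.apache.paimon.options.Options;
    import org.apache.paimon.reader.RecordReader;
    import org.apache.paimon.types.RowType;

    import java.io.IOException;

    public class JsonReadSketch {

        /** Streams every row of a newline-delimited JSON file through JsonFileFormat. */
        static void readAll(FileIO fileIO, Path path, RowType rowType) throws IOException {
            Options options = new Options();
            options.set(JsonOptions.JSON_IGNORE_PARSE_ERRORS, true);
            FileFormat format =
                    new JsonFileFormat(new FileFormatFactory.FormatContext(options, 1024, 1024));
            try (RecordReader<InternalRow> reader =
                    format.createReaderFactory(rowType)
                            .createReader(
                                    new FormatReaderContext(fileIO, path, fileIO.getFileSize(path)))) {
                // Malformed lines are skipped because json.ignore-parse-errors is enabled above.
                reader.forEachRemaining(row -> System.out.println(row));
            }
        }
    }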
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileFormatFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileFormatFactory.java
new file mode 100644
index 0000000000..74dbae14f8
--- /dev/null
+++ b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileFormatFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.json;
+
+import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.format.FileFormatFactory;
+
+/** Factory to create {@link JsonFileFormat}. */
+public class JsonFileFormatFactory implements FileFormatFactory {
+
+    public static final String IDENTIFIER = "json";
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public FileFormat create(FormatContext formatContext) {
+        return new JsonFileFormat(formatContext);
+    }
+}
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileReader.java b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileReader.java
new file mode 100644
index 0000000000..96d5ecfa5d
--- /dev/null
+++ b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFileReader.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.json;
+
+import org.apache.paimon.casting.CastExecutor;
+import org.apache.paimon.casting.CastExecutors;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericArray;
+import org.apache.paimon.data.GenericMap;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.format.BaseTextFileReader;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.JsonSerdeUtil;
+
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/** JSON file reader. */
+public class JsonFileReader extends BaseTextFileReader {
+
+    private static final Base64.Decoder BASE64_DECODER = Base64.getDecoder();
+
+    private final JsonOptions options;
+
+    public JsonFileReader(FileIO fileIO, Path filePath, RowType rowType, JsonOptions options)
+            throws IOException {
+        super(fileIO, filePath, rowType);
+        this.options = options;
+    }
+
+    @Override
+    protected BaseTextRecordIterator createRecordIterator() {
+        return new JsonRecordIterator();
+    }
+
+    @Override
+    protected InternalRow parseLine(String line) throws IOException {
+        try {
+            return convertJsonStringToRow(line, rowType, options);
+        } catch (JsonProcessingException e) {
+            if (options.ignoreParseErrors()) {
+                return null;
+            } else {
+                throw new IOException("Failed to parse JSON line: " + line, e);
+            }
+        } catch (RuntimeException e) {
+            if (options.ignoreParseErrors()) {
+                return null;
+            } else {
+                throw new IOException("Failed to convert JSON line: " + line, e);
+            }
+        }
+    }
+
+    private class JsonRecordIterator extends BaseTextRecordIterator {
+        // Inherits all functionality from BaseTextRecordIterator
+        // No additional JSON-specific iterator logic needed
+    }
+
+    private InternalRow convertJsonStringToRow(String line, RowType rowType, JsonOptions options)
+            throws JsonProcessingException {
+        JsonNode jsonNode = JsonSerdeUtil.OBJECT_MAPPER_INSTANCE.readTree(line);
+        return (InternalRow) convertJsonValue(jsonNode, rowType, options);
+    }
+
+    private Object convertJsonValue(JsonNode node, DataType dataType, JsonOptions options) {
+        if (node == null || node.isNull()) {
+            return null;
+        }
+
+        switch (dataType.getTypeRoot()) {
+            case BINARY:
+            case VARBINARY:
+                try {
+                    return BASE64_DECODER.decode(node.asText());
+                } catch (Exception e) {
+                    return handleParseError(e);
+                }
+            case ARRAY:
+                return convertJsonArray(node, (ArrayType) dataType, options);
+            case MAP:
+                return convertJsonMap(node, (MapType) dataType, options);
+            case ROW:
+                return convertJsonRow(node, (RowType) dataType, options);
+            default:
+                return convertPrimitiveStringToType(node.asText(), dataType, options);
+        }
+    }
+
+    private GenericArray convertJsonArray(
+            JsonNode arrayNode, ArrayType arrayType, JsonOptions options) {
+        if (!arrayNode.isArray()) {
+            return handleParseError(
+                    new RuntimeException(
+                            "Expected array node but got: " + arrayNode.getNodeType()));
+        }
+
+        int size = arrayNode.size();
+        List<Object> elements = new ArrayList<>(size); // Pre-allocate capacity
+        DataType elementType = arrayType.getElementType();
+
+        for (int i = 0; i < size; i++) {
+            Object element;
+            try {
+                element = convertJsonValue(arrayNode.get(i), elementType, options);
+                elements.add(element);
+            } catch (Exception e) {
+                Object elementValue = handleParseError(e);
+                elements.add(elementValue);
+            }
+        }
+        return new GenericArray(elements.toArray());
+    }
+
+    private GenericMap convertJsonMap(JsonNode objectNode, MapType mapType, JsonOptions options) {
+        if (!objectNode.isObject()) {
+            return handleParseError(
+                    new IllegalArgumentException(
+                            "Expected object node but got: " + objectNode.getNodeType()));
+        }
+
+        Map<Object, Object> map = new LinkedHashMap<>(objectNode.size());
+        JsonOptions.MapNullKeyMode mapNullKeyMode = options.getMapNullKeyMode();
+        String mapNullKeyLiteral = options.getMapNullKeyLiteral();
+        DataType keyType = mapType.getKeyType();
+        DataType valueType = mapType.getValueType();
+
+        objectNode
+                .fields()
+                .forEachRemaining(
+                        field -> {
+                            try {
+                                String keyStr = field.getKey();
+
+                                // Handle null keys based on the configured mode
+                                if (keyStr == null) {
+                                    switch (mapNullKeyMode) {
+                                        case DROP:
+                                            return; // Skip this entry
+                                        case FAIL:
+                                            if (options.ignoreParseErrors()) {
+                                                return; // Skip null keys when ignoring errors
+                                            }
+                                            throw new RuntimeException(
+                                                    "Null map key encountered and map-null-key-mode is set to FAIL.");
+                                        case LITERAL:
+                                            // Will be handled below in key conversion
+                                            break;
+                                        default:
+                                            throw new IllegalStateException(
+                                                    "Unknown MapNullKeyMode: " + mapNullKeyMode);
+                                    }
+                                }
+
+                                // Convert the key
+                                Object key;
+                                if (keyStr == null) {
+                                    // Only LITERAL mode reaches here for null keys
+                                    key =
+                                            convertPrimitiveStringToType(
+                                                    mapNullKeyLiteral, keyType, options);
+                                } else {
+                                    key = convertPrimitiveStringToType(keyStr, keyType, options);
+                                }
+
+                                // Add the entry to the map if key is not null
+                                if (key != null) {
+                                    Object value =
+                                            convertJsonValue(field.getValue(), valueType, options);
+                                    map.put(key, value);
+                                }
+                            } catch (Exception e) {
+                                // Use the error handling method for consistency
+                                handleParseError(e);
+                            }
+                        });
+        return new GenericMap(map);
+    }
+
+    private Object convertPrimitiveStringToType(
+            String str, DataType dataType, JsonOptions options) {
+        try {
+            switch (dataType.getTypeRoot()) {
+                case TINYINT:
+                    return Byte.parseByte(str);
+                case SMALLINT:
+                    return Short.parseShort(str);
+                case INTEGER:
+                    return Integer.parseInt(str);
+                case BIGINT:
+                    return Long.parseLong(str);
+                case FLOAT:
+                    return Float.parseFloat(str);
+                case DOUBLE:
+                    return Double.parseDouble(str);
+                case BOOLEAN:
+                    return Boolean.parseBoolean(str);
+                case CHAR:
+                case VARCHAR:
+                    return BinaryString.fromString(str);
+                default:
+                    BinaryString binaryString = BinaryString.fromString(str);
+                    CastExecutor cast = CastExecutors.resolve(DataTypes.STRING(), dataType);
+                    return cast.cast(binaryString);
+            }
+        } catch (Exception e) {
+            return handleParseError(e);
+        }
+    }
+
+    private GenericRow convertJsonRow(JsonNode objectNode, RowType rowType, JsonOptions options) {
+        if (!objectNode.isObject()) {
+            return handleParseError(
+                    new IllegalArgumentException(
+                            "Expected object node but got: " + objectNode.getNodeType()));
+        }
+
+        List<DataField> fields = rowType.getFields();
+        int fieldCount = fields.size();
+        Object[] values = new Object[fieldCount];
+
+        for (int i = 0; i < fieldCount; i++) {
+            DataField field = fields.get(i);
+            try {
+                values[i] = convertJsonValue(objectNode.get(field.name()), field.type(), options);
+            } catch (Exception e) {
+                values[i] = handleParseError(e);
+            }
+        }
+        return GenericRow.of(values);
+    }
+
+    /**
+     * Handles parse errors based on the ignoreParseErrors option.
+     *
+     * @param exception The exception that occurred
+     * @return null if ignoring errors, otherwise re-throws the exception
+     * @throws RuntimeException if not ignoring errors
+     */
+    private <T> T handleParseError(Exception exception) {
+        if (options.ignoreParseErrors()) {
+            return null;
+        } else {
+            if (exception instanceof RuntimeException) {
+                throw (RuntimeException) exception;
+            } else {
+                throw new RuntimeException(exception);
+            }
+        }
+    }
+}
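
For a nested schema such as ROW<tags ARRAY<STRING>, attrs MAP<STRING, INT>> (hypothetical), the reader above accepts one object per line, for example:

    {"tags": ["a", "b"], "attrs": {"x": 1, "y": 2}}

with arrays converted to GenericArray, objects to GenericMap or GenericRow, and parse failures handled according to json.ignore-parse-errors.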
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFormatWriter.java b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFormatWriter.java
new file mode 100644
index 0000000000..de6b1a70f9
--- /dev/null
+++ b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonFormatWriter.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.json;
+
+import org.apache.paimon.casting.CastExecutor;
+import org.apache.paimon.casting.CastExecutors;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalMap;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.format.BaseTextFileWriter;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeRoot;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.InternalRowUtils;
+import org.apache.paimon.utils.JsonSerdeUtil;
+
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Json format writer implementation. */
+public class JsonFormatWriter extends BaseTextFileWriter {
+
+    private static final Base64.Encoder BASE64_ENCODER = Base64.getEncoder();
+
+    private final char lineDelimiter;
+
+    public JsonFormatWriter(
+            PositionOutputStream outputStream, RowType rowType, JsonOptions options) {
+        super(outputStream, rowType);
+        this.lineDelimiter = options.getLineDelimiter().charAt(0);
+    }
+
+    @Override
+    public void addElement(InternalRow element) throws IOException {
+        try {
+            String jsonString = convertRowToJsonString(element, rowType);
+            writer.write(jsonString);
+            writer.write(lineDelimiter);
+        } catch (JsonProcessingException e) {
+            throw new IOException("Failed to convert row to JSON string", e);
+        }
+    }
+
+    private String convertRowToJsonString(InternalRow row, RowType rowType)
+            throws JsonProcessingException {
+        Map<String, Object> result = convertRowToMap(row, rowType);
+        return JsonSerdeUtil.writeValueAsString(result);
+    }
+
+    private Map<String, Object> convertRowToMap(InternalRow row, RowType rowType) {
+        List<DataField> fields = rowType.getFields();
+        int fieldCount = fields.size();
+        Map<String, Object> result = new LinkedHashMap<>(fieldCount); // Pre-allocate capacity
+
+        for (int i = 0; i < fieldCount; i++) {
+            DataField field = fields.get(i);
+            Object value = InternalRowUtils.get(row, i, field.type());
+            result.put(field.name(), convertRowValue(value, field.type()));
+        }
+        return result;
+    }
+
+    private Object convertRowValue(Object value, DataType dataType) {
+        if (value == null) {
+            return null;
+        }
+
+        switch (dataType.getTypeRoot()) {
+            case INTEGER:
+            case BIGINT:
+            case FLOAT:
+            case DOUBLE:
+            case BOOLEAN:
+            case TINYINT:
+            case SMALLINT:
+            case CHAR:
+            case VARCHAR:
+                return value.toString();
+            case BINARY:
+            case VARBINARY:
+                return BASE64_ENCODER.encodeToString((byte[]) value);
+            case ARRAY:
+                return convertRowArray((InternalArray) value, (ArrayType) dataType);
+            case MAP:
+                return convertRowMap((InternalMap) value, (MapType) dataType);
+            case ROW:
+                return convertRowToMap((InternalRow) value, (RowType) dataType);
+            default:
+                CastExecutor cast = CastExecutors.resolveToString(dataType);
+                return cast.cast(value).toString();
+        }
+    }
+
+    private List<Object> convertRowArray(InternalArray array, ArrayType arrayType) {
+        int size = array.size();
+        List<Object> result = new ArrayList<>(size); // Pre-allocate capacity
+        DataType elementType = arrayType.getElementType();
+
+        for (int i = 0; i < size; i++) {
+            result.add(convertRowValue(InternalRowUtils.get(array, i, elementType), elementType));
+        }
+        return result;
+    }
+
+    private Map<String, Object> convertRowMap(InternalMap map, MapType mapType) {
+        int size = map.size();
+        Map<String, Object> result = new LinkedHashMap<>(size); // Pre-allocate capacity
+        InternalArray keyArray = map.keyArray();
+        InternalArray valueArray = map.valueArray();
+        DataType keyType = mapType.getKeyType();
+        DataType valueType = mapType.getValueType();
+
+        for (int i = 0; i < size; i++) {
+            Object key = InternalRowUtils.get(keyArray, i, keyType);
+            Object value = InternalRowUtils.get(valueArray, i, valueType);
+            result.put(convertToString(key, keyType), convertRowValue(value, valueType));
+        }
+        return result;
+    }
+
+    private String convertToString(Object value, DataType dataType) {
+        if (value == null) {
+            return null;
+        }
+
+        DataTypeRoot typeRoot = dataType.getTypeRoot();
+        if (typeRoot == DataTypeRoot.CHAR || typeRoot == DataTypeRoot.VARCHAR) {
+            return ((BinaryString) value).toString();
+        }
+        return value.toString();
+    }
+}
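
A small sketch of driving the writer directly; the helper below is hypothetical and not part of this commit. When the writer is created through JsonFileFormat, the factory wraps the stream in a CloseShieldOutputStream instead of handing it over directly:

    import org.apache.paimon.data.BinaryString;
    import org.apache.paimon.data.GenericRow;
    import org.apache.paimon.format.json.JsonFormatWriter;
    import org.apache.paimon.format.json.JsonOptions;
    import org.apache.paimon.fs.FileIO;
    import org.apache.paimon.fs.Path;
    import org.apache.paimon.fs.PositionOutputStream;
    import org.apache.paimon.options.Options;
    import org.apache.paimon.types.DataTypes;
    import org.apache.paimon.types.RowType;

    import java.io.IOException;

    public class JsonWriteSketch {

        /** Writes two rows as two JSON lines, using the default json.line-delimiter. */
        static void writeTwoRows(FileIO fileIO, Path path) throws IOException {
            RowType rowType = DataTypes.ROW(DataTypes.INT(), DataTypes.STRING());
            PositionOutputStream out = fileIO.newOutputStream(path, false);
            JsonFormatWriter writer =
                    new JsonFormatWriter(out, rowType, new JsonOptions(new Options()));
            writer.addElement(GenericRow.of(1, BinaryString.fromString("Alice")));
            writer.addElement(GenericRow.of(2, BinaryString.fromString("Bob")));
            writer.close(); // flushes and closes the underlying stream via the buffered writer
        }
    }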
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonOptions.java b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonOptions.java
new file mode 100644
index 0000000000..1b21db3e87
--- /dev/null
+++ b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonOptions.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.json;
+
+import org.apache.paimon.options.ConfigOption;
+import org.apache.paimon.options.ConfigOptions;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.options.description.DescribedEnum;
+import org.apache.paimon.options.description.InlineElement;
+
+import static org.apache.paimon.options.description.TextElement.text;
+
+/** Options for Json format. */
+public class JsonOptions {
+
+    public static final ConfigOption<Boolean> JSON_IGNORE_PARSE_ERRORS =
+            ConfigOptions.key("json.ignore-parse-errors")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Whether to ignore parse errors for JSON format");
+
+    public static final ConfigOption<MapNullKeyMode> JSON_MAP_NULL_KEY_MODE =
+            ConfigOptions.key("json.map-null-key-mode")
+                    .enumType(MapNullKeyMode.class)
+                    .defaultValue(MapNullKeyMode.FAIL)
+                    .withDescription("How to handle map keys that are null.");
+
+    public static final ConfigOption<String> JSON_MAP_NULL_KEY_LITERAL =
+            ConfigOptions.key("json.map-null-key-literal")
+                    .stringType()
+                    .defaultValue("null")
+                    .withDescription(
+                            "Literal to use for null map keys when map-null-key-mode is LITERAL");
+
+    public static final ConfigOption<String> LINE_DELIMITER =
+            ConfigOptions.key("json.line-delimiter")
+                    .stringType()
+                    .defaultValue("\n")
+                    .withDescription("The line delimiter for JSON format");
+
+    /** Enum for handling null keys in JSON maps. */
+    public enum MapNullKeyMode implements DescribedEnum {
+        FAIL("fail", "Throw an exception when encountering null map keys."),
+        DROP("drop", "Drop entries with null keys from the map."),
+        LITERAL("literal", "Replace null keys with a literal string value.");
+
+        private final String value;
+        private final String description;
+
+        MapNullKeyMode(String value, String description) {
+            this.value = value;
+            this.description = description;
+        }
+
+        @Override
+        public String toString() {
+            return value;
+        }
+
+        @Override
+        public InlineElement getDescription() {
+            return text(description);
+        }
+
+        public String getValue() {
+            return value;
+        }
+    }
+
+    private final boolean ignoreParseErrors;
+    private final MapNullKeyMode mapNullKeyMode;
+    private final String mapNullKeyLiteral;
+    private final String lineDelimiter;
+
+    public JsonOptions(Options options) {
+        this.ignoreParseErrors = options.get(JSON_IGNORE_PARSE_ERRORS);
+        this.mapNullKeyMode = options.get(JSON_MAP_NULL_KEY_MODE);
+        this.mapNullKeyLiteral = options.get(JSON_MAP_NULL_KEY_LITERAL);
+        this.lineDelimiter = options.get(LINE_DELIMITER);
+    }
+
+    public boolean ignoreParseErrors() {
+        return ignoreParseErrors;
+    }
+
+    public MapNullKeyMode getMapNullKeyMode() {
+        return mapNullKeyMode;
+    }
+
+    public String getMapNullKeyLiteral() {
+        return mapNullKeyLiteral;
+    }
+
+    public String getLineDelimiter() {
+        return lineDelimiter;
+    }
+}
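
A short sketch of how these keys surface through JsonOptions (illustrative only; the option constants and getters are the ones defined above):

    import org.apache.paimon.format.json.JsonOptions;
    import org.apache.paimon.options.Options;

    public class JsonOptionsSketch {
        public static void main(String[] args) {
            Options options = new Options();
            options.set(JsonOptions.JSON_IGNORE_PARSE_ERRORS, true);
            options.set(JsonOptions.JSON_MAP_NULL_KEY_MODE, JsonOptions.MapNullKeyMode.LITERAL);
            options.set(JsonOptions.JSON_MAP_NULL_KEY_LITERAL, "unknown");

            JsonOptions jsonOptions = new JsonOptions(options);
            System.out.println(jsonOptions.ignoreParseErrors());    // true
            System.out.println(jsonOptions.getMapNullKeyMode());    // literal
            System.out.println(jsonOptions.getMapNullKeyLiteral()); // unknown
        }
    }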
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/json/JsonReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonReaderFactory.java
new file mode 100644
index 0000000000..b82d77d948
--- /dev/null
+++ b/paimon-format/src/main/java/org/apache/paimon/format/json/JsonReaderFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.json;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.format.FormatReaderFactory;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.reader.FileRecordReader;
+import org.apache.paimon.types.RowType;
+
+import java.io.IOException;
+
+/** Factory to create {@link JsonFileReader}. */
+public class JsonReaderFactory implements FormatReaderFactory {
+
+    private final RowType projectedRowType;
+    private final JsonOptions options;
+
+    public JsonReaderFactory(RowType projectedRowType, JsonOptions options) {
+        this.projectedRowType = projectedRowType;
+        this.options = options;
+    }
+
+    @Override
+    public FileRecordReader<InternalRow> createReader(Context context) throws IOException {
+        FileIO fileIO = context.fileIO();
+        Path filePath = context.filePath();
+
+        return new JsonFileReader(fileIO, filePath, projectedRowType, options);
+    }
+}
diff --git a/paimon-format/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory b/paimon-format/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory
index c35f5544ca..d05cf3844f 100644
--- a/paimon-format/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory
+++ b/paimon-format/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory
@@ -17,3 +17,4 @@ org.apache.paimon.format.avro.AvroFileFormatFactory
 org.apache.paimon.format.orc.OrcFileFormatFactory
 org.apache.paimon.format.parquet.ParquetFileFormatFactory
 org.apache.paimon.format.csv.CsvFileFormatFactory
+org.apache.paimon.format.json.JsonFileFormatFactory
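
The line above registers the factory for Java's ServiceLoader, which is how Paimon discovers pluggable file formats at runtime. A minimal sketch of that discovery, using plain ServiceLoader for illustration:

    import org.apache.paimon.format.FileFormatFactory;

    import java.util.ServiceLoader;

    public class FormatDiscoverySketch {
        public static void main(String[] args) {
            // The META-INF/services entry above is what makes this loop see the JSON factory.
            for (FileFormatFactory factory : ServiceLoader.load(FileFormatFactory.class)) {
                if ("json".equals(factory.identifier())) {
                    System.out.println("Found: " + factory.getClass().getName());
                }
            }
        }
    }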
diff --git a/paimon-format/src/test/java/org/apache/paimon/format/json/JsonFileFormatTest.java b/paimon-format/src/test/java/org/apache/paimon/format/json/JsonFileFormatTest.java
new file mode 100644
index 0000000000..dd38623279
--- /dev/null
+++ b/paimon-format/src/test/java/org/apache/paimon/format/json/JsonFileFormatTest.java
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.json;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericMap;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.format.FileFormatFactory;
+import org.apache.paimon.format.FormatReadWriteTest;
+import org.apache.paimon.format.FormatReaderContext;
+import org.apache.paimon.format.FormatWriter;
+import org.apache.paimon.format.FormatWriterFactory;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/** Test for {@link JsonFileFormat}. */
+public class JsonFileFormatTest extends FormatReadWriteTest {
+
+    protected JsonFileFormatTest() {
+        super("json");
+    }
+
+    @Override
+    protected FileFormat fileFormat() {
+        return new JsonFileFormat(new FileFormatFactory.FormatContext(new Options(), 1024, 1024));
+    }
+
+    @Test
+    public void testIgnoreParseErrorsEnabled() throws IOException {
+        RowType rowType = DataTypes.ROW(DataTypes.INT().notNull(), DataTypes.STRING());
+
+        Options options = new Options();
+        options.set(JsonOptions.JSON_IGNORE_PARSE_ERRORS, true);
+
+        FileFormat format =
+                new JsonFileFormat(new FileFormatFactory.FormatContext(options, 1024, 1024));
+
+        Path testFile = new Path(parent, "test_ignore_errors_" + UUID.randomUUID() + ".json");
+
+        // Write test data with some malformed JSON lines
+        try (PositionOutputStream out = fileIO.newOutputStream(testFile, false)) {
+            String validJson1 = "{\"f0\":1,\"f1\":\"Alice\"}\n";
+            String invalidJson =
+                    "{\"f0\":invalid,\"f1\":\"Bob\"\n"; // Missing closing brace and invalid value
+            String validJson2 = "{\"f0\":3,\"f1\":\"Charlie\"}\n";
+            String anotherInvalidJson = "not a json at all\n";
+            String validJson3 = "{\"f0\":4,\"f1\":\"David\"}\n";
+
+            out.write(validJson1.getBytes());
+            out.write(invalidJson.getBytes());
+            out.write(validJson2.getBytes());
+            out.write(anotherInvalidJson.getBytes());
+            out.write(validJson3.getBytes());
+        }
+
+        // Read data - should skip malformed lines and return only valid ones
+        try (RecordReader<InternalRow> reader =
+                format.createReaderFactory(rowType)
+                        .createReader(
+                                new FormatReaderContext(
+                                        fileIO, testFile, fileIO.getFileSize(testFile)))) {
+
+            InternalRowSerializer serializer = new InternalRowSerializer(rowType);
+            List<InternalRow> result = new ArrayList<>();
+            reader.forEachRemaining(
+                    row -> {
+                        if (row != null) { // ignoreParseErrors returns null for malformed lines
+                            result.add(serializer.copy(row));
+                        }
+                    });
+
+            // Should only have 3 valid rows (Alice, Charlie, David)
+            assertThat(result).hasSize(3);
+            assertThat(result.get(0).getInt(0)).isEqualTo(1);
+            assertThat(result.get(0).getString(1).toString()).isEqualTo("Alice");
+            assertThat(result.get(1).getInt(0)).isEqualTo(3);
+            assertThat(result.get(1).getString(1).toString()).isEqualTo("Charlie");
+            assertThat(result.get(2).getInt(0)).isEqualTo(4);
+            assertThat(result.get(2).getString(1).toString()).isEqualTo("David");
+        }
+    }
+
+    @Test
+    public void testIgnoreParseErrorsDisabled() throws IOException {
+        RowType rowType = DataTypes.ROW(DataTypes.INT().notNull(), DataTypes.STRING());
+
+        Options options = new Options();
+        options.set(JsonOptions.JSON_IGNORE_PARSE_ERRORS, false); // Explicitly disable
+
+        FileFormat format =
+                new JsonFileFormat(new FileFormatFactory.FormatContext(options, 1024, 1024));
+
+        Path testFile = new Path(parent, "test_no_ignore_errors_" + UUID.randomUUID() + ".json");
+
+        // Write test data with some malformed JSON lines
+        try (PositionOutputStream out = fileIO.newOutputStream(testFile, false)) {
+            String validJson1 = "{\"f0\":1,\"f1\":\"Alice\"}\n";
+            String invalidJson =
+                    "{\"f0\":invalid,\"f1\":\"Bob\"\n"; // Missing closing brace and invalid value
+
+            out.write(validJson1.getBytes());
+            out.write(invalidJson.getBytes());
+        }
+
+        // Read data - should throw exception on malformed JSON
+        try (RecordReader<InternalRow> reader =
+                format.createReaderFactory(rowType)
+                        .createReader(
+                                new FormatReaderContext(
+                                        fileIO, testFile, fileIO.getFileSize(testFile)))) {
+
+            InternalRowSerializer serializer = new InternalRowSerializer(rowType);
+            List<InternalRow> result = new ArrayList<>();
+
+            // Should throw IOException when encountering malformed JSON
+            assertThrows(
+                    IOException.class,
+                    () -> {
+                        reader.forEachRemaining(row -> result.add(serializer.copy(row)));
+                    });
+
+            // Should have read the first valid row before encountering the error
+            assertThat(result).hasSize(1);
+            assertThat(result.get(0).getInt(0)).isEqualTo(1);
+            assertThat(result.get(0).getString(1).toString()).isEqualTo("Alice");
+        }
+    }
+
+    @Test
+    public void testIgnoreParseErrorsWithComplexTypes() throws IOException {
+        RowType rowType =
+                DataTypes.ROW(
+                        DataTypes.INT().notNull(),
+                        DataTypes.STRING(),
+                        DataTypes.ARRAY(DataTypes.STRING()),
+                        DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()));
+
+        Options options = new Options();
+        options.set(JsonOptions.JSON_IGNORE_PARSE_ERRORS, true);
+
+        FileFormat format =
+                new JsonFileFormat(new FileFormatFactory.FormatContext(options, 1024, 1024));
+
+        Path testFile =
+                new Path(parent, "test_complex_ignore_errors_" + UUID.randomUUID() + ".json");
+
+        // Write test data with some malformed JSON lines
+        try (PositionOutputStream out = fileIO.newOutputStream(testFile, false)) {
+            String validJson1 =
+                    "{\"f0\":1,\"f1\":\"Alice\",\"f2\":[\"a\",\"b\"],\"f3\":{\"key1\":1,\"key2\":2}}\n";
+            String invalidArrayJson =
+                    "{\"f0\":2,\"f1\":\"Bob\",\"f2\":\"not_an_array\",\"f3\":{\"key1\":1}}\n"; // Invalid array
+            String validJson2 =
+                    "{\"f0\":3,\"f1\":\"Charlie\",\"f2\":[\"c\",\"d\"],\"f3\":{\"key3\":3}}\n";
+            String invalidMapJson =
+                    "{\"f0\":4,\"f1\":\"David\",\"f2\":[\"e\"],\"f3\":\"not_a_map\"}\n"; // Invalid
+            // map
+
+            out.write(validJson1.getBytes());
+            out.write(invalidArrayJson.getBytes());
+            out.write(validJson2.getBytes());
+            out.write(invalidMapJson.getBytes());
+        }
+
+        // Read data - should handle type conversion errors gracefully
+        try (RecordReader<InternalRow> reader =
+                format.createReaderFactory(rowType)
+                        .createReader(
+                                new FormatReaderContext(
+                                        fileIO, testFile, fileIO.getFileSize(testFile)))) {
+
+            InternalRowSerializer serializer = new InternalRowSerializer(rowType);
+            List<InternalRow> result = new ArrayList<>();
+            reader.forEachRemaining(
+                    row -> {
+                        if (row != null) {
+                            result.add(serializer.copy(row));
+                        }
+                    });
+
+            // Should have valid rows, with null values for failed conversions
+            assertThat(result).hasSize(4);
+
+            // First row should be completely valid
+            assertThat(result.get(0).getInt(0)).isEqualTo(1);
+            assertThat(result.get(0).getString(1).toString()).isEqualTo("Alice");
+            assertThat(result.get(0).getArray(2)).isNotNull();
+            assertThat(result.get(0).getMap(3)).isNotNull();
+
+            // Second row should have null array due to type mismatch
+            assertThat(result.get(1).getInt(0)).isEqualTo(2);
+            assertThat(result.get(1).getString(1).toString()).isEqualTo("Bob");
+            assertThat(result.get(1).isNullAt(2))
+                    .isTrue(); // Array should be null due to conversion error
+
+            // Third row should be completely valid
+            assertThat(result.get(2).getInt(0)).isEqualTo(3);
+            assertThat(result.get(2).getString(1).toString()).isEqualTo("Charlie");
+
+            // Fourth row should have null map due to type mismatch
+            assertThat(result.get(3).getInt(0)).isEqualTo(4);
+            assertThat(result.get(3).getString(1).toString()).isEqualTo("David");
+            assertThat(result.get(3).isNullAt(3))
+                    .isTrue(); // Map should be null due to conversion error
+        }
+    }
+
+    @Test
+    public void testMapNullKeyModeFailWithWriteRead() throws IOException {
+        RowType rowType =
+                DataTypes.ROW(
+                        DataTypes.INT().notNull(),
+                        DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()));
+
+        // Test JSON_MAP_NULL_KEY_MODE = FAIL with actual data
+        Options options = new Options();
+        options.set(JsonOptions.JSON_MAP_NULL_KEY_MODE, JsonOptions.MapNullKeyMode.FAIL);
+
+        // Create test data with valid maps
+        List<InternalRow> testData =
+                Arrays.asList(
+                        GenericRow.of(1, new GenericMap(createTestMap("key1", 1, "key2", 2))),
+                        GenericRow.of(2, new GenericMap(createTestMap("name", 100, "value", 200))));
+
+        List<InternalRow> result = writeThenRead(options, rowType, testData, "test_fail_mode");
+
+        // Verify results
+        assertThat(result).hasSize(2);
+        assertThat(result.get(0).getInt(0)).isEqualTo(1);
+        assertThat(result.get(0).getMap(1).size()).isEqualTo(2);
+        assertThat(result.get(1).getInt(0)).isEqualTo(2);
+        assertThat(result.get(1).getMap(1).size()).isEqualTo(2);
+    }
+
+    @Test
+    public void testMapNullKeyModeDropWithWriteRead() throws IOException {
+        RowType rowType =
+                DataTypes.ROW(
+                        DataTypes.INT().notNull(),
+                        DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()));
+
+        // Test JSON_MAP_NULL_KEY_MODE = DROP with actual data
+        Options options = new Options();
+        options.set(JsonOptions.JSON_MAP_NULL_KEY_MODE, JsonOptions.MapNullKeyMode.DROP);
+
+        // Create test data
+        List<InternalRow> testData =
+                Arrays.asList(
+                        GenericRow.of(
+                                1, new GenericMap(createTestMap("key1", 1, "key2", 2, "key3", 3))));
+
+        List<InternalRow> result = writeThenRead(options, rowType, testData, "test_drop_mode");
+
+        // Verify results
+        assertThat(result).hasSize(1);
+        assertThat(result.get(0).getInt(0)).isEqualTo(1);
+        assertThat(result.get(0).getMap(1).size()).isEqualTo(3);
+    }
+
+    @Test
+    public void testDifferentMapNullKeyLiteralsWithWriteRead() throws IOException {
+        RowType rowType =
+                DataTypes.ROW(
+                        DataTypes.INT().notNull(),
+                        DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()));
+
+        String[] literals = {"EMPTY", "MISSING", "UNDEFINED", "NULL_VALUE"};
+
+        // Create test data once (reused for all literals)
+        List<InternalRow> testData =
+                Arrays.asList(
+                        GenericRow.of(
+                                1,
+                                new GenericMap(
+                                        createTestMap("name", "Alice", "city", "New York"))));
+
+        for (String literal : literals) {
+            Options options = new Options();
+            options.set(JsonOptions.JSON_MAP_NULL_KEY_MODE, JsonOptions.MapNullKeyMode.LITERAL);
+            options.set(JsonOptions.JSON_MAP_NULL_KEY_LITERAL, literal);
+
+            List<InternalRow> result =
+                    writeThenRead(options, rowType, testData, "test_literal_" + literal);
+
+            // Verify results
+            assertThat(result).hasSize(1);
+            assertThat(result.get(0).getInt(0)).isEqualTo(1);
+            assertThat(result.get(0).getMap(1).size()).isEqualTo(2);
+        }
+    }
+
+    @Test
+    public void testJsonWriteReadWithDifferentLineDelimiters() throws IOException {
+        RowType rowType =
+                DataTypes.ROW(
+                        DataTypes.INT().notNull(),
+                        DataTypes.STRING(),
+                        DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()));
+
+        String[] delimiters = {"\n", "\r", "\r\n"};
+
+        // Create test data once (reused for all delimiters)
+        List<InternalRow> testData =
+                Arrays.asList(
+                        GenericRow.of(
+                                1,
+                                BinaryString.fromString("first"),
+                                new GenericMap(createTestMap("name", "Alice", "role", "admin"))),
+                        GenericRow.of(
+                                2,
+                                BinaryString.fromString("second"),
+                                new GenericMap(createTestMap("name", "Bob", "role", "user"))));
+
+        for (String delimiter : delimiters) {
+            Options options = new Options();
+            options.set(JsonOptions.LINE_DELIMITER, delimiter);
+            options.set(JsonOptions.JSON_MAP_NULL_KEY_MODE, JsonOptions.MapNullKeyMode.LITERAL);
+            options.set(JsonOptions.JSON_MAP_NULL_KEY_LITERAL, "NULL");
+
+            List<InternalRow> result =
+                    writeThenRead(options, rowType, testData, "test_delim_" + delimiter.hashCode());
+
+            // Verify results
+            assertThat(result).hasSize(2);
+
+            // Verify first row
+            assertThat(result.get(0).getInt(0)).isEqualTo(1);
+            assertThat(result.get(0).getString(1).toString()).isEqualTo("first");
+            assertThat(result.get(0).getMap(2).size()).isEqualTo(2);
+
+            // Verify second row
+            assertThat(result.get(1).getInt(0)).isEqualTo(2);
+            assertThat(result.get(1).getString(1).toString()).isEqualTo("second");
+            assertThat(result.get(1).getMap(2).size()).isEqualTo(2);
+        }
+    }
+
+    @Override
+    public boolean supportDataFileWithoutExtension() {
+        return true;
+    }
+
+    /** Creates a test map with BinaryString keys from String key-value pairs. */
+    private java.util.Map<BinaryString, Object> createTestMap(Object... keyValuePairs) {
+        if (keyValuePairs.length % 2 != 0) {
+            throw new IllegalArgumentException("Key-value pairs must be even number of arguments");
+        }
+
+        java.util.Map<BinaryString, Object> map = new java.util.HashMap<>();
+        for (int i = 0; i < keyValuePairs.length; i += 2) {
+            String key = (String) keyValuePairs[i];
+            Object value = keyValuePairs[i + 1];
+            if (value instanceof String) {
+                map.put(BinaryString.fromString(key), BinaryString.fromString((String) value));
+            } else {
+                map.put(BinaryString.fromString(key), value);
+            }
+        }
+        return map;
+    }
+
+    private List<InternalRow> writeThenRead(
+            Options options, RowType rowType, List<InternalRow> testData, String testPrefix)
+            throws IOException {
+        FileFormat format =
+                new JsonFileFormat(new FileFormatFactory.FormatContext(options, 1024, 1024));
+        Path testFile = new Path(parent, testPrefix + "_" + UUID.randomUUID() + ".json");
+        FormatWriterFactory writerFactory = format.createWriterFactory(rowType);
+        try (PositionOutputStream out = fileIO.newOutputStream(testFile, false);
+                FormatWriter writer = writerFactory.create(out, "none")) {
+            for (InternalRow row : testData) {
+                writer.addElement(row);
+            }
+        }
+        try (RecordReader<InternalRow> reader =
+                format.createReaderFactory(rowType)
+                        .createReader(
+                                new FormatReaderContext(
+                                        fileIO, testFile, fileIO.getFileSize(testFile)))) {
+
+            InternalRowSerializer serializer = new InternalRowSerializer(rowType);
+            List<InternalRow> result = new ArrayList<>();
+            reader.forEachRemaining(row -> result.add(serializer.copy(row)));
+            return result;
+        }
+    }
+}
