This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 632a54728 [core] Improve Avro read write performance (#1501)
632a54728 is described below

commit 632a5472877b4803085371f905a2437f89d60845
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Jul 5 20:20:40 2023 +0800

    [core] Improve Avro read write performance (#1501)
---
 .../paimon/benchmark/TableWriterBenchmark.java     |  17 +-
 .../apache/paimon/format/TableStatsCollector.java  |  10 +
 .../apache/paimon/format/FormatReadWriteTest.java  | 179 +++++++
 .../paimon/io/StatsCollectingSingleFileWriter.java |   2 +-
 ...ractAvroBulkFormat.java => AvroBulkFormat.java} | 104 ++---
 .../apache/paimon/format/avro/AvroFileFormat.java  |  65 +--
 .../paimon/format/avro/AvroRowDatumReader.java     |  68 +++
 .../paimon/format/avro/AvroRowDatumWriter.java     |  61 +++
 .../paimon/format/avro/AvroSchemaVisitor.java      | 142 ++++++
 .../org/apache/paimon/format/avro/FieldReader.java |  31 ++
 .../paimon/format/avro/FieldReaderFactory.java     | 514 +++++++++++++++++++++
 .../org/apache/paimon/format/avro/FieldWriter.java |  31 ++
 .../paimon/format/avro/FieldWriterFactory.java     | 226 +++++++++
 .../format/avro/RowDataToAvroConverters.java       | 329 -------------
 .../paimon/format/avro/AvroBulkFormatTest.java     | 160 -------
 .../format/avro/AvroBulkFormatTestUtils.java       |  76 ---
 .../format/avro/AvroFormatReadWriteTest.java       |  32 ++
 .../format/avro/AvroToRowDataConvertersTest.java   |  47 --
 .../format/avro/RowDataToAvroConvertersTest.java   |  57 ---
 19 files changed, 1350 insertions(+), 801 deletions(-)

diff --git 
a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/TableWriterBenchmark.java
 
b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/TableWriterBenchmark.java
index 85340f194..2594420ca 100644
--- 
a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/TableWriterBenchmark.java
+++ 
b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/TableWriterBenchmark.java
@@ -43,7 +43,22 @@ public class TableWriterBenchmark extends TableBenchmark {
          * Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
          * avro:            Best/Avg Time(ms)    Row Rate(M/s)      Per 
Row(ns)   Relative
          * 
---------------------------------------------------------------------------------
-         * avro_write        10139 / 13044              0.0          33797.3   
    1.0X
+         * avro_write        5847 / 7296              0.1          19489.5     
  1.0X
+         */
+    }
+
+    @Test
+    public void testAvroWithoutStats() throws Exception {
+        Options options = new Options();
+        options.set(CoreOptions.FILE_FORMAT, CoreOptions.FileFormatType.AVRO);
+        options.set(CoreOptions.METADATA_STATS_MODE, "none");
+        innerTest("avro", options);
+        /*
+         * Java HotSpot(TM) 64-Bit Server VM 1.8.0_301-b09 on Mac OS X 10.16
+         * Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
+         * avro:            Best/Avg Time(ms)    Row Rate(M/s)      Per 
Row(ns)   Relative
+         * 
---------------------------------------------------------------------------------
+         * avro_write        4701 / 5780              0.1          15669.6     
  1.0X
          */
     }
 
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/format/TableStatsCollector.java 
b/paimon-common/src/main/java/org/apache/paimon/format/TableStatsCollector.java
index 6f8661815..25f00d743 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/format/TableStatsCollector.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/format/TableStatsCollector.java
@@ -22,9 +22,12 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.serializer.InternalSerializers;
 import org.apache.paimon.data.serializer.Serializer;
 import org.apache.paimon.statistics.FieldStatsCollector;
+import org.apache.paimon.statistics.NoneFieldStatsCollector;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.RowDataToObjectArrayConverter;
 
+import java.util.Arrays;
+
 import static 
org.apache.paimon.statistics.FieldStatsCollector.createFullStatsFactories;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
@@ -34,6 +37,7 @@ public class TableStatsCollector {
     private final RowDataToObjectArrayConverter converter;
     private final FieldStatsCollector[] statsCollectors;
     private final Serializer<Object>[] fieldSerializers;
+    private final boolean isDisabled;
 
     public TableStatsCollector(RowType rowType) {
         this(rowType, createFullStatsFactories(rowType.getFieldCount()));
@@ -52,6 +56,12 @@ public class TableStatsCollector {
         for (int i = 0; i < numFields; i++) {
             fieldSerializers[i] = 
InternalSerializers.create(rowType.getTypeAt(i));
         }
+        this.isDisabled =
+                Arrays.stream(statsCollectors).allMatch(p -> p instanceof 
NoneFieldStatsCollector);
+    }
+
+    public boolean isDisabled() {
+        return isDisabled;
     }
 
     /**
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/format/FormatReadWriteTest.java 
b/paimon-common/src/test/java/org/apache/paimon/format/FormatReadWriteTest.java
new file mode 100644
index 000000000..cd2fd36da
--- /dev/null
+++ 
b/paimon-common/src/test/java/org/apache/paimon/format/FormatReadWriteTest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format;
+
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.GenericArray;
+import org.apache.paimon.data.GenericMap;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.apache.paimon.data.BinaryString.fromString;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test Base class for Format. */
+public abstract class FormatReadWriteTest {
+
+    @TempDir java.nio.file.Path tempPath;
+
+    private FileIO fileIO;
+    private Path file;
+
+    @BeforeEach
+    public void beforeEach() {
+        this.fileIO = LocalFileIO.create();
+        this.file = new Path(new Path(tempPath.toUri()), 
UUID.randomUUID().toString());
+    }
+
+    protected abstract FileFormat fileFormat();
+
+    @Test
+    public void testSimpleTypes() throws IOException {
+        RowType rowType = DataTypes.ROW(DataTypes.INT().notNull(), 
DataTypes.BIGINT());
+
+        if (ThreadLocalRandom.current().nextBoolean()) {
+            rowType = (RowType) rowType.notNull();
+        }
+
+        InternalRowSerializer serializer = new InternalRowSerializer(rowType);
+        FileFormat format = fileFormat();
+
+        PositionOutputStream out = fileIO.newOutputStream(file, false);
+        FormatWriter writer = format.createWriterFactory(rowType).create(out, 
null);
+        writer.addElement(GenericRow.of(1, 1L));
+        writer.addElement(GenericRow.of(2, 2L));
+        writer.addElement(GenericRow.of(3, null));
+        writer.flush();
+        writer.finish();
+        out.close();
+
+        RecordReader<InternalRow> reader =
+                format.createReaderFactory(rowType).createReader(fileIO, file);
+        List<InternalRow> result = new ArrayList<>();
+        reader.forEachRemaining(row -> result.add(serializer.copy(row)));
+
+        assertThat(result)
+                .containsExactly(
+                        GenericRow.of(1, 1L), GenericRow.of(2, 2L), 
GenericRow.of(3, null));
+    }
+
+    @Test
+    public void testFullTypes() throws IOException {
+        RowType rowType =
+                RowType.builder()
+                        .field("id", DataTypes.INT().notNull())
+                        .field("name", DataTypes.STRING()) /* optional by 
default */
+                        .field("salary", DataTypes.DOUBLE().notNull())
+                        .field(
+                                "locations",
+                                DataTypes.MAP(
+                                        DataTypes.STRING().notNull(),
+                                        DataTypes.ROW(
+                                                DataTypes.FIELD(
+                                                        0,
+                                                        "posX",
+                                                        
DataTypes.DOUBLE().notNull(),
+                                                        "X field"),
+                                                DataTypes.FIELD(
+                                                        1,
+                                                        "posY",
+                                                        
DataTypes.DOUBLE().notNull(),
+                                                        "Y field"))))
+                        .field("strArray", 
DataTypes.ARRAY(DataTypes.STRING()).nullable())
+                        .field("intArray", 
DataTypes.ARRAY(DataTypes.INT()).nullable())
+                        .field("boolean", DataTypes.BOOLEAN().nullable())
+                        .field("tinyint", DataTypes.TINYINT())
+                        .field("smallint", DataTypes.SMALLINT())
+                        .field("bigint", DataTypes.BIGINT())
+                        .field("bytes", DataTypes.BYTES())
+                        .field("timestamp", DataTypes.TIMESTAMP())
+                        .field("timestamp_3", DataTypes.TIMESTAMP(3))
+                        .field("date", DataTypes.DATE())
+                        .field("decimal", DataTypes.DECIMAL(2, 2))
+                        .field("decimal2", DataTypes.DECIMAL(38, 2))
+                        .field("decimal3", DataTypes.DECIMAL(10, 1))
+                        .build();
+
+        if (ThreadLocalRandom.current().nextBoolean()) {
+            rowType = (RowType) rowType.notNull();
+        }
+
+        FileFormat format = fileFormat();
+
+        PositionOutputStream out = fileIO.newOutputStream(file, false);
+        FormatWriter writer = format.createWriterFactory(rowType).create(out, 
null);
+        GenericRow expected =
+                GenericRow.of(
+                        1,
+                        fromString("name"),
+                        5.26D,
+                        new GenericMap(
+                                new HashMap<Object, Object>() {
+                                    {
+                                        this.put(fromString("key1"), 
GenericRow.of(5.2D, 6.2D));
+                                        this.put(fromString("key2"), 
GenericRow.of(6.2D, 2.2D));
+                                    }
+                                }),
+                        new GenericArray(new Object[] {fromString("123"), 
fromString("456")}),
+                        new GenericArray(new Object[] {123, 456}),
+                        true,
+                        (byte) 3,
+                        (short) 6,
+                        12304L,
+                        new byte[] {1, 5, 2},
+                        Timestamp.fromMicros(123123123),
+                        Timestamp.fromEpochMillis(123123123),
+                        2456,
+                        Decimal.fromBigDecimal(new BigDecimal(0.22), 2, 2),
+                        Decimal.fromBigDecimal(new BigDecimal(12312455.22), 
38, 2),
+                        Decimal.fromBigDecimal(new BigDecimal(12455.1), 10, 
1));
+        writer.addElement(expected);
+        writer.flush();
+        writer.finish();
+        out.close();
+
+        RecordReader<InternalRow> reader =
+                format.createReaderFactory(rowType).createReader(fileIO, file);
+        List<InternalRow> result = new ArrayList<>();
+        reader.forEachRemaining(result::add);
+
+        assertThat(result).containsExactly(expected);
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/StatsCollectingSingleFileWriter.java
 
b/paimon-core/src/main/java/org/apache/paimon/io/StatsCollectingSingleFileWriter.java
index 559b0f181..a5de36634 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/io/StatsCollectingSingleFileWriter.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/io/StatsCollectingSingleFileWriter.java
@@ -68,7 +68,7 @@ public abstract class StatsCollectingSingleFileWriter<T, R> 
extends SingleFileWr
     @Override
     public void write(T record) throws IOException {
         InternalRow rowData = writeImpl(record);
-        if (tableStatsCollector != null) {
+        if (tableStatsCollector != null && !tableStatsCollector.isDisabled()) {
             tableStatsCollector.collect(rowData);
         }
     }
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/avro/AbstractAvroBulkFormat.java
 b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java
similarity index 56%
rename from 
paimon-format/src/main/java/org/apache/paimon/format/avro/AbstractAvroBulkFormat.java
rename to 
paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java
index 2ce1f7c18..0f5867e64 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/avro/AbstractAvroBulkFormat.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java
@@ -23,95 +23,62 @@ import org.apache.paimon.format.FormatReaderFactory;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.IOUtils;
 import org.apache.paimon.utils.IteratorResultIterator;
 import org.apache.paimon.utils.Pool;
 
-import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileReader;
 import org.apache.avro.file.SeekableInput;
-import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.io.DatumReader;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.Iterator;
-import java.util.function.Function;
 
 /** Provides a {@link FormatReaderFactory} for Avro records. */
-public abstract class AbstractAvroBulkFormat<A> implements FormatReaderFactory 
{
+public class AvroBulkFormat implements FormatReaderFactory {
 
     private static final long serialVersionUID = 1L;
 
-    protected final Schema readerSchema;
+    protected final RowType rowType;
+    private final int[] projection;
 
-    protected AbstractAvroBulkFormat(Schema readerSchema) {
-        this.readerSchema = readerSchema;
+    public AvroBulkFormat(RowType rowType, int[] projection) {
+        this.rowType = rowType;
+        this.projection = projection;
     }
 
     @Override
     public AvroReader createReader(FileIO fileIO, Path file) throws 
IOException {
-        return createReader(fileIO, file, createReusedAvroRecord(), 
createConverter());
+        return new AvroReader(fileIO, file);
     }
 
-    private AvroReader createReader(
-            FileIO fileIO, Path file, A reuse, Function<A, InternalRow> 
converter)
-            throws IOException {
-        return new AvroReader(fileIO, file, 0, fileIO.getFileSize(file), -1, 
0, reuse, converter);
-    }
-
-    protected abstract A createReusedAvroRecord();
-
-    protected abstract Function<A, InternalRow> createConverter();
-
     private class AvroReader implements RecordReader<InternalRow> {
 
         private final FileIO fileIO;
-        private final DataFileReader<A> reader;
-        private final Function<A, InternalRow> converter;
+        private final DataFileReader<InternalRow> reader;
 
         private final long end;
-        private final Pool<A> pool;
-
-        private long currentRecordsToSkip;
-
-        private AvroReader(
-                FileIO fileIO,
-                Path path,
-                long offset,
-                long end,
-                long blockStart,
-                long recordsToSkip,
-                A reuse,
-                Function<A, InternalRow> converter)
-                throws IOException {
+        private final Pool<Object> pool;
+
+        private AvroReader(FileIO fileIO, Path path) throws IOException {
             this.fileIO = fileIO;
             this.reader = createReaderFromPath(path);
-            if (blockStart >= 0) {
-                reader.seek(blockStart);
-            } else {
-                reader.sync(offset);
-            }
-            for (int i = 0; i < recordsToSkip; i++) {
-                reader.next(reuse);
-            }
-            this.converter = converter;
-
-            this.end = end;
+            this.reader.sync(0);
+            this.end = fileIO.getFileSize(path);
             this.pool = new Pool<>(1);
-            this.pool.add(reuse);
-
-            this.currentRecordsToSkip = recordsToSkip;
+            this.pool.add(new Object());
         }
 
-        private DataFileReader<A> createReaderFromPath(Path path) throws 
IOException {
-            DatumReader<A> datumReader = new GenericDatumReader<>(null, 
readerSchema);
+        private DataFileReader<InternalRow> createReaderFromPath(Path path) 
throws IOException {
+            DatumReader<InternalRow> datumReader = new 
AvroRowDatumReader(rowType, projection);
             SeekableInput in =
                     new SeekableInputStreamWrapper(
                             fileIO.newInputStream(path), 
fileIO.getFileSize(path));
             try {
-                return (DataFileReader<A>) DataFileReader.openReader(in, 
datumReader);
+                return (DataFileReader<InternalRow>) 
DataFileReader.openReader(in, datumReader);
             } catch (Throwable e) {
                 IOUtils.closeQuietly(in);
                 throw e;
@@ -121,9 +88,9 @@ public abstract class AbstractAvroBulkFormat<A> implements 
FormatReaderFactory {
         @Nullable
         @Override
         public RecordIterator<InternalRow> readBatch() throws IOException {
-            A reuse;
+            Object ticket;
             try {
-                reuse = pool.pollEntry();
+                ticket = pool.pollEntry();
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
                 throw new RuntimeException(
@@ -131,18 +98,12 @@ public abstract class AbstractAvroBulkFormat<A> implements 
FormatReaderFactory {
             }
 
             if (!readNextBlock()) {
-                pool.recycler().recycle(reuse);
+                pool.recycler().recycle(ticket);
                 return null;
             }
 
-            Iterator<InternalRow> iterator =
-                    new AvroBlockIterator(
-                            reader.getBlockCount() - currentRecordsToSkip,
-                            reader,
-                            reuse,
-                            converter);
-            currentRecordsToSkip = 0;
-            return new IteratorResultIterator<>(iterator, () -> 
pool.recycler().recycle(reuse));
+            Iterator<InternalRow> iterator = new 
AvroBlockIterator(reader.getBlockCount(), reader);
+            return new IteratorResultIterator<>(iterator, () -> 
pool.recycler().recycle(ticket));
         }
 
         private boolean readNextBlock() throws IOException {
@@ -157,22 +118,14 @@ public abstract class AbstractAvroBulkFormat<A> 
implements FormatReaderFactory {
         }
     }
 
-    private class AvroBlockIterator implements Iterator<InternalRow> {
+    private static class AvroBlockIterator implements Iterator<InternalRow> {
 
         private long numRecordsRemaining;
-        private final DataFileReader<A> reader;
-        private final A reuse;
-        private final Function<A, InternalRow> converter;
-
-        private AvroBlockIterator(
-                long numRecordsRemaining,
-                DataFileReader<A> reader,
-                A reuse,
-                Function<A, InternalRow> converter) {
+        private final DataFileReader<InternalRow> reader;
+
+        private AvroBlockIterator(long numRecordsRemaining, 
DataFileReader<InternalRow> reader) {
             this.numRecordsRemaining = numRecordsRemaining;
             this.reader = reader;
-            this.reuse = reuse;
-            this.converter = converter;
         }
 
         @Override
@@ -186,7 +139,8 @@ public abstract class AbstractAvroBulkFormat<A> implements 
FormatReaderFactory {
                 numRecordsRemaining--;
                 // reader.next merely deserialize bytes in memory to java 
objects
                 // and will not read from file
-                return converter.apply(reader.next(reuse));
+                // Do not reuse object, manifest file assumes no object reuse
+                return reader.next(null);
             } catch (IOException e) {
                 throw new RuntimeException(
                         "Encountered exception when reading from avro format 
file", e);
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroFileFormat.java 
b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroFileFormat.java
index 11d4d3713..fffbaec9b 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroFileFormat.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroFileFormat.java
@@ -18,7 +18,6 @@
 
 package org.apache.paimon.format.avro;
 
-import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.format.FileFormat;
 import org.apache.paimon.format.FormatReaderFactory;
@@ -36,16 +35,11 @@ import org.apache.paimon.utils.Projection;
 import org.apache.avro.Schema;
 import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.DatumWriter;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.function.Function;
 
 import static org.apache.avro.file.DataFileConstants.SNAPPY_CODEC;
 
@@ -70,17 +64,12 @@ public class AvroFileFormat extends FileFormat {
     @Override
     public FormatReaderFactory createReaderFactory(
             RowType type, int[][] projection, @Nullable List<Predicate> 
filters) {
-        // avro is a file format that keeps schemas in file headers,
-        // if the schema given to the reader is not equal to the schema in 
header,
-        // reader will automatically map the fields and give back records with 
our desired
-        // schema
-        DataType producedType = Projection.of(projection).project(type);
-        return new AvroGenericRecordBulkFormat((RowType) 
producedType.copy(false));
+        return new AvroBulkFormat(type, 
Projection.of(projection).toTopLevelIndexes());
     }
 
     @Override
     public FormatWriterFactory createWriterFactory(RowType type) {
-        return new RowDataAvroWriterFactory(type, 
formatOptions.get(AVRO_OUTPUT_CODEC));
+        return new RowAvroWriterFactory(type, 
formatOptions.get(AVRO_OUTPUT_CODEC));
     }
 
     @Override
@@ -91,48 +80,18 @@ public class AvroFileFormat extends FileFormat {
         }
     }
 
-    private static class AvroGenericRecordBulkFormat extends 
AbstractAvroBulkFormat<GenericRecord> {
+    /** A {@link FormatWriterFactory} to write {@link InternalRow}. */
+    private static class RowAvroWriterFactory implements FormatWriterFactory {
 
-        private static final long serialVersionUID = 1L;
+        private final AvroWriterFactory<InternalRow> factory;
 
-        private final RowType producedRowType;
-
-        public AvroGenericRecordBulkFormat(RowType producedRowType) {
-            super(AvroSchemaConverter.convertToSchema(producedRowType));
-            this.producedRowType = producedRowType;
-        }
-
-        @Override
-        protected GenericRecord createReusedAvroRecord() {
-            return new GenericData.Record(readerSchema);
-        }
-
-        @Override
-        protected Function<GenericRecord, InternalRow> createConverter() {
-            AvroToRowDataConverters.AvroToRowDataConverter converter =
-                    
AvroToRowDataConverters.createRowConverter(producedRowType);
-            return record -> record == null ? null : (GenericRow) 
converter.convert(record);
-        }
-    }
-
-    /**
-     * A {@link FormatWriterFactory} to convert {@link InternalRow} to {@link 
GenericRecord} and
-     * wrap {@link AvroWriterFactory}.
-     */
-    private static class RowDataAvroWriterFactory implements 
FormatWriterFactory {
-
-        private final AvroWriterFactory<GenericRecord> factory;
-        private final RowType rowType;
-
-        private RowDataAvroWriterFactory(RowType rowType, String codec) {
-            this.rowType = rowType;
+        private RowAvroWriterFactory(RowType rowType, String codec) {
             this.factory =
                     new AvroWriterFactory<>(
                             out -> {
                                 Schema schema = 
AvroSchemaConverter.convertToSchema(rowType);
-                                DatumWriter<GenericRecord> datumWriter =
-                                        new GenericDatumWriter<>(schema);
-                                DataFileWriter<GenericRecord> dataFileWriter =
+                                AvroRowDatumWriter datumWriter = new 
AvroRowDatumWriter(rowType);
+                                DataFileWriter<InternalRow> dataFileWriter =
                                         new DataFileWriter<>(datumWriter);
 
                                 if (codec != null) {
@@ -146,16 +105,12 @@ public class AvroFileFormat extends FileFormat {
         @Override
         public FormatWriter create(PositionOutputStream out, String 
compression)
                 throws IOException {
-            AvroBulkWriter<GenericRecord> writer = factory.create(out);
-            RowDataToAvroConverters.RowDataToAvroConverter converter =
-                    RowDataToAvroConverters.createConverter(rowType);
-            Schema schema = AvroSchemaConverter.convertToSchema(rowType);
+            AvroBulkWriter<InternalRow> writer = factory.create(out);
             return new FormatWriter() {
 
                 @Override
                 public void addElement(InternalRow element) throws IOException 
{
-                    GenericRecord record = (GenericRecord) 
converter.convert(schema, element);
-                    writer.addElement(record);
+                    writer.addElement(element);
                 }
 
                 @Override
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroRowDatumReader.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroRowDatumReader.java
new file mode 100644
index 000000000..db5cc8b12
--- /dev/null
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroRowDatumReader.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.avro;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.format.avro.FieldReaderFactory.RowReader;
+import org.apache.paimon.types.RowType;
+
+import org.apache.avro.Schema;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.Decoder;
+
+import java.io.IOException;
+
+/** A {@link DatumReader} for reading {@link InternalRow}. */
+public class AvroRowDatumReader implements DatumReader<InternalRow> {
+
+    private final RowType rowType;
+    private final int[] projection;
+
+    private RowReader reader;
+    private boolean isUnion;
+
+    public AvroRowDatumReader(RowType rowType, int[] projection) {
+        this.rowType = rowType;
+        this.projection = projection;
+    }
+
+    @Override
+    public void setSchema(Schema schema) {
+        this.isUnion = false;
+        if (schema.isUnion()) {
+            this.isUnion = true;
+            schema = schema.getTypes().get(1);
+        }
+        this.reader =
+                new FieldReaderFactory()
+                        .createRowReader(schema, rowType.getFieldTypes(), 
projection);
+    }
+
+    @Override
+    public InternalRow read(InternalRow reuse, Decoder in) throws IOException {
+        if (isUnion) {
+            int index = in.readIndex();
+            if (index == 0) {
+                throw new RuntimeException("Cannot read a null row.");
+            }
+        }
+
+        return reader.read(in, reuse);
+    }
+}
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroRowDatumWriter.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroRowDatumWriter.java
new file mode 100644
index 000000000..ef6643bd1
--- /dev/null
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroRowDatumWriter.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.avro;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.format.avro.FieldWriterFactory.RowWriter;
+import org.apache.paimon.types.RowType;
+
+import org.apache.avro.Schema;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Encoder;
+
+import java.io.IOException;
+
+/** A {@link DatumWriter} for writing {@link InternalRow}. */
+public class AvroRowDatumWriter implements DatumWriter<InternalRow> {
+
+    private final RowType rowType;
+
+    private RowWriter writer;
+    private boolean isUnion;
+
+    public AvroRowDatumWriter(RowType rowType) {
+        this.rowType = rowType;
+    }
+
+    @Override
+    public void setSchema(Schema schema) {
+        this.isUnion = false;
+        if (schema.isUnion()) {
+            this.isUnion = true;
+            schema = schema.getTypes().get(1);
+        }
+        this.writer = new FieldWriterFactory().createRowWriter(schema, 
rowType.getFieldTypes());
+    }
+
+    @Override
+    public void write(InternalRow datum, Encoder out) throws IOException {
+        if (isUnion) {
+            // top Row is a UNION type
+            out.writeIndex(1);
+        }
+        this.writer.writeRow(datum, out);
+    }
+}
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroSchemaVisitor.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroSchemaVisitor.java
new file mode 100644
index 000000000..e30800ab9
--- /dev/null
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroSchemaVisitor.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.avro;
+
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.RowType;
+
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+
+import java.util.List;
+
+import static org.apache.paimon.types.DataTypeChecks.getPrecision;
+
/**
 * Visitor to visit {@link Schema}.
 *
 * <p>Dispatches on the Avro schema shape (record / union / array / map / primitive) paired with
 * the corresponding Paimon {@link DataType}, producing one result per visited node. Implementations
 * supply the leaf {@code visit*} methods; the {@code default} methods here perform the dispatch.
 *
 * @param <T> result type produced for each visited schema node
 */
public interface AvroSchemaVisitor<T> {

    /**
     * Dispatches {@code schema} to the matching {@code visit*} method, descending into the
     * structure of {@code type} in parallel.
     */
    default T visit(Schema schema, DataType type) {
        switch (schema.getType()) {
            case RECORD:
                return visitRecord(schema, ((RowType) type).getFieldTypes());

            case UNION:
                return visitUnion(schema, type);

            case ARRAY:
                return visitArray(schema, ((ArrayType) type).getElementType());

            case MAP:
                // NOTE(review): when type is not a MapType the value type defaults to INT —
                // presumably a multiset encoded as MAP<element, count>; confirm with callers.
                DataType valueType = DataTypes.INT();
                if (type instanceof MapType) {
                    valueType = ((MapType) type).getValueType();
                }
                return visitMap(schema, valueType);

            default:
                return primitive(schema, type);
        }
    }

    /**
     * Dispatches a primitive Avro schema. Logical types (date/time/timestamp/decimal) take
     * precedence over the raw primitive type; otherwise dispatch follows the Avro primitive,
     * with INT further refined by the Paimon type root (TINYINT/SMALLINT share Avro INT).
     */
    default T primitive(Schema primitive, DataType type) {
        LogicalType logicalType = primitive.getLogicalType();
        if (logicalType != null) {
            switch (logicalType.getName()) {
                case "date":
                case "time-millis":
                    // Both are carried as Avro INT on the wire.
                    return visitInt();

                case "timestamp-millis":
                    return visitTimestampMillis(getPrecision(type));

                case "timestamp-micros":
                    return visitTimestampMicros(getPrecision(type));

                case "decimal":
                    LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType;
                    return visitDecimal(decimal.getPrecision(), decimal.getScale());

                default:
                    throw new IllegalArgumentException("Unknown logical type: " + logicalType);
            }
        }

        switch (primitive.getType()) {
            case BOOLEAN:
                return visitBoolean();
            case INT:
                // Avro has no 8/16-bit ints; the Paimon type root disambiguates.
                switch (type.getTypeRoot()) {
                    case TINYINT:
                        return visitTinyInt();
                    case SMALLINT:
                        return visitSmallInt();
                    default:
                        return visitInt();
                }
            case LONG:
                return visitBigInt();
            case FLOAT:
                return visitFloat();
            case DOUBLE:
                return visitDouble();
            case STRING:
                return visitString();
            case BYTES:
                return visitBytes();
            default:
                throw new IllegalArgumentException("Unsupported type: " + primitive);
        }
    }

    T visitUnion(Schema schema, DataType type);

    T visitString();

    T visitBytes();

    T visitInt();

    T visitTinyInt();

    T visitSmallInt();

    T visitBoolean();

    T visitBigInt();

    T visitFloat();

    T visitDouble();

    T visitTimestampMillis(int precision);

    T visitTimestampMicros(int precision);

    T visitDecimal(int precision, int scale);

    T visitArray(Schema schema, DataType elementType);

    T visitMap(Schema schema, DataType valueType);

    T visitRecord(Schema schema, List<DataType> fieldTypes);
}
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldReader.java 
b/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldReader.java
new file mode 100644
index 000000000..37402b835
--- /dev/null
+++ b/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldReader.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.avro;
+
+import org.apache.avro.io.Decoder;
+
+import java.io.IOException;
+
/** Reader to read field from Avro {@link Decoder}. */
public interface FieldReader {

    /**
     * Reads one value of this field's type from the decoder.
     *
     * @param decoder decoder positioned at the start of the value
     * @param reuse a previously returned object that may be reused to reduce allocation; may be
     *     null, and implementations are free to ignore it
     * @return the decoded value
     * @throws IOException if decoding fails
     */
    Object read(Decoder decoder, Object reuse) throws IOException;

    /**
     * Consumes one value of this field's type from the decoder without materializing it, used for
     * fields excluded by projection.
     *
     * @throws IOException if decoding fails
     */
    void skip(Decoder decoder) throws IOException;
}
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldReaderFactory.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldReaderFactory.java
new file mode 100644
index 000000000..dfee0d8da
--- /dev/null
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldReaderFactory.java
@@ -0,0 +1,514 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.avro;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.GenericArray;
+import org.apache.paimon.data.GenericMap;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.types.DataType;
+
+import org.apache.avro.Schema;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.util.Utf8;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.IntStream;
+
/**
 * Factory to create {@link FieldReader}.
 *
 * <p>Visits an Avro {@link Schema} alongside the matching Paimon {@link DataType} and builds a
 * tree of readers mirroring the schema. Stateless primitive readers are shared singletons;
 * readers with per-instance state (decimal, array, map, row) are created per visit.
 *
 * <p>NOTE(review): {@link ArrayReader} and {@link MapReader} reuse internal buffers across calls,
 * so a built reader tree must not be shared between threads.
 */
public class FieldReaderFactory implements AvroSchemaVisitor<FieldReader> {

    private static final FieldReader STRING_READER = new StringReader();

    private static final FieldReader BYTES_READER = new BytesReader();

    private static final FieldReader BOOLEAN_READER = new BooleanReader();

    private static final FieldReader TINYINT_READER = new TinyIntReader();

    private static final FieldReader SMALLINT_READER = new SmallIntReader();

    private static final FieldReader INT_READER = new IntReader();

    private static final FieldReader BIGINT_READER = new BigIntReader();

    private static final FieldReader FLOAT_READER = new FloatReader();

    private static final FieldReader DOUBLE_READER = new DoubleReader();

    private static final FieldReader TIMESTAMP_MILLS_READER = new TimestampMillsReader();

    private static final FieldReader TIMESTAMP_MICROS_READER = new TimestampMicrosReader();

    @Override
    public FieldReader visitUnion(Schema schema, DataType type) {
        // Only UNION(null, T) is handled: branch 1 carries the value, branch 0 is null.
        return new NullableReader(visit(schema.getTypes().get(1), type));
    }

    @Override
    public FieldReader visitString() {
        return STRING_READER;
    }

    @Override
    public FieldReader visitBytes() {
        return BYTES_READER;
    }

    @Override
    public FieldReader visitInt() {
        return INT_READER;
    }

    @Override
    public FieldReader visitTinyInt() {
        return TINYINT_READER;
    }

    @Override
    public FieldReader visitSmallInt() {
        return SMALLINT_READER;
    }

    @Override
    public FieldReader visitBoolean() {
        return BOOLEAN_READER;
    }

    @Override
    public FieldReader visitBigInt() {
        return BIGINT_READER;
    }

    @Override
    public FieldReader visitFloat() {
        return FLOAT_READER;
    }

    @Override
    public FieldReader visitDouble() {
        return DOUBLE_READER;
    }

    @Override
    public FieldReader visitTimestampMillis(int precision) {
        return TIMESTAMP_MILLS_READER;
    }

    @Override
    public FieldReader visitTimestampMicros(int precision) {
        return TIMESTAMP_MICROS_READER;
    }

    @Override
    public FieldReader visitDecimal(int precision, int scale) {
        return new DecimalReader(precision, scale);
    }

    @Override
    public FieldReader visitArray(Schema schema, DataType elementType) {
        FieldReader elementReader = visit(schema.getElementType(), elementType);
        return new ArrayReader(elementReader);
    }

    @Override
    public FieldReader visitMap(Schema schema, DataType valueType) {
        // Avro map keys are always strings; only the value reader varies.
        FieldReader valueReader = visit(schema.getValueType(), valueType);
        return new MapReader(valueReader);
    }

    @Override
    public FieldReader visitRecord(Schema schema, List<DataType> fieldTypes) {
        return new RowReader(schema, fieldTypes);
    }

    /** Wraps a reader for a UNION(null, T) field: branch index 0 decodes to null. */
    private static class NullableReader implements FieldReader {

        private final FieldReader reader;

        public NullableReader(FieldReader reader) {
            this.reader = reader;
        }

        @Override
        public Object read(Decoder decoder, Object reuse) throws IOException {
            int index = decoder.readIndex();
            return index == 0 ? null : reader.read(decoder, reuse);
        }

        @Override
        public void skip(Decoder decoder) throws IOException {
            int index = decoder.readIndex();
            if (index == 1) {
                reader.skip(decoder);
            }
        }
    }

    /** Reads an Avro string into a {@link BinaryString}, reusing the caller's buffer if possible. */
    private static class StringReader implements FieldReader {

        @Override
        public Object read(Decoder decoder, Object reuse) throws IOException {
            Utf8 utf8 = null;
            if (reuse instanceof BinaryString) {
                // Hand the previous value's backing bytes to Avro for potential reuse.
                utf8 = new Utf8(((BinaryString) reuse).toBytes());
            }

            Utf8 string = decoder.readString(utf8);
            // Copy only the valid prefix: the Utf8 buffer may be larger than the byte length.
            return BinaryString.fromBytes(string.getBytes(), 0, string.getByteLength());
        }

        @Override
        public void skip(Decoder decoder) throws IOException {
            decoder.skipString();
        }
    }

    /** Reads Avro bytes into a byte[]. */
    private static class BytesReader implements FieldReader {

        @Override
        public Object read(Decoder decoder, Object reuse) throws IOException {
            return decoder.readBytes(null).array();
        }

        @Override
        public void skip(Decoder decoder) throws IOException {
            decoder.skipBytes();
        }
    }

    private static class BooleanReader implements FieldReader {

        @Override
        public Object read(Decoder decoder, Object reuse) throws IOException {
            return decoder.readBoolean();
        }

        @Override
        public void skip(Decoder decoder) throws IOException {
            decoder.readBoolean();
        }
    }

    /** TINYINT is carried as Avro INT; narrow on read. */
    private static class TinyIntReader implements FieldReader {

        @Override
        public Object read(Decoder decoder, Object reuse) throws IOException {
            return (byte) decoder.readInt();
        }

        @Override
        public void skip(Decoder decoder) throws IOException {
            decoder.readInt();
        }
    }

    /** SMALLINT is carried as Avro INT; narrow on read. */
    private static class SmallIntReader implements FieldReader {

        @Override
        public Object read(Decoder decoder, Object reuse) throws IOException {
            return (short) decoder.readInt();
        }

        @Override
        public void skip(Decoder decoder) throws IOException {
            decoder.readInt();
        }
    }

    private static class IntReader implements FieldReader {

        @Override
        public Object read(Decoder decoder, Object reuse) throws IOException {
            return decoder.readInt();
        }

        @Override
        public void skip(Decoder decoder) throws IOException {
            decoder.readInt();
        }
    }

    private static class BigIntReader implements FieldReader {

        @Override
        public Object read(Decoder decoder, Object reuse) throws IOException {
            return decoder.readLong();
        }

        @Override
        public void skip(Decoder decoder) throws IOException {
            decoder.readLong();
        }
    }

    private static class FloatReader implements FieldReader {

        @Override
        public Object read(Decoder decoder, Object reuse) throws IOException {
            return decoder.readFloat();
        }

        @Override
        public void skip(Decoder decoder) throws IOException {
            decoder.readFloat();
        }
    }

    private static class DoubleReader implements FieldReader {

        @Override
        public Object read(Decoder decoder, Object reuse) throws IOException {
            return decoder.readDouble();
        }

        @Override
        public void skip(Decoder decoder) throws IOException {
            decoder.readDouble();
        }
    }

    /** Reads an Avro decimal (big-endian unscaled bytes) into a Paimon {@link Decimal}. */
    private static class DecimalReader implements FieldReader {

        private final int precision;
        private final int scale;

        private DecimalReader(int precision, int scale) {
            this.precision = precision;
            this.scale = scale;
        }

        @Override
        public Object read(Decoder decoder, Object reuse) throws IOException {
            byte[] bytes = (byte[]) BYTES_READER.read(decoder, null);
            return Decimal.fromBigDecimal(
                    new BigDecimal(new BigInteger(bytes), scale), precision, scale);
        }

        @Override
        public void skip(Decoder decoder) throws IOException {
            BYTES_READER.skip(decoder);
        }
    }

    /** Reads an Avro long as epoch milliseconds. */
    private static class TimestampMillsReader implements FieldReader {

        @Override
        public Object read(Decoder decoder, Object reuse) throws IOException {
            return Timestamp.fromEpochMillis(decoder.readLong());
        }

        @Override
        public void skip(Decoder decoder) throws IOException {
            decoder.readLong();
        }
    }

    /** Reads an Avro long as epoch microseconds. */
    private static class TimestampMicrosReader implements FieldReader {

        @Override
        public Object read(Decoder decoder, Object reuse) throws IOException {
            return Timestamp.fromMicros(decoder.readLong());
        }

        @Override
        public void skip(Decoder decoder) throws IOException {
            decoder.readLong();
        }
    }

    /**
     * Reads an Avro array into a {@link GenericArray}. The element buffer is an instance field
     * reused across calls, so this reader is not thread-safe.
     */
    private static class ArrayReader implements FieldReader {

        private final FieldReader elementReader;
        private final List<Object> reusedList = new ArrayList<>();

        private ArrayReader(FieldReader elementReader) {
            this.elementReader = elementReader;
        }

        @Override
        public Object read(Decoder decoder, Object reuse) throws IOException {
            reusedList.clear();
            // Avro arrays are written in length-prefixed chunks; iterate until a zero chunk.
            long chunkLength = decoder.readArrayStart();

            while (chunkLength > 0) {
                for (int i = 0; i < chunkLength; i += 1) {
                    reusedList.add(elementReader.read(decoder, null));
                }

                chunkLength = decoder.arrayNext();
            }

            return new GenericArray(reusedList.toArray());
        }

        @Override
        public void skip(Decoder decoder) throws IOException {
            long chunkLength = decoder.readArrayStart();

            while (chunkLength > 0) {
                for (int i = 0; i < chunkLength; i += 1) {
                    elementReader.skip(decoder);
                }

                chunkLength = decoder.arrayNext();
            }
        }
    }

    /**
     * Reads an Avro map (string keys) into a {@link GenericMap}. Key/value buffers are instance
     * fields reused across calls, so this reader is not thread-safe.
     */
    private static class MapReader implements FieldReader {

        private final FieldReader valueReader;
        private final List<Object> reusedKeyList = new ArrayList<>();
        private final List<Object> reusedValueList = new ArrayList<>();

        private MapReader(FieldReader valueReader) {
            this.valueReader = valueReader;
        }

        @Override
        public Object read(Decoder decoder, Object reuse) throws IOException {
            reusedKeyList.clear();
            reusedValueList.clear();

            // Avro maps are written in length-prefixed chunks; iterate until a zero chunk.
            long chunkLength = decoder.readMapStart();

            while (chunkLength > 0) {
                for (int i = 0; i < chunkLength; i += 1) {
                    reusedKeyList.add(STRING_READER.read(decoder, null));
                    reusedValueList.add(valueReader.read(decoder, null));
                }

                chunkLength = decoder.mapNext();
            }

            Map<Object, Object> map = new HashMap<>();
            Object[] keys = reusedKeyList.toArray();
            Object[] values = reusedValueList.toArray();
            for (int i = 0; i < keys.length; i++) {
                map.put(keys[i], values[i]);
            }

            return new GenericMap(map);
        }

        @Override
        public void skip(Decoder decoder) throws IOException {
            long chunkLength = decoder.readMapStart();

            while (chunkLength > 0) {
                for (int i = 0; i < chunkLength; i += 1) {
                    STRING_READER.skip(decoder);
                    valueReader.skip(decoder);
                }

                chunkLength = decoder.mapNext();
            }
        }
    }

    /**
     * Creates a top-level {@link RowReader} with an explicit projection: {@code projection[i]} is
     * the source-field index feeding output column {@code i}.
     */
    public RowReader createRowReader(Schema schema, List<DataType> fieldTypes, int[] projection) {
        return new RowReader(schema, fieldTypes, projection);
    }

    /** A {@link FieldReader} to read {@link InternalRow}. */
    public class RowReader implements FieldReader {

        private final FieldReader[] fieldReaders;
        // projection[outputColumn] = source field index.
        private final int[] projection;
        // mapping[sourceField] = output columns consuming that field, or null if unprojected.
        private final int[][] mapping;

        public RowReader(Schema schema, List<DataType> fieldTypes) {
            // Identity projection: every field is read into its own column.
            this(schema, fieldTypes, IntStream.range(0, fieldTypes.size()).toArray());
        }

        public RowReader(Schema schema, List<DataType> fieldTypes, int[] projection) {
            List<Schema.Field> schemaFields = schema.getFields();
            this.fieldReaders = new FieldReader[schemaFields.size()];
            for (int i = 0, fieldsSize = schemaFields.size(); i < fieldsSize; i++) {
                Schema.Field field = schemaFields.get(i);
                DataType type = fieldTypes.get(i);
                fieldReaders[i] = visit(field.schema(), type);
            }
            this.projection = projection;

            // use fieldTypes to compatible with less fields in avro:
            // mapping is sized by the table's field count so an Avro schema with fewer
            // fields than the table is still handled.

            // Invert the projection: one source field may feed several output columns.
            @SuppressWarnings("unchecked")
            List<Integer>[] mapping = new List[fieldTypes.size()];
            for (int i = 0; i < projection.length; i++) {
                List<Integer> columns = mapping[projection[i]];
                if (columns == null) {
                    columns = new ArrayList<>();
                    mapping[projection[i]] = columns;
                }
                columns.add(i);
            }

            this.mapping = new int[fieldTypes.size()][];
            for (int i = 0; i < mapping.length; i++) {
                List<Integer> fields = mapping[i];
                if (fields != null) {
                    this.mapping[i] = fields.stream().mapToInt(Integer::intValue).toArray();
                }
            }
        }

        @Override
        public InternalRow read(Decoder decoder, Object reuse) throws IOException {
            GenericRow row;
            if (reuse instanceof GenericRow
                    && ((GenericRow) reuse).getFieldCount() == projection.length) {
                row = (GenericRow) reuse;
            } else {
                row = new GenericRow(projection.length);
            }

            // Fields must be consumed in schema order: read projected ones, skip the rest.
            for (int i = 0; i < fieldReaders.length; i += 1) {
                int[] columns = mapping[i];
                FieldReader reader = fieldReaders[i];
                if (columns == null) {
                    reader.skip(decoder);
                } else {
                    // Reuse the first consuming column's previous value; duplicate the
                    // decoded result into every column that projects this field.
                    Object value = reader.read(decoder, row.getField(columns[0]));
                    for (int column : columns) {
                        row.setField(column, value);
                    }
                }
            }
            return row;
        }

        @Override
        public void skip(Decoder decoder) throws IOException {
            for (FieldReader fieldReader : fieldReaders) {
                fieldReader.skip(decoder);
            }
        }
    }
}
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldWriter.java 
b/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldWriter.java
new file mode 100644
index 000000000..a790fcd94
--- /dev/null
+++ b/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldWriter.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.avro;
+
+import org.apache.paimon.data.DataGetters;
+
+import org.apache.avro.io.Encoder;
+
+import java.io.IOException;
+
/** Writer to write field to Avro {@link Encoder}. */
public interface FieldWriter {

    /**
     * Encodes the value at position {@code index} of {@code container} to the encoder.
     *
     * @param container source of field values (row, array, or map column)
     * @param index position of the field within the container
     * @param encoder target Avro encoder
     * @throws IOException if encoding fails
     */
    void write(DataGetters container, int index, Encoder encoder) throws IOException;
}
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldWriterFactory.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldWriterFactory.java
new file mode 100644
index 000000000..0bc37894b
--- /dev/null
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldWriterFactory.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.avro;
+
+import org.apache.paimon.data.DataGetters;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalMap;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.types.DataType;
+
+import org.apache.avro.Schema;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.util.Utf8;
+
+import java.io.IOException;
+import java.util.List;
+
/**
 * Factory to create {@link FieldWriter}.
 *
 * <p>Visits an Avro {@link Schema} alongside the matching Paimon {@link DataType} and builds a
 * tree of writers mirroring the schema. Stateless primitive writers are shared lambda constants;
 * parameterized writers (timestamp, decimal, array, map, row) are built per visit.
 */
public class FieldWriterFactory implements AvroSchemaVisitor<FieldWriter> {

    private static final FieldWriter STRING_WRITER =
            (container, i, encoder) ->
                    encoder.writeString(new Utf8(container.getString(i).toBytes()));

    private static final FieldWriter BYTES_WRITER =
            (container, i, encoder) -> encoder.writeBytes(container.getBinary(i));

    private static final FieldWriter BOOLEAN_WRITER =
            (container, i, encoder) -> encoder.writeBoolean(container.getBoolean(i));

    private static final FieldWriter INT_WRITER =
            (container, i, encoder) -> encoder.writeInt(container.getInt(i));

    // TINYINT/SMALLINT are widened to Avro INT on the wire.
    private static final FieldWriter TINYINT_WRITER =
            (container, i, encoder) -> encoder.writeInt(container.getByte(i));

    private static final FieldWriter SMALLINT_WRITER =
            (container, i, encoder) -> encoder.writeInt(container.getShort(i));

    private static final FieldWriter BIGINT_WRITER =
            (container, i, encoder) -> encoder.writeLong(container.getLong(i));

    private static final FieldWriter FLOAT_WRITER =
            (container, i, encoder) -> encoder.writeFloat(container.getFloat(i));

    private static final FieldWriter DOUBLE_WRITER =
            (container, i, encoder) -> encoder.writeDouble(container.getDouble(i));

    @Override
    public FieldWriter visitUnion(Schema schema, DataType type) {
        // Only UNION(null, T) is handled: branch 1 carries the value, branch 0 is null.
        return new NullableWriter(visit(schema.getTypes().get(1), type));
    }

    @Override
    public FieldWriter visitString() {
        return STRING_WRITER;
    }

    @Override
    public FieldWriter visitBytes() {
        return BYTES_WRITER;
    }

    @Override
    public FieldWriter visitInt() {
        return INT_WRITER;
    }

    @Override
    public FieldWriter visitTinyInt() {
        return TINYINT_WRITER;
    }

    @Override
    public FieldWriter visitSmallInt() {
        return SMALLINT_WRITER;
    }

    @Override
    public FieldWriter visitBoolean() {
        return BOOLEAN_WRITER;
    }

    @Override
    public FieldWriter visitBigInt() {
        return BIGINT_WRITER;
    }

    @Override
    public FieldWriter visitFloat() {
        return FLOAT_WRITER;
    }

    @Override
    public FieldWriter visitDouble() {
        return DOUBLE_WRITER;
    }

    @Override
    public FieldWriter visitTimestampMillis(int precision) {
        // Written as Avro long: epoch milliseconds.
        return (container, i, encoder) ->
                encoder.writeLong(container.getTimestamp(i, precision).getMillisecond());
    }

    @Override
    public FieldWriter visitTimestampMicros(int precision) {
        // Written as Avro long: epoch microseconds.
        return (container, i, encoder) ->
                encoder.writeLong(container.getTimestamp(i, precision).toMicros());
    }

    @Override
    public FieldWriter visitDecimal(int precision, int scale) {
        // Written as Avro bytes: big-endian unscaled value.
        return (container, index, encoder) -> {
            Decimal decimal = container.getDecimal(index, precision, scale);
            encoder.writeBytes(decimal.toUnscaledBytes());
        };
    }

    @Override
    public FieldWriter visitArray(Schema schema, DataType elementType) {
        FieldWriter elementWriter = visit(schema.getElementType(), elementType);
        return (container, index, encoder) -> {
            InternalArray array = container.getArray(index);
            // Avro chunked-array protocol: start, item count, startItem per element, end.
            encoder.writeArrayStart();
            int numElements = array.size();
            encoder.setItemCount(numElements);
            for (int i = 0; i < numElements; i += 1) {
                encoder.startItem();
                elementWriter.write(array, i, encoder);
            }
            encoder.writeArrayEnd();
        };
    }

    @Override
    public FieldWriter visitMap(Schema schema, DataType valueType) {
        FieldWriter valueWriter = visit(schema.getValueType(), valueType);
        return (container, index, encoder) -> {
            InternalMap map = container.getMap(index);
            // Avro chunked-map protocol: keys are always written as strings.
            encoder.writeMapStart();
            int numElements = map.size();
            encoder.setItemCount(numElements);
            InternalArray keyArray = map.keyArray();
            InternalArray valueArray = map.valueArray();
            for (int i = 0; i < numElements; i += 1) {
                encoder.startItem();
                STRING_WRITER.write(keyArray, i, encoder);
                valueWriter.write(valueArray, i, encoder);
            }
            encoder.writeMapEnd();
        };
    }

    @Override
    public FieldWriter visitRecord(Schema schema, List<DataType> fieldTypes) {
        return new RowWriter(schema, fieldTypes);
    }

    /** Wraps a writer for a UNION(null, T) field, emitting the branch index before the value. */
    private static class NullableWriter implements FieldWriter {

        private final FieldWriter writer;

        public NullableWriter(FieldWriter writer) {
            this.writer = writer;
        }

        @Override
        public void write(DataGetters container, int index, Encoder encoder) throws IOException {
            if (container.isNullAt(index)) {
                encoder.writeIndex(0);
            } else {
                encoder.writeIndex(1);
                writer.write(container, index, encoder);
            }
        }
    }

    /** A {@link FieldWriter} to write {@link InternalRow}. */
    public class RowWriter implements FieldWriter {

        private final FieldWriter[] fieldWriters;

        private RowWriter(Schema schema, List<DataType> fieldTypes) {
            List<Schema.Field> schemaFields = schema.getFields();
            this.fieldWriters = new FieldWriter[schemaFields.size()];
            for (int i = 0, fieldsSize = schemaFields.size(); i < fieldsSize; i++) {
                Schema.Field field = schemaFields.get(i);
                DataType type = fieldTypes.get(i);
                fieldWriters[i] = visit(field.schema(), type);
            }
        }

        @Override
        public void write(DataGetters container, int index, Encoder encoder) throws IOException {
            // Nested row: extract it from the container, then write field by field.
            InternalRow row = container.getRow(index, fieldWriters.length);
            writeRow(row, encoder);
        }

        /** Writes every field of {@code row} in schema order. */
        public void writeRow(InternalRow row, Encoder encoder) throws IOException {
            for (int i = 0; i < fieldWriters.length; i += 1) {
                fieldWriters[i].write(row, i, encoder);
            }
        }
    }

    /** Creates a top-level {@link RowWriter} for the given record schema and field types. */
    public RowWriter createRowWriter(Schema schema, List<DataType> fieldTypes) {
        return new RowWriter(schema, fieldTypes);
    }
}
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/avro/RowDataToAvroConverters.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/avro/RowDataToAvroConverters.java
deleted file mode 100644
index 76bcb380c..000000000
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/avro/RowDataToAvroConverters.java
+++ /dev/null
@@ -1,329 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.format.avro;
-
-import org.apache.paimon.data.Decimal;
-import org.apache.paimon.data.InternalArray;
-import org.apache.paimon.data.InternalMap;
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.data.Timestamp;
-import org.apache.paimon.types.ArrayType;
-import org.apache.paimon.types.DataField;
-import org.apache.paimon.types.DataType;
-import org.apache.paimon.types.RowType;
-
-import org.apache.avro.LogicalType;
-import org.apache.avro.LogicalTypes;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.util.Utf8;
-
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static 
org.apache.paimon.format.avro.AvroSchemaConverter.extractValueTypeToAvroMap;
-
-/** Tool class used to convert from {@link InternalRow} to Avro {@link 
GenericRecord}. */
-public class RowDataToAvroConverters {
-
-    // 
--------------------------------------------------------------------------------
-    // Runtime Converters
-    // 
--------------------------------------------------------------------------------
-
-    /**
-     * Runtime converter that converts objects of Paimon internal data 
structures to corresponding
-     * Avro data structures.
-     */
-    @FunctionalInterface
-    public interface RowDataToAvroConverter extends Serializable {
-        Object convert(Schema schema, Object object);
-    }
-
-    // 
--------------------------------------------------------------------------------
-    // IMPORTANT! We use anonymous classes instead of lambdas for a reason 
here. It is
-    // necessary because the maven shade plugin cannot relocate classes in
-    // SerializedLambdas (MSHADE-260). On the other hand we want to relocate 
Avro for
-    // sql-client uber jars.
-    // 
--------------------------------------------------------------------------------
-
-    /**
-     * Creates a runtime converter according to the given logical type that 
converts objects of
-     * Paimon internal data structures to corresponding Avro data structures.
-     */
-    public static RowDataToAvroConverter createConverter(DataType type) {
-        final RowDataToAvroConverter converter;
-        switch (type.getTypeRoot()) {
-            case TINYINT:
-                converter =
-                        new RowDataToAvroConverter() {
-                            private static final long serialVersionUID = 1L;
-
-                            @Override
-                            public Object convert(Schema schema, Object 
object) {
-                                return ((Byte) object).intValue();
-                            }
-                        };
-                break;
-            case SMALLINT:
-                converter =
-                        new RowDataToAvroConverter() {
-                            private static final long serialVersionUID = 1L;
-
-                            @Override
-                            public Object convert(Schema schema, Object 
object) {
-                                return ((Short) object).intValue();
-                            }
-                        };
-                break;
-            case BOOLEAN: // boolean
-            case INTEGER: // int
-            case BIGINT: // long
-            case FLOAT: // float
-            case DOUBLE: // double
-            case TIME_WITHOUT_TIME_ZONE: // int
-            case DATE: // int
-                converter =
-                        new RowDataToAvroConverter() {
-                            private static final long serialVersionUID = 1L;
-
-                            @Override
-                            public Object convert(Schema schema, Object 
object) {
-                                return object;
-                            }
-                        };
-                break;
-            case CHAR:
-            case VARCHAR:
-                converter =
-                        new RowDataToAvroConverter() {
-                            private static final long serialVersionUID = 1L;
-
-                            @Override
-                            public Object convert(Schema schema, Object 
object) {
-                                return new Utf8(object.toString());
-                            }
-                        };
-                break;
-            case BINARY:
-            case VARBINARY:
-                converter =
-                        new RowDataToAvroConverter() {
-                            private static final long serialVersionUID = 1L;
-
-                            @Override
-                            public Object convert(Schema schema, Object 
object) {
-                                return ByteBuffer.wrap((byte[]) object);
-                            }
-                        };
-                break;
-            case TIMESTAMP_WITHOUT_TIME_ZONE:
-                converter =
-                        new RowDataToAvroConverter() {
-                            private static final long serialVersionUID = 1L;
-
-                            @Override
-                            public Object convert(Schema schema, Object 
object) {
-                                LogicalType logicalType = 
schema.getLogicalType();
-
-                                if (logicalType instanceof 
LogicalTypes.TimestampMillis) {
-                                    return ((Timestamp) 
object).toInstant().toEpochMilli();
-                                } else if (logicalType instanceof 
LogicalTypes.TimestampMicros) {
-                                    return ((Timestamp) object).toMicros();
-                                } else {
-                                    throw new UnsupportedOperationException(
-                                            "Unsupported timestamp type: " + 
logicalType);
-                                }
-                            }
-                        };
-                break;
-            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
-                converter =
-                        new RowDataToAvroConverter() {
-                            private static final long serialVersionUID = 1L;
-
-                            @Override
-                            public Object convert(Schema schema, Object 
object) {
-                                LogicalType logicalType = 
schema.getLogicalType();
-
-                                if (logicalType instanceof 
LogicalTypes.LocalTimestampMillis) {
-                                    return ((Timestamp) 
object).toInstant().toEpochMilli();
-                                } else if (logicalType
-                                        instanceof 
LogicalTypes.LocalTimestampMicros) {
-                                    return ((Timestamp) object).toMicros();
-                                } else {
-                                    throw new UnsupportedOperationException(
-                                            "Unsupported timestamp type: " + 
logicalType);
-                                }
-                            }
-                        };
-                break;
-            case DECIMAL:
-                converter =
-                        new RowDataToAvroConverter() {
-                            private static final long serialVersionUID = 1L;
-
-                            @Override
-                            public Object convert(Schema schema, Object 
object) {
-                                return ByteBuffer.wrap(((Decimal) 
object).toUnscaledBytes());
-                            }
-                        };
-                break;
-            case ARRAY:
-                converter = createArrayConverter((ArrayType) type);
-                break;
-            case ROW:
-                converter = createRowConverter((RowType) type);
-                break;
-            case MAP:
-            case MULTISET:
-                converter = createMapConverter(type);
-                break;
-            default:
-                throw new UnsupportedOperationException("Unsupported type: " + 
type);
-        }
-
-        // wrap into nullable converter
-        return new RowDataToAvroConverter() {
-            private static final long serialVersionUID = 1L;
-
-            @Override
-            public Object convert(Schema schema, Object object) {
-                if (object == null) {
-                    return null;
-                }
-
-                // get actual schema if it is a nullable schema
-                Schema actualSchema;
-                if (schema.getType() == Schema.Type.UNION) {
-                    List<Schema> types = schema.getTypes();
-                    int size = types.size();
-                    if (size == 2 && types.get(1).getType() == 
Schema.Type.NULL) {
-                        actualSchema = types.get(0);
-                    } else if (size == 2 && types.get(0).getType() == 
Schema.Type.NULL) {
-                        actualSchema = types.get(1);
-                    } else {
-                        throw new IllegalArgumentException(
-                                "The Avro schema is not a nullable type: " + 
schema);
-                    }
-                } else {
-                    actualSchema = schema;
-                }
-                return converter.convert(actualSchema, object);
-            }
-        };
-    }
-
-    private static RowDataToAvroConverter createRowConverter(RowType rowType) {
-        final RowDataToAvroConverter[] fieldConverters =
-                rowType.getFieldTypes().stream()
-                        .map(RowDataToAvroConverters::createConverter)
-                        .toArray(RowDataToAvroConverter[]::new);
-        final DataType[] fieldTypes =
-                
rowType.getFields().stream().map(DataField::type).toArray(DataType[]::new);
-        final InternalRow.FieldGetter[] fieldGetters =
-                new InternalRow.FieldGetter[fieldTypes.length];
-        for (int i = 0; i < fieldTypes.length; i++) {
-            fieldGetters[i] = InternalRow.createFieldGetter(fieldTypes[i], i);
-        }
-        final int length = rowType.getFieldCount();
-
-        return new RowDataToAvroConverter() {
-            private static final long serialVersionUID = 1L;
-
-            @Override
-            public Object convert(Schema schema, Object object) {
-                final InternalRow row = (InternalRow) object;
-                final List<Schema.Field> fields = schema.getFields();
-                final GenericRecord record = new GenericData.Record(schema);
-                for (int i = 0; i < length; ++i) {
-                    final Schema.Field schemaField = fields.get(i);
-                    try {
-                        Object avroObject =
-                                fieldConverters[i].convert(
-                                        schemaField.schema(), 
fieldGetters[i].getFieldOrNull(row));
-                        record.put(i, avroObject);
-                    } catch (Throwable t) {
-                        throw new RuntimeException(
-                                String.format(
-                                        "Fail to serialize at field: %s.", 
schemaField.name()),
-                                t);
-                    }
-                }
-                return record;
-            }
-        };
-    }
-
-    private static RowDataToAvroConverter createArrayConverter(ArrayType 
arrayType) {
-        DataType elementType = arrayType.getElementType();
-        final InternalArray.ElementGetter elementGetter =
-                InternalArray.createElementGetter(elementType);
-        final RowDataToAvroConverter elementConverter = 
createConverter(arrayType.getElementType());
-
-        return new RowDataToAvroConverter() {
-            private static final long serialVersionUID = 1L;
-
-            @Override
-            public Object convert(Schema schema, Object object) {
-                final Schema elementSchema = schema.getElementType();
-                InternalArray arrayData = (InternalArray) object;
-                List<Object> list = new ArrayList<>();
-                for (int i = 0; i < arrayData.size(); ++i) {
-                    list.add(
-                            elementConverter.convert(
-                                    elementSchema, 
elementGetter.getElementOrNull(arrayData, i)));
-                }
-                return list;
-            }
-        };
-    }
-
-    private static RowDataToAvroConverter createMapConverter(DataType type) {
-        DataType valueType = extractValueTypeToAvroMap(type);
-        final InternalArray.ElementGetter valueGetter =
-                InternalArray.createElementGetter(valueType);
-        final RowDataToAvroConverter valueConverter = 
createConverter(valueType);
-
-        return new RowDataToAvroConverter() {
-            private static final long serialVersionUID = 1L;
-
-            @Override
-            public Object convert(Schema schema, Object object) {
-                final Schema valueSchema = schema.getValueType();
-                final InternalMap mapData = (InternalMap) object;
-                final InternalArray keyArray = mapData.keyArray();
-                final InternalArray valueArray = mapData.valueArray();
-                final Map<Object, Object> map = new HashMap<>(mapData.size());
-                for (int i = 0; i < mapData.size(); ++i) {
-                    final String key = keyArray.getString(i).toString();
-                    final Object value =
-                            valueConverter.convert(
-                                    valueSchema, 
valueGetter.getElementOrNull(valueArray, i));
-                    map.put(key, value);
-                }
-                return map;
-            }
-        };
-    }
-}
diff --git 
a/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroBulkFormatTest.java
 
b/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroBulkFormatTest.java
deleted file mode 100644
index b73cde5cc..000000000
--- 
a/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroBulkFormatTest.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.format.avro;
-
-import org.apache.paimon.data.BinaryString;
-import org.apache.paimon.data.GenericRow;
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.data.Timestamp;
-import org.apache.paimon.fs.Path;
-import org.apache.paimon.fs.local.LocalFileIO;
-import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.utils.FileIOUtils;
-
-import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.DatumWriter;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.apache.paimon.format.avro.AvroBulkFormatTestUtils.ROW_TYPE;
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** Tests for {@link AbstractAvroBulkFormat}. */
-class AvroBulkFormatTest {
-
-    private static final long TIMESTAMP = System.currentTimeMillis();
-
-    private static final List<InternalRow> TEST_DATA =
-            Arrays.asList(
-                    // -------- batch 0, block start 232 --------
-                    GenericRow.of(
-                            BinaryString.fromString("AvroBulk"),
-                            BinaryString.fromString("FormatTest"),
-                            Timestamp.fromEpochMillis(TIMESTAMP),
-                            Timestamp.fromMicros(TIMESTAMP * 1000 + 123),
-                            Timestamp.fromEpochMillis(TIMESTAMP),
-                            Timestamp.fromMicros(TIMESTAMP * 1000 + 123)),
-                    GenericRow.of(
-                            BinaryString.fromString("Apache"),
-                            BinaryString.fromString("Paimon"),
-                            Timestamp.fromEpochMillis(TIMESTAMP),
-                            Timestamp.fromMicros(TIMESTAMP * 1000 + 123),
-                            Timestamp.fromEpochMillis(TIMESTAMP),
-                            Timestamp.fromMicros(TIMESTAMP * 1000 + 123)),
-                    GenericRow.of(
-                            BinaryString.fromString(
-                                    "永和九年,岁在癸丑,暮春之初,会于会稽山阴之兰亭,修禊事也。群贤毕至,少"
-                                            + "长咸集。此地有崇山峻岭,茂林修竹,又有清流激湍,映带左右。引"
-                                            + "以为流觞曲水,列坐其次。虽无丝竹管弦之盛,一觞一咏,亦足以畅"
-                                            + "叙幽情。"),
-                            BinaryString.fromString(""),
-                            Timestamp.fromEpochMillis(TIMESTAMP),
-                            Timestamp.fromMicros(TIMESTAMP * 1000 + 123),
-                            Timestamp.fromEpochMillis(TIMESTAMP),
-                            Timestamp.fromMicros(TIMESTAMP * 1000 + 123)),
-                    // -------- batch 1, block start 689 --------
-                    GenericRow.of(
-                            BinaryString.fromString("File"),
-                            BinaryString.fromString("Format"),
-                            Timestamp.fromEpochMillis(TIMESTAMP),
-                            Timestamp.fromMicros(TIMESTAMP * 1000 + 123),
-                            Timestamp.fromEpochMillis(TIMESTAMP),
-                            Timestamp.fromMicros(TIMESTAMP * 1000 + 123)),
-                    GenericRow.of(
-                            null,
-                            BinaryString.fromString(
-                                    "This is a string with English, 中文 and 
even 🍎🍌🍑🥝🍍🥭🍐"),
-                            Timestamp.fromEpochMillis(TIMESTAMP),
-                            Timestamp.fromMicros(TIMESTAMP * 1000 + 123),
-                            Timestamp.fromEpochMillis(TIMESTAMP),
-                            Timestamp.fromMicros(TIMESTAMP * 1000 + 123)),
-                    // -------- batch 2, block start 1147 --------
-                    GenericRow.of(
-                            BinaryString.fromString("block with"),
-                            BinaryString.fromString("only one record"),
-                            Timestamp.fromEpochMillis(TIMESTAMP),
-                            Timestamp.fromMicros(TIMESTAMP * 1000 + 123),
-                            Timestamp.fromEpochMillis(TIMESTAMP),
-                            Timestamp.fromMicros(TIMESTAMP * 1000 + 123))
-                    // -------- file length 1323 --------
-                    );
-    private static final List<Long> BLOCK_STARTS = Arrays.asList(689L, 1147L, 
1323L);
-
-    private File tmpFile;
-
-    @BeforeEach
-    public void before() throws IOException {
-        tmpFile = Files.createTempFile("avro-bulk-format-test", 
".avro").toFile();
-        tmpFile.createNewFile();
-        FileOutputStream out = new FileOutputStream(tmpFile);
-
-        Schema schema = AvroSchemaConverter.convertToSchema(ROW_TYPE);
-        RowDataToAvroConverters.RowDataToAvroConverter converter =
-                RowDataToAvroConverters.createConverter(ROW_TYPE);
-
-        DatumWriter<GenericRecord> datumWriter = new 
GenericDatumWriter<>(schema);
-        DataFileWriter<GenericRecord> dataFileWriter = new 
DataFileWriter<>(datumWriter);
-        dataFileWriter.create(schema, out);
-
-        //  Generate the sync points manually in order to test blocks.
-        long syncBlock1 = dataFileWriter.sync();
-        dataFileWriter.append((GenericRecord) converter.convert(schema, 
TEST_DATA.get(0)));
-        dataFileWriter.append((GenericRecord) converter.convert(schema, 
TEST_DATA.get(1)));
-        dataFileWriter.append((GenericRecord) converter.convert(schema, 
TEST_DATA.get(2)));
-        long syncBlock2 = dataFileWriter.sync();
-        dataFileWriter.append((GenericRecord) converter.convert(schema, 
TEST_DATA.get(3)));
-        dataFileWriter.append((GenericRecord) converter.convert(schema, 
TEST_DATA.get(4)));
-        long syncBlock3 = dataFileWriter.sync();
-        dataFileWriter.append((GenericRecord) converter.convert(schema, 
TEST_DATA.get(5)));
-        long syncEnd = dataFileWriter.sync();
-        dataFileWriter.close();
-
-        // These values should be constant if nothing else changes with the 
file.
-        assertThat(BLOCK_STARTS).isEqualTo(Arrays.asList(syncBlock1, 
syncBlock2, syncBlock3));
-        assertThat(tmpFile).hasSize(syncEnd);
-    }
-
-    @AfterEach
-    public void after() throws IOException {
-        FileIOUtils.deleteFileOrDirectory(tmpFile);
-    }
-
-    @Test
-    void testReadWholeFileWithOneSplit() throws IOException {
-        AvroBulkFormatTestUtils.TestingAvroBulkFormat bulkFormat =
-                new AvroBulkFormatTestUtils.TestingAvroBulkFormat();
-        RecordReader<InternalRow> reader =
-                bulkFormat.createReader(new LocalFileIO(), new 
Path(tmpFile.toString()));
-        AtomicInteger i = new AtomicInteger(0);
-        reader.forEachRemaining(
-                rowData -> 
assertThat(rowData).isEqualTo(TEST_DATA.get(i.getAndIncrement())));
-    }
-}
diff --git 
a/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroBulkFormatTestUtils.java
 
b/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroBulkFormatTestUtils.java
deleted file mode 100644
index b68a6379b..000000000
--- 
a/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroBulkFormatTestUtils.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.format.avro;
-
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.types.DataType;
-import org.apache.paimon.types.DataTypes;
-import org.apache.paimon.types.RowType;
-
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
-
-import java.util.function.Function;
-
-/** Testing utils for tests related to {@link AbstractAvroBulkFormat}. */
-public class AvroBulkFormatTestUtils {
-
-    public static final RowType ROW_TYPE =
-            (RowType)
-                    RowType.builder()
-                            .fields(
-                                    new DataType[] {
-                                        DataTypes.STRING(),
-                                        DataTypes.STRING(),
-                                        DataTypes.TIMESTAMP(3),
-                                        DataTypes.TIMESTAMP(6),
-                                        
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3),
-                                        
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6),
-                                    },
-                                    new String[] {
-                                        "a",
-                                        "b",
-                                        "timestamp_millis",
-                                        "timestamp_micros",
-                                        "local_timestamp_millis",
-                                        "local_timestamp_micros"
-                                    })
-                            .build()
-                            .notNull();
-
-    /** {@link AbstractAvroBulkFormat} for tests. */
-    public static class TestingAvroBulkFormat extends 
AbstractAvroBulkFormat<GenericRecord> {
-
-        protected TestingAvroBulkFormat() {
-            super(AvroSchemaConverter.convertToSchema(ROW_TYPE));
-        }
-
-        @Override
-        protected GenericRecord createReusedAvroRecord() {
-            return new GenericData.Record(readerSchema);
-        }
-
-        @Override
-        protected Function<GenericRecord, InternalRow> createConverter() {
-            AvroToRowDataConverters.AvroToRowDataConverter converter =
-                    AvroToRowDataConverters.createRowConverter(ROW_TYPE);
-            return record -> record == null ? null : (InternalRow) 
converter.convert(record);
-        }
-    }
-}
diff --git 
a/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroFormatReadWriteTest.java
 
b/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroFormatReadWriteTest.java
new file mode 100644
index 000000000..6202b7a7a
--- /dev/null
+++ 
b/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroFormatReadWriteTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.avro;
+
+import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.format.FormatReadWriteTest;
+import org.apache.paimon.options.Options;
+
+/** An avro {@link FormatReadWriteTest}. */
+public class AvroFormatReadWriteTest extends FormatReadWriteTest {
+
+    @Override
+    protected FileFormat fileFormat() {
+        return new AvroFileFormat(new Options());
+    }
+}
diff --git 
a/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroToRowDataConvertersTest.java
 
b/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroToRowDataConvertersTest.java
deleted file mode 100644
index 31cac15a3..000000000
--- 
a/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroToRowDataConvertersTest.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.format.avro;
-
-import org.apache.paimon.data.Timestamp;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-import java.time.Instant;
-
-/** Test for avro to row data converters. */
-public class AvroToRowDataConvertersTest {
-    private static final Timestamp NOW = Timestamp.now();
-    private static final long TS_MILLIS = NOW.getMillisecond();
-    private static final long TS_MICROS = NOW.toMicros() + 123L;
-    private static final Timestamp NOW_MICROS = 
Timestamp.fromMicros(TS_MICROS);
-    private static final Instant INSTANT = Instant.ofEpochMilli(TS_MILLIS);
-
-    @Test
-    public void testConvertToTimestamp() {
-        Assertions.assertEquals(NOW, 
AvroToRowDataConverters.convertToTimestamp(TS_MILLIS, 3));
-
-        Assertions.assertEquals(
-                NOW_MICROS, 
AvroToRowDataConverters.convertToTimestamp(TS_MICROS, 6));
-
-        Assertions.assertEquals(NOW, 
AvroToRowDataConverters.convertToTimestamp(INSTANT, 3));
-
-        Assertions.assertEquals(NOW, 
AvroToRowDataConverters.convertToTimestamp(INSTANT, 6));
-    }
-}
diff --git 
a/paimon-format/src/test/java/org/apache/paimon/format/avro/RowDataToAvroConvertersTest.java
 
b/paimon-format/src/test/java/org/apache/paimon/format/avro/RowDataToAvroConvertersTest.java
deleted file mode 100644
index 9b38176aa..000000000
--- 
a/paimon-format/src/test/java/org/apache/paimon/format/avro/RowDataToAvroConvertersTest.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.format.avro;
-
-import org.apache.paimon.data.Timestamp;
-import org.apache.paimon.types.DataTypes;
-import org.apache.paimon.types.LocalZonedTimestampType;
-
-import org.apache.avro.Schema;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-import java.time.LocalDate;
-import java.time.LocalTime;
-import java.time.OffsetDateTime;
-import java.time.ZoneOffset;
-
-/** Test for row data to avro converters. */
-public class RowDataToAvroConvertersTest {
-
-    @Test
-    public void testTimestampWithTimeType() {
-        LocalZonedTimestampType zonedTimestampType = DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3);
-        RowDataToAvroConverters.RowDataToAvroConverter rowDataToAvroConverter =
-                RowDataToAvroConverters.createConverter(zonedTimestampType);
-
-        OffsetDateTime offsetDateTime =
-                OffsetDateTime.of(LocalDate.of(2023, 1, 1), LocalTime.of(0, 0, 0), ZoneOffset.UTC);
-
-        Schema schema = AvroSchemaConverter.convertToSchema(zonedTimestampType);
-        long timestamp = offsetDateTime.toInstant().toEpochMilli();
-        Timestamp millis = Timestamp.fromEpochMillis(timestamp);
-        Object converted = rowDataToAvroConverter.convert(schema, millis);
-
-        AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter =
-                AvroToRowDataConverters.createConverter(zonedTimestampType);
-
-        Object original = avroToRowDataConverter.convert(converted);
-        Assertions.assertEquals(millis, original);
-    }
-}

Reply via email to