This is an automated email from the ASF dual-hosted git repository.

zhangmang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git


The following commit(s) were added to refs/heads/master by this push:
     new 64de43f3 [AURON #1850] Add ArrowFieldWriter and FlinkArrowWriter for 
basic types (#2079)
64de43f3 is described below

commit 64de43f3ba3818aa387ec422d5580165e3633a8f
Author: xTong <[email protected]>
AuthorDate: Tue Mar 10 17:21:46 2026 +0800

    [AURON #1850] Add ArrowFieldWriter and FlinkArrowWriter for basic types 
(#2079)
    
    # Which issue does this PR close?
    
    Partially addresses #1850 (Part 2a of the Flink RowData to Arrow
    conversion).
    
    # Rationale for this change
    
    Per AIP-1, the Flink integration data path requires converting Flink
    `RowData` into Arrow `VectorSchemaRoot` for export to the native engine
    (DataFusion/Rust). This PR implements the writer layer for basic types,
    following Flink's official `flink-python` Arrow implementation as
    requested during Part 1 review (#1959).
    
    # What changes are included in this PR?
    
    ## Commit 1: ArrowFieldWriter base class + 12 type writers (16 files,
    +2181 lines)
    
    - **`ArrowFieldWriter<IN>`** — Generic abstract base class using
    template method pattern (`write()` → `doWrite()` + count++), aligned
    with Flink's `flink-python` `ArrowFieldWriter`.
    - **12 concrete writers** in `writers/` sub-package, each with
    `forRow()`/`forArray()` dual-mode factory methods:
    - Numeric: `IntWriter`, `TinyIntWriter`, `SmallIntWriter`,
    `BigIntWriter`, `FloatWriter`, `DoubleWriter`
    - Non-numeric: `BooleanWriter`, `VarCharWriter`, `VarBinaryWriter`,
    `DecimalWriter`, `DateWriter`, `NullWriter`
    - **Key design**: Each writer (except `NullWriter`) has two `public
    static final` inner classes (`XxxWriterForRow` / `XxxWriterForArray`)
    because Flink's `RowData` and `ArrayData` have no common getter
    interface.
    - **Special cases**:
    - `NullWriter`: No inner classes needed, `doWrite()` is empty
    (NullVector values are inherently null)
    - `DecimalWriter`: Takes precision/scale parameters, includes
    `fitBigDecimal()` validation before writing (aligned with Flink's
    `fromBigDecimal` logic)
    - **Unit tests**: `IntWriterTest` (5), `BasicWritersTest` (20),
    `NonNumericWritersTest` (12) — 37 tests
    
    ## Commit 2: FlinkArrowWriter orchestrator + factory methods (3 files,
    +482 lines)
    
    - **`FlinkArrowWriter`** — Orchestrates per-column
    `ArrowFieldWriter<RowData>[]` to write Flink `RowData` into Arrow
    `VectorSchemaRoot`. Lifecycle: `create()` → `write(row)*` → `finish()` →
    `reset()`.
    - **Factory methods in `FlinkArrowUtils`** —
    `createArrowFieldWriterForRow()`/`createArrowFieldWriterForArray()`
    dispatch writer creation based on Arrow vector type (instanceof chain).
    Both are package-private.
    - **Integration tests**: `FlinkArrowWriterTest` (7) — all-types write,
    null handling, multi-row batches, reset, empty batch, zero columns,
    unsupported type. Total: **53 tests, all passing**.
    
    # Scope
    
    This PR covers basic types only. Time, Timestamp, and complex types
    (Array/Map/Row) will be in Part 2b.
    
    # Are there any user-facing changes?
    
    No. Internal API for Flink integration.
    
    # How was this patch tested?
    
    53 tests across 4 test classes:
    
    ```bash
    ./build/mvn test -Pflink-1.18 -Pspark-3.5 -Pscala-2.12 \
      -pl auron-flink-extension/auron-flink-runtime -am -DskipBuildNative
    ```
    
    Result: 53 pass, 0 failures.
---
 .../apache/auron/flink/arrow/FlinkArrowUtils.java  | 108 +++++
 .../apache/auron/flink/arrow/FlinkArrowWriter.java |  97 ++++
 .../flink/arrow/writers/ArrowFieldWriter.java      |  79 ++++
 .../auron/flink/arrow/writers/BigIntWriter.java    |  94 ++++
 .../auron/flink/arrow/writers/BooleanWriter.java   |  94 ++++
 .../auron/flink/arrow/writers/DateWriter.java      |  94 ++++
 .../auron/flink/arrow/writers/DecimalWriter.java   | 116 +++++
 .../auron/flink/arrow/writers/DoubleWriter.java    |  94 ++++
 .../auron/flink/arrow/writers/FloatWriter.java     |  94 ++++
 .../auron/flink/arrow/writers/IntWriter.java       |  94 ++++
 .../auron/flink/arrow/writers/NullWriter.java      |  51 +++
 .../auron/flink/arrow/writers/SmallIntWriter.java  |  94 ++++
 .../auron/flink/arrow/writers/TinyIntWriter.java   |  94 ++++
 .../auron/flink/arrow/writers/VarBinaryWriter.java |  95 ++++
 .../auron/flink/arrow/writers/VarCharWriter.java   |  96 ++++
 .../auron/flink/arrow/FlinkArrowWriterTest.java    | 277 +++++++++++
 .../flink/arrow/writers/BasicWritersTest.java      | 509 +++++++++++++++++++++
 .../auron/flink/arrow/writers/IntWriterTest.java   | 150 ++++++
 .../flink/arrow/writers/NonNumericWritersTest.java | 333 ++++++++++++++
 19 files changed, 2663 insertions(+)

diff --git 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java
 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java
index 0763a847..273c66c0 100644
--- 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java
+++ 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java
@@ -20,6 +20,19 @@ import java.util.ArrayList;
 import java.util.List;
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.DateDayVector;
+import org.apache.arrow.vector.DecimalVector;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.NullVector;
+import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.TinyIntVector;
+import org.apache.arrow.vector.ValueVector;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.arrow.vector.VarCharVector;
 import org.apache.arrow.vector.complex.MapVector;
 import org.apache.arrow.vector.types.DateUnit;
 import org.apache.arrow.vector.types.FloatingPointPrecision;
@@ -28,6 +41,21 @@ import org.apache.arrow.vector.types.pojo.ArrowType;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.types.pojo.FieldType;
 import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.auron.flink.arrow.writers.ArrowFieldWriter;
+import org.apache.auron.flink.arrow.writers.BigIntWriter;
+import org.apache.auron.flink.arrow.writers.BooleanWriter;
+import org.apache.auron.flink.arrow.writers.DateWriter;
+import org.apache.auron.flink.arrow.writers.DecimalWriter;
+import org.apache.auron.flink.arrow.writers.DoubleWriter;
+import org.apache.auron.flink.arrow.writers.FloatWriter;
+import org.apache.auron.flink.arrow.writers.IntWriter;
+import org.apache.auron.flink.arrow.writers.NullWriter;
+import org.apache.auron.flink.arrow.writers.SmallIntWriter;
+import org.apache.auron.flink.arrow.writers.TinyIntWriter;
+import org.apache.auron.flink.arrow.writers.VarBinaryWriter;
+import org.apache.auron.flink.arrow.writers.VarCharWriter;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.BinaryType;
@@ -217,6 +245,86 @@ public final class FlinkArrowUtils {
         return new Schema(fields);
     }
 
+    /**
+     * Creates an {@link ArrowFieldWriter} for top-level {@link RowData} 
fields.
+     *
+     * @param vector the Arrow vector to write into
+     * @param fieldType the Flink logical type of the field
+     * @return a writer that reads from RowData at a given ordinal
+     * @throws UnsupportedOperationException if the vector type is not 
supported
+     */
+    static ArrowFieldWriter<RowData> createArrowFieldWriterForRow(ValueVector 
vector, LogicalType fieldType) {
+        if (vector instanceof NullVector) {
+            return NullWriter.forRow((NullVector) vector);
+        } else if (vector instanceof BitVector) {
+            return BooleanWriter.forRow((BitVector) vector);
+        } else if (vector instanceof TinyIntVector) {
+            return TinyIntWriter.forRow((TinyIntVector) vector);
+        } else if (vector instanceof SmallIntVector) {
+            return SmallIntWriter.forRow((SmallIntVector) vector);
+        } else if (vector instanceof IntVector) {
+            return IntWriter.forRow((IntVector) vector);
+        } else if (vector instanceof BigIntVector) {
+            return BigIntWriter.forRow((BigIntVector) vector);
+        } else if (vector instanceof Float4Vector) {
+            return FloatWriter.forRow((Float4Vector) vector);
+        } else if (vector instanceof Float8Vector) {
+            return DoubleWriter.forRow((Float8Vector) vector);
+        } else if (vector instanceof VarCharVector) {
+            return VarCharWriter.forRow((VarCharVector) vector);
+        } else if (vector instanceof VarBinaryVector) {
+            return VarBinaryWriter.forRow((VarBinaryVector) vector);
+        } else if (vector instanceof DecimalVector) {
+            DecimalType decimalType = (DecimalType) fieldType;
+            return DecimalWriter.forRow((DecimalVector) vector, 
decimalType.getPrecision(), decimalType.getScale());
+        } else if (vector instanceof DateDayVector) {
+            return DateWriter.forRow((DateDayVector) vector);
+        } else {
+            throw new UnsupportedOperationException(
+                    "Unsupported vector type: " + 
vector.getClass().getSimpleName());
+        }
+    }
+
+    /**
+     * Creates an {@link ArrowFieldWriter} for nested {@link ArrayData} 
elements.
+     *
+     * @param vector the Arrow vector to write into
+     * @param fieldType the Flink logical type of the element
+     * @return a writer that reads from ArrayData at a given ordinal
+     * @throws UnsupportedOperationException if the vector type is not 
supported
+     */
+    static ArrowFieldWriter<ArrayData> 
createArrowFieldWriterForArray(ValueVector vector, LogicalType fieldType) {
+        if (vector instanceof NullVector) {
+            return NullWriter.forArray((NullVector) vector);
+        } else if (vector instanceof BitVector) {
+            return BooleanWriter.forArray((BitVector) vector);
+        } else if (vector instanceof TinyIntVector) {
+            return TinyIntWriter.forArray((TinyIntVector) vector);
+        } else if (vector instanceof SmallIntVector) {
+            return SmallIntWriter.forArray((SmallIntVector) vector);
+        } else if (vector instanceof IntVector) {
+            return IntWriter.forArray((IntVector) vector);
+        } else if (vector instanceof BigIntVector) {
+            return BigIntWriter.forArray((BigIntVector) vector);
+        } else if (vector instanceof Float4Vector) {
+            return FloatWriter.forArray((Float4Vector) vector);
+        } else if (vector instanceof Float8Vector) {
+            return DoubleWriter.forArray((Float8Vector) vector);
+        } else if (vector instanceof VarCharVector) {
+            return VarCharWriter.forArray((VarCharVector) vector);
+        } else if (vector instanceof VarBinaryVector) {
+            return VarBinaryWriter.forArray((VarBinaryVector) vector);
+        } else if (vector instanceof DecimalVector) {
+            DecimalType decimalType = (DecimalType) fieldType;
+            return DecimalWriter.forArray((DecimalVector) vector, 
decimalType.getPrecision(), decimalType.getScale());
+        } else if (vector instanceof DateDayVector) {
+            return DateWriter.forArray((DateDayVector) vector);
+        } else {
+            throw new UnsupportedOperationException(
+                    "Unsupported vector type: " + 
vector.getClass().getSimpleName());
+        }
+    }
+
     private FlinkArrowUtils() {
         // Utility class
     }
diff --git 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowWriter.java
 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowWriter.java
new file mode 100644
index 00000000..a339fb7c
--- /dev/null
+++ 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowWriter.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow;
+
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.auron.flink.arrow.writers.ArrowFieldWriter;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+
+/**
+ * Orchestrates per-column {@link ArrowFieldWriter}s to write {@link RowData} 
into an Arrow {@link
+ * VectorSchemaRoot}.
+ *
+ * <p>Use {@link #create(VectorSchemaRoot, RowType)} to construct, then call 
{@link #write(RowData)}
+ * for each row, {@link #finish()} to finalize the batch, and {@link #reset()} 
before starting a new
+ * batch.
+ */
+public final class FlinkArrowWriter {
+
+    private final VectorSchemaRoot root;
+    private final ArrowFieldWriter<RowData>[] fieldWriters;
+
+    @SuppressWarnings("unchecked")
+    private FlinkArrowWriter(VectorSchemaRoot root, 
ArrowFieldWriter<RowData>[] fieldWriters) {
+        this.root = root;
+        this.fieldWriters = fieldWriters;
+    }
+
+    /**
+     * Creates a writer from an existing {@link VectorSchemaRoot} and Flink 
{@link RowType}.
+     *
+     * <p>Each vector in the root is allocated and a matching {@link 
ArrowFieldWriter} is created via
+     * {@link FlinkArrowUtils#createArrowFieldWriterForRow}.
+     *
+     * @param root the Arrow vector schema root to write into
+     * @param rowType the Flink row type describing the schema
+     * @return a new writer instance
+     */
+    @SuppressWarnings("unchecked")
+    public static FlinkArrowWriter create(VectorSchemaRoot root, RowType 
rowType) {
+        ArrowFieldWriter<RowData>[] fieldWriters =
+                new ArrowFieldWriter[root.getFieldVectors().size()];
+        for (int i = 0; i < fieldWriters.length; i++) {
+            FieldVector vector = root.getFieldVectors().get(i);
+            vector.allocateNew();
+            fieldWriters[i] = 
FlinkArrowUtils.createArrowFieldWriterForRow(vector, rowType.getTypeAt(i));
+        }
+        return new FlinkArrowWriter(root, fieldWriters);
+    }
+
+    /**
+     * Writes a single {@link RowData} into the underlying Arrow vectors.
+     *
+     * @param row the row to write
+     */
+    public void write(RowData row) {
+        for (int i = 0; i < fieldWriters.length; i++) {
+            fieldWriters[i].write(row, i);
+        }
+    }
+
+    /** Finalizes the current batch by setting the row count and finishing 
each field writer. */
+    public void finish() {
+        root.setRowCount(fieldWriters.length > 0 ? fieldWriters[0].getCount() 
: 0);
+        for (ArrowFieldWriter<RowData> w : fieldWriters) {
+            w.finish();
+        }
+    }
+
+    /** Resets all vectors and field writers for a new batch. */
+    public void reset() {
+        root.setRowCount(0);
+        for (ArrowFieldWriter<RowData> w : fieldWriters) {
+            w.reset();
+        }
+    }
+
+    /** Returns the underlying {@link VectorSchemaRoot}. */
+    public VectorSchemaRoot getRoot() {
+        return root;
+    }
+}
diff --git 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/ArrowFieldWriter.java
 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/ArrowFieldWriter.java
new file mode 100644
index 00000000..77122a37
--- /dev/null
+++ 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/ArrowFieldWriter.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.writers;
+
+import java.util.Objects;
+import org.apache.arrow.vector.ValueVector;
+
+/**
+ * Base class for writing Flink data into Arrow vectors.
+ *
+ * <p>Uses a template method pattern: {@link #write(Object, int)} increments 
the row count and
+ * delegates to {@link #doWrite(Object, int)} for type-specific logic.
+ *
+ * @param <IN> the input data type (e.g., RowData or ArrayData)
+ */
+public abstract class ArrowFieldWriter<IN> {
+
+    private final ValueVector valueVector;
+    private int count;
+
+    protected ArrowFieldWriter(ValueVector valueVector) {
+        this.valueVector = Objects.requireNonNull(valueVector, "valueVector 
must not be null");
+    }
+
+    /** Returns the underlying Arrow vector. */
+    public ValueVector getValueVector() {
+        return valueVector;
+    }
+
+    /** Returns the number of values written since the last reset. */
+    public int getCount() {
+        return count;
+    }
+
+    /**
+     * Writes a value from the input at the given ordinal position.
+     *
+     * @param row the input data
+     * @param ordinal the field index within the input
+     */
+    public void write(IN row, int ordinal) {
+        doWrite(row, ordinal);
+        count++;
+    }
+
+    /**
+     * Type-specific write logic. Implementations read from the input and 
write into the Arrow
+     * vector at position {@link #getCount()}.
+     *
+     * @param row the input data
+     * @param ordinal the field index within the input
+     */
+    public abstract void doWrite(IN row, int ordinal);
+
+    /** Finalizes the current batch by setting the value count on the vector. 
*/
+    public void finish() {
+        valueVector.setValueCount(count);
+    }
+
+    /** Resets the vector and count for a new batch. */
+    public void reset() {
+        valueVector.reset();
+        count = 0;
+    }
+}
diff --git 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/BigIntWriter.java
 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/BigIntWriter.java
new file mode 100644
index 00000000..b2b67f9f
--- /dev/null
+++ 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/BigIntWriter.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.writers;
+
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * {@link ArrowFieldWriter} for 64-bit integers ({@link BigIntVector}).
+ *
+ * <p>Use {@link #forRow(BigIntVector)} when writing from {@link RowData} and 
{@link
+ * #forArray(BigIntVector)} when writing from {@link ArrayData}.
+ *
+ * @param <T> the input data type
+ */
+public abstract class BigIntWriter<T> extends ArrowFieldWriter<T> {
+
+    /** Creates a BigIntWriter that reads from {@link RowData}. */
+    public static BigIntWriter<RowData> forRow(BigIntVector bigIntVector) {
+        return new BigIntWriterForRow(bigIntVector);
+    }
+
+    /** Creates a BigIntWriter that reads from {@link ArrayData}. */
+    public static BigIntWriter<ArrayData> forArray(BigIntVector bigIntVector) {
+        return new BigIntWriterForArray(bigIntVector);
+    }
+
+    private BigIntWriter(BigIntVector bigIntVector) {
+        super(bigIntVector);
+    }
+
+    abstract boolean isNullAt(T in, int ordinal);
+
+    abstract long readLong(T in, int ordinal);
+
+    @Override
+    public void doWrite(T in, int ordinal) {
+        BigIntVector vector = (BigIntVector) getValueVector();
+        if (isNullAt(in, ordinal)) {
+            vector.setNull(getCount());
+        } else {
+            vector.setSafe(getCount(), readLong(in, ordinal));
+        }
+    }
+
+    // ---- Inner classes for RowData and ArrayData ----
+
+    public static final class BigIntWriterForRow extends BigIntWriter<RowData> 
{
+        private BigIntWriterForRow(BigIntVector bigIntVector) {
+            super(bigIntVector);
+        }
+
+        @Override
+        boolean isNullAt(RowData in, int ordinal) {
+            return in.isNullAt(ordinal);
+        }
+
+        @Override
+        long readLong(RowData in, int ordinal) {
+            return in.getLong(ordinal);
+        }
+    }
+
+    public static final class BigIntWriterForArray extends 
BigIntWriter<ArrayData> {
+        private BigIntWriterForArray(BigIntVector bigIntVector) {
+            super(bigIntVector);
+        }
+
+        @Override
+        boolean isNullAt(ArrayData in, int ordinal) {
+            return in.isNullAt(ordinal);
+        }
+
+        @Override
+        long readLong(ArrayData in, int ordinal) {
+            return in.getLong(ordinal);
+        }
+    }
+}
diff --git 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/BooleanWriter.java
 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/BooleanWriter.java
new file mode 100644
index 00000000..c7c53e59
--- /dev/null
+++ 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/BooleanWriter.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.writers;
+
+import org.apache.arrow.vector.BitVector;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * {@link ArrowFieldWriter} for booleans ({@link BitVector}).
+ *
+ * <p>Use {@link #forRow(BitVector)} when writing from {@link RowData} and 
{@link
+ * #forArray(BitVector)} when writing from {@link ArrayData}.
+ *
+ * @param <T> the input data type
+ */
+public abstract class BooleanWriter<T> extends ArrowFieldWriter<T> {
+
+    /** Creates a BooleanWriter that reads from {@link RowData}. */
+    public static BooleanWriter<RowData> forRow(BitVector bitVector) {
+        return new BooleanWriterForRow(bitVector);
+    }
+
+    /** Creates a BooleanWriter that reads from {@link ArrayData}. */
+    public static BooleanWriter<ArrayData> forArray(BitVector bitVector) {
+        return new BooleanWriterForArray(bitVector);
+    }
+
+    private BooleanWriter(BitVector bitVector) {
+        super(bitVector);
+    }
+
+    abstract boolean isNullAt(T in, int ordinal);
+
+    abstract boolean readBoolean(T in, int ordinal);
+
+    @Override
+    public void doWrite(T in, int ordinal) {
+        BitVector vector = (BitVector) getValueVector();
+        if (isNullAt(in, ordinal)) {
+            vector.setNull(getCount());
+        } else {
+            vector.setSafe(getCount(), readBoolean(in, ordinal) ? 1 : 0);
+        }
+    }
+
+    // ---- Inner classes for RowData and ArrayData ----
+
+    public static final class BooleanWriterForRow extends 
BooleanWriter<RowData> {
+        private BooleanWriterForRow(BitVector bitVector) {
+            super(bitVector);
+        }
+
+        @Override
+        boolean isNullAt(RowData in, int ordinal) {
+            return in.isNullAt(ordinal);
+        }
+
+        @Override
+        boolean readBoolean(RowData in, int ordinal) {
+            return in.getBoolean(ordinal);
+        }
+    }
+
+    public static final class BooleanWriterForArray extends 
BooleanWriter<ArrayData> {
+        private BooleanWriterForArray(BitVector bitVector) {
+            super(bitVector);
+        }
+
+        @Override
+        boolean isNullAt(ArrayData in, int ordinal) {
+            return in.isNullAt(ordinal);
+        }
+
+        @Override
+        boolean readBoolean(ArrayData in, int ordinal) {
+            return in.getBoolean(ordinal);
+        }
+    }
+}
diff --git 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/DateWriter.java
 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/DateWriter.java
new file mode 100644
index 00000000..566f97f7
--- /dev/null
+++ 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/DateWriter.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.writers;
+
+import org.apache.arrow.vector.DateDayVector;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * {@link ArrowFieldWriter} for dates stored as days since epoch ({@link 
DateDayVector}).
+ *
+ * <p>Use {@link #forRow(DateDayVector)} when writing from {@link RowData} and 
{@link
+ * #forArray(DateDayVector)} when writing from {@link ArrayData}.
+ *
+ * @param <T> the input data type
+ */
+public abstract class DateWriter<T> extends ArrowFieldWriter<T> {
+
+    /** Creates a DateWriter that reads from {@link RowData}. */
+    public static DateWriter<RowData> forRow(DateDayVector dateDayVector) {
+        return new DateWriterForRow(dateDayVector);
+    }
+
+    /** Creates a DateWriter that reads from {@link ArrayData}. */
+    public static DateWriter<ArrayData> forArray(DateDayVector dateDayVector) {
+        return new DateWriterForArray(dateDayVector);
+    }
+
+    private DateWriter(DateDayVector dateDayVector) {
+        super(dateDayVector);
+    }
+
+    abstract boolean isNullAt(T in, int ordinal);
+
+    abstract int readDate(T in, int ordinal);
+
+    @Override
+    public void doWrite(T in, int ordinal) {
+        DateDayVector vector = (DateDayVector) getValueVector();
+        if (isNullAt(in, ordinal)) {
+            vector.setNull(getCount());
+        } else {
+            vector.setSafe(getCount(), readDate(in, ordinal));
+        }
+    }
+
+    // ---- Inner classes for RowData and ArrayData ----
+
+    public static final class DateWriterForRow extends DateWriter<RowData> {
+        private DateWriterForRow(DateDayVector dateDayVector) {
+            super(dateDayVector);
+        }
+
+        @Override
+        boolean isNullAt(RowData in, int ordinal) {
+            return in.isNullAt(ordinal);
+        }
+
+        @Override
+        int readDate(RowData in, int ordinal) {
+            return in.getInt(ordinal);
+        }
+    }
+
+    public static final class DateWriterForArray extends DateWriter<ArrayData> 
{
+        private DateWriterForArray(DateDayVector dateDayVector) {
+            super(dateDayVector);
+        }
+
+        @Override
+        boolean isNullAt(ArrayData in, int ordinal) {
+            return in.isNullAt(ordinal);
+        }
+
+        @Override
+        int readDate(ArrayData in, int ordinal) {
+            return in.getInt(ordinal);
+        }
+    }
+}
diff --git 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/DecimalWriter.java
 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/DecimalWriter.java
new file mode 100644
index 00000000..b3c95c74
--- /dev/null
+++ 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/DecimalWriter.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.writers;
+
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import org.apache.arrow.vector.DecimalVector;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * {@link ArrowFieldWriter} for decimals ({@link DecimalVector}).
+ *
+ * <p>Use {@link #forRow(DecimalVector, int, int)} when writing from {@link 
RowData} and {@link
+ * #forArray(DecimalVector, int, int)} when writing from {@link ArrayData}.
+ *
+ * @param <T> the input data type
+ */
+public abstract class DecimalWriter<T> extends ArrowFieldWriter<T> {
+
+    protected final int precision;
+    protected final int scale;
+
+    /** Creates a DecimalWriter that reads from {@link RowData}. */
+    public static DecimalWriter<RowData> forRow(DecimalVector decimalVector, 
int precision, int scale) {
+        return new DecimalWriterForRow(decimalVector, precision, scale);
+    }
+
+    /** Creates a DecimalWriter that reads from {@link ArrayData}. */
+    public static DecimalWriter<ArrayData> forArray(DecimalVector 
decimalVector, int precision, int scale) {
+        return new DecimalWriterForArray(decimalVector, precision, scale);
+    }
+
+    private DecimalWriter(DecimalVector decimalVector, int precision, int 
scale) {
+        super(decimalVector);
+        this.precision = precision;
+        this.scale = scale;
+    }
+
+    abstract boolean isNullAt(T in, int ordinal);
+
+    abstract DecimalData readDecimal(T in, int ordinal);
+
+    @Override
+    public void doWrite(T in, int ordinal) {
+        DecimalVector vector = (DecimalVector) getValueVector();
+        if (isNullAt(in, ordinal)) {
+            vector.setNull(getCount());
+        } else {
+            BigDecimal bigDecimal = readDecimal(in, ordinal).toBigDecimal();
+            bigDecimal = fitBigDecimal(bigDecimal, precision, scale);
+            if (bigDecimal == null) {
+                vector.setNull(getCount());
+            } else {
+                vector.setSafe(getCount(), bigDecimal);
+            }
+        }
+    }
+
+    private static BigDecimal fitBigDecimal(BigDecimal value, int precision, 
int scale) {
+        value = value.setScale(scale, RoundingMode.HALF_UP);
+        if (value.precision() > precision) {
+            return null;
+        }
+        return value;
+    }
+
+    // ---- Inner classes for RowData and ArrayData ----
+
+    public static final class DecimalWriterForRow extends 
DecimalWriter<RowData> {
+        private DecimalWriterForRow(DecimalVector decimalVector, int 
precision, int scale) {
+            super(decimalVector, precision, scale);
+        }
+
+        @Override
+        boolean isNullAt(RowData in, int ordinal) {
+            return in.isNullAt(ordinal);
+        }
+
+        @Override
+        DecimalData readDecimal(RowData in, int ordinal) {
+            return in.getDecimal(ordinal, precision, scale);
+        }
+    }
+
+    public static final class DecimalWriterForArray extends 
DecimalWriter<ArrayData> {
+        private DecimalWriterForArray(DecimalVector decimalVector, int 
precision, int scale) {
+            super(decimalVector, precision, scale);
+        }
+
+        @Override
+        boolean isNullAt(ArrayData in, int ordinal) {
+            return in.isNullAt(ordinal);
+        }
+
+        @Override
+        DecimalData readDecimal(ArrayData in, int ordinal) {
+            return in.getDecimal(ordinal, precision, scale);
+        }
+    }
+}
diff --git 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/DoubleWriter.java
 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/DoubleWriter.java
new file mode 100644
index 00000000..c23abdd9
--- /dev/null
+++ 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/DoubleWriter.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.writers;
+
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * {@link ArrowFieldWriter} for 64-bit doubles ({@link Float8Vector}).
+ *
+ * <p>Use {@link #forRow(Float8Vector)} when writing from {@link RowData} and 
{@link
+ * #forArray(Float8Vector)} when writing from {@link ArrayData}.
+ *
+ * @param <T> the input data type
+ */
+public abstract class DoubleWriter<T> extends ArrowFieldWriter<T> {
+
+    /** Creates a DoubleWriter that reads from {@link RowData}. */
+    public static DoubleWriter<RowData> forRow(Float8Vector float8Vector) {
+        return new DoubleWriterForRow(float8Vector);
+    }
+
+    /** Creates a DoubleWriter that reads from {@link ArrayData}. */
+    public static DoubleWriter<ArrayData> forArray(Float8Vector float8Vector) {
+        return new DoubleWriterForArray(float8Vector);
+    }
+
+    private DoubleWriter(Float8Vector float8Vector) {
+        super(float8Vector);
+    }
+
+    abstract boolean isNullAt(T in, int ordinal);
+
+    abstract double readDouble(T in, int ordinal);
+
+    @Override
+    public void doWrite(T in, int ordinal) {
+        Float8Vector vector = (Float8Vector) getValueVector();
+        if (isNullAt(in, ordinal)) {
+            vector.setNull(getCount());
+        } else {
+            vector.setSafe(getCount(), readDouble(in, ordinal));
+        }
+    }
+
+    // ---- Inner classes for RowData and ArrayData ----
+
+    public static final class DoubleWriterForRow extends DoubleWriter<RowData> 
{
+        private DoubleWriterForRow(Float8Vector float8Vector) {
+            super(float8Vector);
+        }
+
+        @Override
+        boolean isNullAt(RowData in, int ordinal) {
+            return in.isNullAt(ordinal);
+        }
+
+        @Override
+        double readDouble(RowData in, int ordinal) {
+            return in.getDouble(ordinal);
+        }
+    }
+
+    public static final class DoubleWriterForArray extends 
DoubleWriter<ArrayData> {
+        private DoubleWriterForArray(Float8Vector float8Vector) {
+            super(float8Vector);
+        }
+
+        @Override
+        boolean isNullAt(ArrayData in, int ordinal) {
+            return in.isNullAt(ordinal);
+        }
+
+        @Override
+        double readDouble(ArrayData in, int ordinal) {
+            return in.getDouble(ordinal);
+        }
+    }
+}
diff --git 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/FloatWriter.java
 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/FloatWriter.java
new file mode 100644
index 00000000..e6fe2a72
--- /dev/null
+++ 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/FloatWriter.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.writers;
+
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * {@link ArrowFieldWriter} for 32-bit floats ({@link Float4Vector}).
+ *
+ * <p>Use {@link #forRow(Float4Vector)} when writing from {@link RowData} and 
{@link
+ * #forArray(Float4Vector)} when writing from {@link ArrayData}.
+ *
+ * @param <T> the input data type
+ */
+public abstract class FloatWriter<T> extends ArrowFieldWriter<T> {
+
+    /** Creates a FloatWriter that reads from {@link RowData}. */
+    public static FloatWriter<RowData> forRow(Float4Vector float4Vector) {
+        return new FloatWriterForRow(float4Vector);
+    }
+
+    /** Creates a FloatWriter that reads from {@link ArrayData}. */
+    public static FloatWriter<ArrayData> forArray(Float4Vector float4Vector) {
+        return new FloatWriterForArray(float4Vector);
+    }
+
+    private FloatWriter(Float4Vector float4Vector) {
+        super(float4Vector);
+    }
+
+    abstract boolean isNullAt(T in, int ordinal);
+
+    abstract float readFloat(T in, int ordinal);
+
+    @Override
+    public void doWrite(T in, int ordinal) {
+        Float4Vector vector = (Float4Vector) getValueVector();
+        if (isNullAt(in, ordinal)) {
+            vector.setNull(getCount());
+        } else {
+            vector.setSafe(getCount(), readFloat(in, ordinal));
+        }
+    }
+
+    // ---- Inner classes for RowData and ArrayData ----
+
+    public static final class FloatWriterForRow extends FloatWriter<RowData> {
+        private FloatWriterForRow(Float4Vector float4Vector) {
+            super(float4Vector);
+        }
+
+        @Override
+        boolean isNullAt(RowData in, int ordinal) {
+            return in.isNullAt(ordinal);
+        }
+
+        @Override
+        float readFloat(RowData in, int ordinal) {
+            return in.getFloat(ordinal);
+        }
+    }
+
+    public static final class FloatWriterForArray extends 
FloatWriter<ArrayData> {
+        private FloatWriterForArray(Float4Vector float4Vector) {
+            super(float4Vector);
+        }
+
+        @Override
+        boolean isNullAt(ArrayData in, int ordinal) {
+            return in.isNullAt(ordinal);
+        }
+
+        @Override
+        float readFloat(ArrayData in, int ordinal) {
+            return in.getFloat(ordinal);
+        }
+    }
+}
diff --git 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/IntWriter.java
 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/IntWriter.java
new file mode 100644
index 00000000..11e02122
--- /dev/null
+++ 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/IntWriter.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.writers;
+
+import org.apache.arrow.vector.IntVector;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * {@link ArrowFieldWriter} for 32-bit integers ({@link IntVector}).
+ *
+ * <p>Use {@link #forRow(IntVector)} when writing from {@link RowData} and 
{@link
+ * #forArray(IntVector)} when writing from {@link ArrayData}.
+ *
+ * @param <T> the input data type
+ */
+public abstract class IntWriter<T> extends ArrowFieldWriter<T> {
+
+    /** Creates an IntWriter that reads from {@link RowData}. */
+    public static IntWriter<RowData> forRow(IntVector intVector) {
+        return new IntWriterForRow(intVector);
+    }
+
+    /** Creates an IntWriter that reads from {@link ArrayData}. */
+    public static IntWriter<ArrayData> forArray(IntVector intVector) {
+        return new IntWriterForArray(intVector);
+    }
+
+    private IntWriter(IntVector intVector) {
+        super(intVector);
+    }
+
+    abstract boolean isNullAt(T in, int ordinal);
+
+    abstract int readInt(T in, int ordinal);
+
+    @Override
+    public void doWrite(T in, int ordinal) {
+        IntVector vector = (IntVector) getValueVector();
+        if (isNullAt(in, ordinal)) {
+            vector.setNull(getCount());
+        } else {
+            vector.setSafe(getCount(), readInt(in, ordinal));
+        }
+    }
+
+    // ---- Inner classes for RowData and ArrayData ----
+
+    public static final class IntWriterForRow extends IntWriter<RowData> {
+        private IntWriterForRow(IntVector intVector) {
+            super(intVector);
+        }
+
+        @Override
+        boolean isNullAt(RowData in, int ordinal) {
+            return in.isNullAt(ordinal);
+        }
+
+        @Override
+        int readInt(RowData in, int ordinal) {
+            return in.getInt(ordinal);
+        }
+    }
+
+    public static final class IntWriterForArray extends IntWriter<ArrayData> {
+        private IntWriterForArray(IntVector intVector) {
+            super(intVector);
+        }
+
+        @Override
+        boolean isNullAt(ArrayData in, int ordinal) {
+            return in.isNullAt(ordinal);
+        }
+
+        @Override
+        int readInt(ArrayData in, int ordinal) {
+            return in.getInt(ordinal);
+        }
+    }
+}
diff --git 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/NullWriter.java
 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/NullWriter.java
new file mode 100644
index 00000000..59dfe15b
--- /dev/null
+++ 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/NullWriter.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.writers;
+
+import org.apache.arrow.vector.NullVector;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * {@link ArrowFieldWriter} for null type ({@link NullVector}).
+ *
+ * <p>Every value written is null. Use {@link #forRow(NullVector)} when 
writing from {@link RowData}
+ * and {@link #forArray(NullVector)} when writing from {@link ArrayData}.
+ *
+ * @param <T> the input data type
+ */
+public class NullWriter<T> extends ArrowFieldWriter<T> {
+
+    /** Creates a NullWriter that reads from {@link RowData}. */
+    public static NullWriter<RowData> forRow(NullVector nullVector) {
+        return new NullWriter<>(nullVector);
+    }
+
+    /** Creates a NullWriter that reads from {@link ArrayData}. */
+    public static NullWriter<ArrayData> forArray(NullVector nullVector) {
+        return new NullWriter<>(nullVector);
+    }
+
+    private NullWriter(NullVector nullVector) {
+        super(nullVector);
+    }
+
+    @Override
+    public void doWrite(T in, int ordinal) {
+        // NullVector values are inherently null; no explicit write needed.
+    }
+}
diff --git 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/SmallIntWriter.java
 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/SmallIntWriter.java
new file mode 100644
index 00000000..ff0c11c1
--- /dev/null
+++ 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/SmallIntWriter.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.writers;
+
+import org.apache.arrow.vector.SmallIntVector;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * {@link ArrowFieldWriter} for 16-bit integers ({@link SmallIntVector}).
+ *
+ * <p>Use {@link #forRow(SmallIntVector)} when writing from {@link RowData} 
and {@link
+ * #forArray(SmallIntVector)} when writing from {@link ArrayData}.
+ *
+ * @param <T> the input data type
+ */
+public abstract class SmallIntWriter<T> extends ArrowFieldWriter<T> {
+
+    /** Creates a SmallIntWriter that reads from {@link RowData}. */
+    public static SmallIntWriter<RowData> forRow(SmallIntVector 
smallIntVector) {
+        return new SmallIntWriterForRow(smallIntVector);
+    }
+
+    /** Creates a SmallIntWriter that reads from {@link ArrayData}. */
+    public static SmallIntWriter<ArrayData> forArray(SmallIntVector 
smallIntVector) {
+        return new SmallIntWriterForArray(smallIntVector);
+    }
+
+    private SmallIntWriter(SmallIntVector smallIntVector) {
+        super(smallIntVector);
+    }
+
+    abstract boolean isNullAt(T in, int ordinal);
+
+    abstract short readShort(T in, int ordinal);
+
+    @Override
+    public void doWrite(T in, int ordinal) {
+        SmallIntVector vector = (SmallIntVector) getValueVector();
+        if (isNullAt(in, ordinal)) {
+            vector.setNull(getCount());
+        } else {
+            vector.setSafe(getCount(), readShort(in, ordinal));
+        }
+    }
+
+    // ---- Inner classes for RowData and ArrayData ----
+
+    public static final class SmallIntWriterForRow extends 
SmallIntWriter<RowData> {
+        private SmallIntWriterForRow(SmallIntVector smallIntVector) {
+            super(smallIntVector);
+        }
+
+        @Override
+        boolean isNullAt(RowData in, int ordinal) {
+            return in.isNullAt(ordinal);
+        }
+
+        @Override
+        short readShort(RowData in, int ordinal) {
+            return in.getShort(ordinal);
+        }
+    }
+
+    public static final class SmallIntWriterForArray extends 
SmallIntWriter<ArrayData> {
+        private SmallIntWriterForArray(SmallIntVector smallIntVector) {
+            super(smallIntVector);
+        }
+
+        @Override
+        boolean isNullAt(ArrayData in, int ordinal) {
+            return in.isNullAt(ordinal);
+        }
+
+        @Override
+        short readShort(ArrayData in, int ordinal) {
+            return in.getShort(ordinal);
+        }
+    }
+}
diff --git 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/TinyIntWriter.java
 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/TinyIntWriter.java
new file mode 100644
index 00000000..f1995072
--- /dev/null
+++ 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/TinyIntWriter.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.writers;
+
+import org.apache.arrow.vector.TinyIntVector;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * {@link ArrowFieldWriter} for 8-bit integers ({@link TinyIntVector}).
+ *
+ * <p>Use {@link #forRow(TinyIntVector)} when writing from {@link RowData} and 
{@link
+ * #forArray(TinyIntVector)} when writing from {@link ArrayData}.
+ *
+ * @param <T> the input data type
+ */
+public abstract class TinyIntWriter<T> extends ArrowFieldWriter<T> {
+
+    /** Creates a TinyIntWriter that reads from {@link RowData}. */
+    public static TinyIntWriter<RowData> forRow(TinyIntVector tinyIntVector) {
+        return new TinyIntWriterForRow(tinyIntVector);
+    }
+
+    /** Creates a TinyIntWriter that reads from {@link ArrayData}. */
+    public static TinyIntWriter<ArrayData> forArray(TinyIntVector 
tinyIntVector) {
+        return new TinyIntWriterForArray(tinyIntVector);
+    }
+
+    private TinyIntWriter(TinyIntVector tinyIntVector) {
+        super(tinyIntVector);
+    }
+
+    abstract boolean isNullAt(T in, int ordinal);
+
+    abstract byte readByte(T in, int ordinal);
+
+    @Override
+    public void doWrite(T in, int ordinal) {
+        TinyIntVector vector = (TinyIntVector) getValueVector();
+        if (isNullAt(in, ordinal)) {
+            vector.setNull(getCount());
+        } else {
+            vector.setSafe(getCount(), readByte(in, ordinal));
+        }
+    }
+
+    // ---- Inner classes for RowData and ArrayData ----
+
+    public static final class TinyIntWriterForRow extends 
TinyIntWriter<RowData> {
+        private TinyIntWriterForRow(TinyIntVector tinyIntVector) {
+            super(tinyIntVector);
+        }
+
+        @Override
+        boolean isNullAt(RowData in, int ordinal) {
+            return in.isNullAt(ordinal);
+        }
+
+        @Override
+        byte readByte(RowData in, int ordinal) {
+            return in.getByte(ordinal);
+        }
+    }
+
+    public static final class TinyIntWriterForArray extends 
TinyIntWriter<ArrayData> {
+        private TinyIntWriterForArray(TinyIntVector tinyIntVector) {
+            super(tinyIntVector);
+        }
+
+        @Override
+        boolean isNullAt(ArrayData in, int ordinal) {
+            return in.isNullAt(ordinal);
+        }
+
+        @Override
+        byte readByte(ArrayData in, int ordinal) {
+            return in.getByte(ordinal);
+        }
+    }
+}
diff --git 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/VarBinaryWriter.java
 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/VarBinaryWriter.java
new file mode 100644
index 00000000..06e9a1a9
--- /dev/null
+++ 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/VarBinaryWriter.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.writers;
+
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * {@link ArrowFieldWriter} for variable-length binary data ({@link 
VarBinaryVector}).
+ *
+ * <p>Use {@link #forRow(VarBinaryVector)} when writing from {@link RowData} 
and {@link
+ * #forArray(VarBinaryVector)} when writing from {@link ArrayData}.
+ *
+ * @param <T> the input data type
+ */
+public abstract class VarBinaryWriter<T> extends ArrowFieldWriter<T> {
+
+    /** Creates a VarBinaryWriter that reads from {@link RowData}. */
+    public static VarBinaryWriter<RowData> forRow(VarBinaryVector 
varBinaryVector) {
+        return new VarBinaryWriterForRow(varBinaryVector);
+    }
+
+    /** Creates a VarBinaryWriter that reads from {@link ArrayData}. */
+    public static VarBinaryWriter<ArrayData> forArray(VarBinaryVector 
varBinaryVector) {
+        return new VarBinaryWriterForArray(varBinaryVector);
+    }
+
+    private VarBinaryWriter(VarBinaryVector varBinaryVector) {
+        super(varBinaryVector);
+    }
+
+    abstract boolean isNullAt(T in, int ordinal);
+
+    abstract byte[] readBinary(T in, int ordinal);
+
+    @Override
+    public void doWrite(T in, int ordinal) {
+        VarBinaryVector vector = (VarBinaryVector) getValueVector();
+        if (isNullAt(in, ordinal)) {
+            vector.setNull(getCount());
+        } else {
+            byte[] bytes = readBinary(in, ordinal);
+            vector.setSafe(getCount(), bytes);
+        }
+    }
+
+    // ---- Inner classes for RowData and ArrayData ----
+
+    public static final class VarBinaryWriterForRow extends 
VarBinaryWriter<RowData> {
+        private VarBinaryWriterForRow(VarBinaryVector varBinaryVector) {
+            super(varBinaryVector);
+        }
+
+        @Override
+        boolean isNullAt(RowData in, int ordinal) {
+            return in.isNullAt(ordinal);
+        }
+
+        @Override
+        byte[] readBinary(RowData in, int ordinal) {
+            return in.getBinary(ordinal);
+        }
+    }
+
+    public static final class VarBinaryWriterForArray extends 
VarBinaryWriter<ArrayData> {
+        private VarBinaryWriterForArray(VarBinaryVector varBinaryVector) {
+            super(varBinaryVector);
+        }
+
+        @Override
+        boolean isNullAt(ArrayData in, int ordinal) {
+            return in.isNullAt(ordinal);
+        }
+
+        @Override
+        byte[] readBinary(ArrayData in, int ordinal) {
+            return in.getBinary(ordinal);
+        }
+    }
+}
diff --git 
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/VarCharWriter.java
 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/VarCharWriter.java
new file mode 100644
index 00000000..6963e6a6
--- /dev/null
+++ 
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/VarCharWriter.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.writers;
+
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+
+/**
+ * {@link ArrowFieldWriter} for variable-length strings ({@link 
VarCharVector}).
+ *
+ * <p>Use {@link #forRow(VarCharVector)} when writing from {@link RowData} and 
{@link
+ * #forArray(VarCharVector)} when writing from {@link ArrayData}.
+ *
+ * @param <T> the input data type
+ */
+public abstract class VarCharWriter<T> extends ArrowFieldWriter<T> {
+
+    /** Creates a VarCharWriter that reads from {@link RowData}. */
+    public static VarCharWriter<RowData> forRow(VarCharVector varCharVector) {
+        return new VarCharWriterForRow(varCharVector);
+    }
+
+    /** Creates a VarCharWriter that reads from {@link ArrayData}. */
+    public static VarCharWriter<ArrayData> forArray(VarCharVector 
varCharVector) {
+        return new VarCharWriterForArray(varCharVector);
+    }
+
+    private VarCharWriter(VarCharVector varCharVector) {
+        super(varCharVector);
+    }
+
+    abstract boolean isNullAt(T in, int ordinal);
+
+    abstract StringData readString(T in, int ordinal);
+
+    @Override
+    public void doWrite(T in, int ordinal) {
+        VarCharVector vector = (VarCharVector) getValueVector();
+        if (isNullAt(in, ordinal)) {
+            vector.setNull(getCount());
+        } else {
+            byte[] bytes = readString(in, ordinal).toBytes();
+            vector.setSafe(getCount(), bytes);
+        }
+    }
+
+    // ---- Inner classes for RowData and ArrayData ----
+
+    public static final class VarCharWriterForRow extends 
VarCharWriter<RowData> {
+        private VarCharWriterForRow(VarCharVector varCharVector) {
+            super(varCharVector);
+        }
+
+        @Override
+        boolean isNullAt(RowData in, int ordinal) {
+            return in.isNullAt(ordinal);
+        }
+
+        @Override
+        StringData readString(RowData in, int ordinal) {
+            return in.getString(ordinal);
+        }
+    }
+
+    public static final class VarCharWriterForArray extends 
VarCharWriter<ArrayData> {
+        private VarCharWriterForArray(VarCharVector varCharVector) {
+            super(varCharVector);
+        }
+
+        @Override
+        boolean isNullAt(ArrayData in, int ordinal) {
+            return in.isNullAt(ordinal);
+        }
+
+        @Override
+        StringData readString(ArrayData in, int ordinal) {
+            return in.getString(ordinal);
+        }
+    }
+}
diff --git 
a/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowWriterTest.java
 
b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowWriterTest.java
new file mode 100644
index 00000000..042a4794
--- /dev/null
+++ 
b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowWriterTest.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.DateDayVector;
+import org.apache.arrow.vector.DecimalVector;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.NullVector;
+import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.TinyIntVector;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.NullType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/** Unit tests for {@link FlinkArrowWriter}. */
+public class FlinkArrowWriterTest {
+
+    private BufferAllocator allocator;
+
+    @BeforeEach
+    public void setUp() {
+        allocator = new RootAllocator(Long.MAX_VALUE);
+    }
+
+    @AfterEach
+    public void tearDown() {
+        allocator.close();
+    }
+
+    /** Writes one row containing all 12 basic types and verifies each vector. 
*/
+    @Test
+    public void testWriteAllBasicTypes() {
+        RowType rowType = RowType.of(
+                new LogicalType[] {
+                    new NullType(),
+                    new BooleanType(),
+                    new TinyIntType(),
+                    new SmallIntType(),
+                    new IntType(),
+                    new BigIntType(),
+                    new FloatType(),
+                    new DoubleType(),
+                    new VarCharType(100),
+                    new VarBinaryType(100),
+                    new DecimalType(10, 2),
+                    new DateType()
+                },
+                new String[] {
+                    "f_null", "f_bool", "f_tinyint", "f_smallint", "f_int", 
"f_bigint",
+                    "f_float", "f_double", "f_varchar", "f_varbinary", 
"f_decimal", "f_date"
+                });
+
+        Schema schema = FlinkArrowUtils.toArrowSchema(rowType);
+
+        try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, 
allocator)) {
+            FlinkArrowWriter writer = FlinkArrowWriter.create(root, rowType);
+
+            GenericRowData row = new GenericRowData(12);
+            row.setField(0, null);
+            row.setField(1, true);
+            row.setField(2, (byte) 42);
+            row.setField(3, (short) 1000);
+            row.setField(4, 123456);
+            row.setField(5, 9876543210L);
+            row.setField(6, 3.14f);
+            row.setField(7, 2.71828d);
+            row.setField(8, StringData.fromString("hello"));
+            row.setField(9, new byte[] {1, 2, 3});
+            row.setField(10, DecimalData.fromBigDecimal(new 
BigDecimal("123.45"), 10, 2));
+            row.setField(11, 19000); // days since epoch
+
+            writer.write(row);
+            writer.finish();
+
+            assertEquals(1, root.getRowCount());
+
+            // Null
+            assertTrue(((NullVector) root.getVector("f_null")).isNull(0));
+            // Boolean
+            assertEquals(1, ((BitVector) root.getVector("f_bool")).get(0));
+            // TinyInt
+            assertEquals(42, ((TinyIntVector) 
root.getVector("f_tinyint")).get(0));
+            // SmallInt
+            assertEquals(1000, ((SmallIntVector) 
root.getVector("f_smallint")).get(0));
+            // Int
+            assertEquals(123456, ((IntVector) root.getVector("f_int")).get(0));
+            // BigInt
+            assertEquals(9876543210L, ((BigIntVector) 
root.getVector("f_bigint")).get(0));
+            // Float
+            assertEquals(3.14f, ((Float4Vector) 
root.getVector("f_float")).get(0), 0.001f);
+            // Double
+            assertEquals(2.71828d, ((Float8Vector) 
root.getVector("f_double")).get(0), 0.00001d);
+            // VarChar
+            assertEquals(
+                    "hello", new String(((VarCharVector) 
root.getVector("f_varchar")).get(0), StandardCharsets.UTF_8));
+            // VarBinary
+            assertArrayEquals(new byte[] {1, 2, 3}, ((VarBinaryVector) 
root.getVector("f_varbinary")).get(0));
+            // Decimal
+            assertEquals(new BigDecimal("123.45"), ((DecimalVector) 
root.getVector("f_decimal")).getObject(0));
+            // Date
+            assertEquals(19000, ((DateDayVector) 
root.getVector("f_date")).get(0));
+        }
+    }
+
+    /** Writes a row where all nullable fields are null. */
+    @Test
+    public void testWriteNullValues() {
+        RowType rowType = RowType.of(
+                new LogicalType[] {new BooleanType(), new IntType(), new 
VarCharType(100), new DecimalType(10, 2)},
+                new String[] {"f_bool", "f_int", "f_varchar", "f_decimal"});
+
+        Schema schema = FlinkArrowUtils.toArrowSchema(rowType);
+
+        try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, 
allocator)) {
+            FlinkArrowWriter writer = FlinkArrowWriter.create(root, rowType);
+
+            GenericRowData row = new GenericRowData(4);
+            row.setField(0, null);
+            row.setField(1, null);
+            row.setField(2, null);
+            row.setField(3, null);
+
+            writer.write(row);
+            writer.finish();
+
+            assertEquals(1, root.getRowCount());
+            assertTrue(((BitVector) root.getVector("f_bool")).isNull(0));
+            assertTrue(((IntVector) root.getVector("f_int")).isNull(0));
+            assertTrue(((VarCharVector) 
root.getVector("f_varchar")).isNull(0));
+            assertTrue(((DecimalVector) 
root.getVector("f_decimal")).isNull(0));
+        }
+    }
+
+    /** Writes multiple rows, then resets and writes again. */
+    @Test
+    public void testWriteMultipleRowsAndReset() {
+        RowType rowType =
+                RowType.of(new LogicalType[] {new IntType(), new 
VarCharType(100)}, new String[] {"id", "name"});
+
+        Schema schema = FlinkArrowUtils.toArrowSchema(rowType);
+
+        try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, 
allocator)) {
+            FlinkArrowWriter writer = FlinkArrowWriter.create(root, rowType);
+
+            // First batch: 3 rows
+            writer.write(GenericRowData.of(1, StringData.fromString("alice")));
+            writer.write(GenericRowData.of(2, StringData.fromString("bob")));
+            writer.write(GenericRowData.of(3, StringData.fromString("carol")));
+            writer.finish();
+
+            assertEquals(3, root.getRowCount());
+            IntVector idVector = (IntVector) root.getVector("id");
+            VarCharVector nameVector = (VarCharVector) root.getVector("name");
+            assertEquals(1, idVector.get(0));
+            assertEquals(2, idVector.get(1));
+            assertEquals(3, idVector.get(2));
+            assertEquals("alice", new String(nameVector.get(0), 
StandardCharsets.UTF_8));
+            assertEquals("bob", new String(nameVector.get(1), 
StandardCharsets.UTF_8));
+            assertEquals("carol", new String(nameVector.get(2), 
StandardCharsets.UTF_8));
+
+            // Reset and write second batch: 2 rows
+            writer.reset();
+            writer.write(GenericRowData.of(10, StringData.fromString("dave")));
+            writer.write(GenericRowData.of(20, StringData.fromString("eve")));
+            writer.finish();
+
+            assertEquals(2, root.getRowCount());
+            assertEquals(10, idVector.get(0));
+            assertEquals(20, idVector.get(1));
+            assertEquals("dave", new String(nameVector.get(0), 
StandardCharsets.UTF_8));
+            assertEquals("eve", new String(nameVector.get(1), 
StandardCharsets.UTF_8));
+        }
+    }
+
+    /** Finish without writing any rows produces an empty batch. */
+    @Test
+    public void testEmptyBatch() {
+        RowType rowType = RowType.of(new LogicalType[] {new IntType()}, new 
String[] {"id"});
+
+        Schema schema = FlinkArrowUtils.toArrowSchema(rowType);
+
+        try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, 
allocator)) {
+            FlinkArrowWriter writer = FlinkArrowWriter.create(root, rowType);
+
+            writer.finish();
+
+            assertEquals(0, root.getRowCount());
+        }
+    }
+
+    /** Finish on a schema with zero columns produces an empty batch. */
+    @Test
+    public void testEmptyBatchZeroColumns() {
+        RowType rowType = RowType.of(new LogicalType[] {}, new String[] {});
+
+        Schema schema = FlinkArrowUtils.toArrowSchema(rowType);
+
+        try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, 
allocator)) {
+            FlinkArrowWriter writer = FlinkArrowWriter.create(root, rowType);
+
+            writer.finish();
+
+            assertEquals(0, root.getRowCount());
+        }
+    }
+
+    /** Unsupported vector types (e.g., from ArrayType) throw 
UnsupportedOperationException. */
+    @Test
+    public void testUnsupportedTypeThrows() {
+        RowType rowType = RowType.of(new LogicalType[] {new ArrayType(new 
IntType())}, new String[] {"f_array"});
+
+        Schema schema = FlinkArrowUtils.toArrowSchema(rowType);
+
+        try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, 
allocator)) {
+            assertThrows(UnsupportedOperationException.class, () -> 
FlinkArrowWriter.create(root, rowType));
+        }
+    }
+
+    /** The root returned by getRoot() is the same instance passed to 
create(). */
+    @Test
+    public void testGetRoot() {
+        RowType rowType = RowType.of(new LogicalType[] {new IntType()}, new 
String[] {"id"});
+
+        Schema schema = FlinkArrowUtils.toArrowSchema(rowType);
+
+        try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, 
allocator)) {
+            FlinkArrowWriter writer = FlinkArrowWriter.create(root, rowType);
+            assertSame(root, writer.getRoot());
+        }
+    }
+}
diff --git 
a/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/writers/BasicWritersTest.java
 
b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/writers/BasicWritersTest.java
new file mode 100644
index 00000000..38343b4d
--- /dev/null
+++ 
b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/writers/BasicWritersTest.java
@@ -0,0 +1,509 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.writers;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.NullVector;
+import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.TinyIntVector;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+
+/** Unit tests for the basic numeric type writers. */
+public class BasicWritersTest {
+
+    private BufferAllocator allocator;
+
+    @BeforeEach
+    public void setUp() {
+        allocator = new RootAllocator(Long.MAX_VALUE);
+    }
+
+    @AfterEach
+    public void tearDown() {
+        allocator.close();
+    }
+
+    // ---- NullWriter ----
+
+    @Nested
+    class NullWriterTests {
+
+        private NullVector nullVector;
+
+        @BeforeEach
+        public void setUp() {
+            nullVector = new NullVector("test_null");
+        }
+
+        @AfterEach
+        public void tearDown() {
+            nullVector.close();
+        }
+
+        @Test
+        public void testWriteFromRowData() {
+            NullWriter<RowData> writer = NullWriter.forRow(nullVector);
+
+            writer.write(GenericRowData.of((Object) null), 0);
+            writer.write(GenericRowData.of((Object) null), 0);
+            writer.write(GenericRowData.of((Object) null), 0);
+            writer.finish();
+
+            assertEquals(3, nullVector.getValueCount());
+            assertTrue(nullVector.isNull(0));
+            assertTrue(nullVector.isNull(1));
+            assertTrue(nullVector.isNull(2));
+        }
+
+        @Test
+        public void testWriteFromArrayData() {
+            NullWriter<ArrayData> writer = NullWriter.forArray(nullVector);
+
+            GenericArrayData array = new GenericArrayData(new Object[] {null, 
null});
+
+            writer.write(array, 0);
+            writer.write(array, 1);
+            writer.finish();
+
+            assertEquals(2, nullVector.getValueCount());
+            assertTrue(nullVector.isNull(0));
+            assertTrue(nullVector.isNull(1));
+        }
+    }
+
+    // ---- BooleanWriter ----
+
+    @Nested
+    class BooleanWriterTests {
+
+        private BitVector bitVector;
+
+        @BeforeEach
+        public void setUp() {
+            bitVector = new BitVector("test_boolean", allocator);
+            bitVector.allocateNew();
+        }
+
+        @AfterEach
+        public void tearDown() {
+            bitVector.close();
+        }
+
+        @Test
+        public void testWriteValuesFromRowData() {
+            BooleanWriter<RowData> writer = BooleanWriter.forRow(bitVector);
+
+            writer.write(GenericRowData.of(true), 0);
+            writer.write(GenericRowData.of(false), 0);
+            writer.write(GenericRowData.of(true), 0);
+            writer.finish();
+
+            assertEquals(3, bitVector.getValueCount());
+            assertEquals(1, bitVector.get(0));
+            assertEquals(0, bitVector.get(1));
+            assertEquals(1, bitVector.get(2));
+        }
+
+        @Test
+        public void testWriteNullFromRowData() {
+            BooleanWriter<RowData> writer = BooleanWriter.forRow(bitVector);
+
+            writer.write(GenericRowData.of(true), 0);
+            writer.write(GenericRowData.of((Object) null), 0);
+            writer.write(GenericRowData.of(false), 0);
+            writer.finish();
+
+            assertEquals(3, bitVector.getValueCount());
+            assertFalse(bitVector.isNull(0));
+            assertEquals(1, bitVector.get(0));
+            assertTrue(bitVector.isNull(1));
+            assertFalse(bitVector.isNull(2));
+            assertEquals(0, bitVector.get(2));
+        }
+
+        @Test
+        public void testWriteValuesFromArrayData() {
+            BooleanWriter<ArrayData> writer = 
BooleanWriter.forArray(bitVector);
+
+            GenericArrayData array = new GenericArrayData(new boolean[] {true, 
false, true});
+
+            writer.write(array, 0);
+            writer.write(array, 1);
+            writer.write(array, 2);
+            writer.finish();
+
+            assertEquals(3, bitVector.getValueCount());
+            assertEquals(1, bitVector.get(0));
+            assertEquals(0, bitVector.get(1));
+            assertEquals(1, bitVector.get(2));
+        }
+    }
+
+    // ---- TinyIntWriter ----
+
+    @Nested
+    class TinyIntWriterTests {
+
+        private TinyIntVector tinyIntVector;
+
+        @BeforeEach
+        public void setUp() {
+            tinyIntVector = new TinyIntVector("test_tinyint", allocator);
+            tinyIntVector.allocateNew();
+        }
+
+        @AfterEach
+        public void tearDown() {
+            tinyIntVector.close();
+        }
+
+        @Test
+        public void testWriteValuesFromRowData() {
+            TinyIntWriter<RowData> writer = 
TinyIntWriter.forRow(tinyIntVector);
+
+            writer.write(GenericRowData.of((byte) 1), 0);
+            writer.write(GenericRowData.of((byte) -128), 0);
+            writer.write(GenericRowData.of((byte) 127), 0);
+            writer.finish();
+
+            assertEquals(3, tinyIntVector.getValueCount());
+            assertEquals(1, tinyIntVector.get(0));
+            assertEquals(-128, tinyIntVector.get(1));
+            assertEquals(127, tinyIntVector.get(2));
+        }
+
+        @Test
+        public void testWriteNullFromRowData() {
+            TinyIntWriter<RowData> writer = 
TinyIntWriter.forRow(tinyIntVector);
+
+            writer.write(GenericRowData.of((byte) 42), 0);
+            writer.write(GenericRowData.of((Object) null), 0);
+            writer.write(GenericRowData.of((byte) 7), 0);
+            writer.finish();
+
+            assertEquals(3, tinyIntVector.getValueCount());
+            assertFalse(tinyIntVector.isNull(0));
+            assertEquals(42, tinyIntVector.get(0));
+            assertTrue(tinyIntVector.isNull(1));
+            assertFalse(tinyIntVector.isNull(2));
+            assertEquals(7, tinyIntVector.get(2));
+        }
+
+        @Test
+        public void testWriteValuesFromArrayData() {
+            TinyIntWriter<ArrayData> writer = 
TinyIntWriter.forArray(tinyIntVector);
+
+            GenericArrayData array = new GenericArrayData(new byte[] {10, 20, 
30});
+
+            writer.write(array, 0);
+            writer.write(array, 1);
+            writer.write(array, 2);
+            writer.finish();
+
+            assertEquals(3, tinyIntVector.getValueCount());
+            assertEquals(10, tinyIntVector.get(0));
+            assertEquals(20, tinyIntVector.get(1));
+            assertEquals(30, tinyIntVector.get(2));
+        }
+    }
+
+    // ---- SmallIntWriter ----
+
+    @Nested
+    class SmallIntWriterTests {
+
+        private SmallIntVector smallIntVector;
+
+        @BeforeEach
+        public void setUp() {
+            smallIntVector = new SmallIntVector("test_smallint", allocator);
+            smallIntVector.allocateNew();
+        }
+
+        @AfterEach
+        public void tearDown() {
+            smallIntVector.close();
+        }
+
+        @Test
+        public void testWriteValuesFromRowData() {
+            SmallIntWriter<RowData> writer = 
SmallIntWriter.forRow(smallIntVector);
+
+            writer.write(GenericRowData.of((short) 100), 0);
+            writer.write(GenericRowData.of((short) -32768), 0);
+            writer.write(GenericRowData.of((short) 32767), 0);
+            writer.finish();
+
+            assertEquals(3, smallIntVector.getValueCount());
+            assertEquals(100, smallIntVector.get(0));
+            assertEquals(-32768, smallIntVector.get(1));
+            assertEquals(32767, smallIntVector.get(2));
+        }
+
+        @Test
+        public void testWriteNullFromRowData() {
+            SmallIntWriter<RowData> writer = 
SmallIntWriter.forRow(smallIntVector);
+
+            writer.write(GenericRowData.of((short) 42), 0);
+            writer.write(GenericRowData.of((Object) null), 0);
+            writer.write(GenericRowData.of((short) 99), 0);
+            writer.finish();
+
+            assertEquals(3, smallIntVector.getValueCount());
+            assertFalse(smallIntVector.isNull(0));
+            assertEquals(42, smallIntVector.get(0));
+            assertTrue(smallIntVector.isNull(1));
+            assertFalse(smallIntVector.isNull(2));
+            assertEquals(99, smallIntVector.get(2));
+        }
+
+        @Test
+        public void testWriteValuesFromArrayData() {
+            SmallIntWriter<ArrayData> writer = 
SmallIntWriter.forArray(smallIntVector);
+
+            GenericArrayData array = new GenericArrayData(new short[] {10, 20, 
30});
+
+            writer.write(array, 0);
+            writer.write(array, 1);
+            writer.write(array, 2);
+            writer.finish();
+
+            assertEquals(3, smallIntVector.getValueCount());
+            assertEquals(10, smallIntVector.get(0));
+            assertEquals(20, smallIntVector.get(1));
+            assertEquals(30, smallIntVector.get(2));
+        }
+    }
+
+    // ---- BigIntWriter ----
+
+    @Nested
+    class BigIntWriterTests {
+
+        private BigIntVector bigIntVector;
+
+        @BeforeEach
+        public void setUp() {
+            bigIntVector = new BigIntVector("test_bigint", allocator);
+            bigIntVector.allocateNew();
+        }
+
+        @AfterEach
+        public void tearDown() {
+            bigIntVector.close();
+        }
+
+        @Test
+        public void testWriteValuesFromRowData() {
+            BigIntWriter<RowData> writer = BigIntWriter.forRow(bigIntVector);
+
+            writer.write(GenericRowData.of(100L), 0);
+            writer.write(GenericRowData.of(Long.MIN_VALUE), 0);
+            writer.write(GenericRowData.of(Long.MAX_VALUE), 0);
+            writer.finish();
+
+            assertEquals(3, bigIntVector.getValueCount());
+            assertEquals(100L, bigIntVector.get(0));
+            assertEquals(Long.MIN_VALUE, bigIntVector.get(1));
+            assertEquals(Long.MAX_VALUE, bigIntVector.get(2));
+        }
+
+        @Test
+        public void testWriteNullFromRowData() {
+            BigIntWriter<RowData> writer = BigIntWriter.forRow(bigIntVector);
+
+            writer.write(GenericRowData.of(42L), 0);
+            writer.write(GenericRowData.of((Object) null), 0);
+            writer.write(GenericRowData.of(99L), 0);
+            writer.finish();
+
+            assertEquals(3, bigIntVector.getValueCount());
+            assertFalse(bigIntVector.isNull(0));
+            assertEquals(42L, bigIntVector.get(0));
+            assertTrue(bigIntVector.isNull(1));
+            assertFalse(bigIntVector.isNull(2));
+            assertEquals(99L, bigIntVector.get(2));
+        }
+
+        @Test
+        public void testWriteValuesFromArrayData() {
+            BigIntWriter<ArrayData> writer = 
BigIntWriter.forArray(bigIntVector);
+
+            GenericArrayData array = new GenericArrayData(new long[] {10L, 
20L, 30L});
+
+            writer.write(array, 0);
+            writer.write(array, 1);
+            writer.write(array, 2);
+            writer.finish();
+
+            assertEquals(3, bigIntVector.getValueCount());
+            assertEquals(10L, bigIntVector.get(0));
+            assertEquals(20L, bigIntVector.get(1));
+            assertEquals(30L, bigIntVector.get(2));
+        }
+    }
+
+    // ---- FloatWriter ----
+
+    @Nested
+    class FloatWriterTests {
+
+        private Float4Vector float4Vector;
+
+        @BeforeEach
+        public void setUp() {
+            float4Vector = new Float4Vector("test_float", allocator);
+            float4Vector.allocateNew();
+        }
+
+        @AfterEach
+        public void tearDown() {
+            float4Vector.close();
+        }
+
+        @Test
+        public void testWriteValuesFromRowData() {
+            FloatWriter<RowData> writer = FloatWriter.forRow(float4Vector);
+
+            writer.write(GenericRowData.of(1.5f), 0);
+            writer.write(GenericRowData.of(-3.14f), 0);
+            writer.write(GenericRowData.of(0.0f), 0);
+            writer.finish();
+
+            assertEquals(3, float4Vector.getValueCount());
+            assertEquals(1.5f, float4Vector.get(0), 0.0001f);
+            assertEquals(-3.14f, float4Vector.get(1), 0.0001f);
+            assertEquals(0.0f, float4Vector.get(2), 0.0001f);
+        }
+
+        @Test
+        public void testWriteNullFromRowData() {
+            FloatWriter<RowData> writer = FloatWriter.forRow(float4Vector);
+
+            writer.write(GenericRowData.of(1.5f), 0);
+            writer.write(GenericRowData.of((Object) null), 0);
+            writer.write(GenericRowData.of(2.5f), 0);
+            writer.finish();
+
+            assertEquals(3, float4Vector.getValueCount());
+            assertFalse(float4Vector.isNull(0));
+            assertEquals(1.5f, float4Vector.get(0), 0.0001f);
+            assertTrue(float4Vector.isNull(1));
+            assertFalse(float4Vector.isNull(2));
+            assertEquals(2.5f, float4Vector.get(2), 0.0001f);
+        }
+
+        @Test
+        public void testWriteValuesFromArrayData() {
+            FloatWriter<ArrayData> writer = FloatWriter.forArray(float4Vector);
+
+            GenericArrayData array = new GenericArrayData(new float[] {1.0f, 
2.0f, 3.0f});
+
+            writer.write(array, 0);
+            writer.write(array, 1);
+            writer.write(array, 2);
+            writer.finish();
+
+            assertEquals(3, float4Vector.getValueCount());
+            assertEquals(1.0f, float4Vector.get(0), 0.0001f);
+            assertEquals(2.0f, float4Vector.get(1), 0.0001f);
+            assertEquals(3.0f, float4Vector.get(2), 0.0001f);
+        }
+    }
+
+    // ---- DoubleWriter ----
+
+    @Nested
+    class DoubleWriterTests {
+
+        private Float8Vector float8Vector;
+
+        @BeforeEach
+        public void setUp() {
+            float8Vector = new Float8Vector("test_double", allocator);
+            float8Vector.allocateNew();
+        }
+
+        @AfterEach
+        public void tearDown() {
+            float8Vector.close();
+        }
+
+        @Test
+        public void testWriteValuesFromRowData() {
+            DoubleWriter<RowData> writer = DoubleWriter.forRow(float8Vector);
+
+            writer.write(GenericRowData.of(1.5d), 0);
+            writer.write(GenericRowData.of(-3.14d), 0);
+            writer.write(GenericRowData.of(0.0d), 0);
+            writer.finish();
+
+            assertEquals(3, float8Vector.getValueCount());
+            assertEquals(1.5d, float8Vector.get(0), 0.0001d);
+            assertEquals(-3.14d, float8Vector.get(1), 0.0001d);
+            assertEquals(0.0d, float8Vector.get(2), 0.0001d);
+        }
+
+        @Test
+        public void testWriteNullFromRowData() {
+            DoubleWriter<RowData> writer = DoubleWriter.forRow(float8Vector);
+
+            writer.write(GenericRowData.of(1.5d), 0);
+            writer.write(GenericRowData.of((Object) null), 0);
+            writer.write(GenericRowData.of(2.5d), 0);
+            writer.finish();
+
+            assertEquals(3, float8Vector.getValueCount());
+            assertFalse(float8Vector.isNull(0));
+            assertEquals(1.5d, float8Vector.get(0), 0.0001d);
+            assertTrue(float8Vector.isNull(1));
+            assertFalse(float8Vector.isNull(2));
+            assertEquals(2.5d, float8Vector.get(2), 0.0001d);
+        }
+
+        @Test
+        public void testWriteValuesFromArrayData() {
+            DoubleWriter<ArrayData> writer = 
DoubleWriter.forArray(float8Vector);
+
+            GenericArrayData array = new GenericArrayData(new double[] {1.0d, 
2.0d, 3.0d});
+
+            writer.write(array, 0);
+            writer.write(array, 1);
+            writer.write(array, 2);
+            writer.finish();
+
+            assertEquals(3, float8Vector.getValueCount());
+            assertEquals(1.0d, float8Vector.get(0), 0.0001d);
+            assertEquals(2.0d, float8Vector.get(1), 0.0001d);
+            assertEquals(3.0d, float8Vector.get(2), 0.0001d);
+        }
+    }
+}
diff --git 
a/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/writers/IntWriterTest.java
 
b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/writers/IntWriterTest.java
new file mode 100644
index 00000000..9280f0b2
--- /dev/null
+++ 
b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/writers/IntWriterTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.writers;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.IntVector;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/** Unit tests for {@link IntWriter}. */
+public class IntWriterTest {
+
+    private BufferAllocator allocator;
+    private IntVector intVector;
+
+    @BeforeEach
+    public void setUp() {
+        allocator = new RootAllocator(Long.MAX_VALUE);
+        intVector = new IntVector("test_int", allocator);
+        intVector.allocateNew();
+    }
+
+    @AfterEach
+    public void tearDown() {
+        intVector.close();
+        allocator.close();
+    }
+
+    @Test
+    public void testWriteValuesFromRowData() {
+        IntWriter<RowData> writer = IntWriter.forRow(intVector);
+
+        GenericRowData row0 = GenericRowData.of(42);
+        GenericRowData row1 = GenericRowData.of(100);
+        GenericRowData row2 = GenericRowData.of(-7);
+
+        writer.write(row0, 0);
+        writer.write(row1, 0);
+        writer.write(row2, 0);
+        writer.finish();
+
+        assertEquals(3, intVector.getValueCount());
+        assertEquals(42, intVector.get(0));
+        assertEquals(100, intVector.get(1));
+        assertEquals(-7, intVector.get(2));
+    }
+
+    @Test
+    public void testWriteNullFromRowData() {
+        IntWriter<RowData> writer = IntWriter.forRow(intVector);
+
+        GenericRowData row0 = GenericRowData.of(42);
+        GenericRowData row1 = GenericRowData.of((Object) null);
+        GenericRowData row2 = GenericRowData.of(99);
+
+        writer.write(row0, 0);
+        writer.write(row1, 0);
+        writer.write(row2, 0);
+        writer.finish();
+
+        assertEquals(3, intVector.getValueCount());
+        assertFalse(intVector.isNull(0));
+        assertEquals(42, intVector.get(0));
+        assertTrue(intVector.isNull(1));
+        assertFalse(intVector.isNull(2));
+        assertEquals(99, intVector.get(2));
+    }
+
+    @Test
+    public void testWriteValuesFromArrayData() {
+        IntWriter<ArrayData> writer = IntWriter.forArray(intVector);
+
+        GenericArrayData array = new GenericArrayData(new int[] {10, 20, 30});
+
+        writer.write(array, 0);
+        writer.write(array, 1);
+        writer.write(array, 2);
+        writer.finish();
+
+        assertEquals(3, intVector.getValueCount());
+        assertEquals(10, intVector.get(0));
+        assertEquals(20, intVector.get(1));
+        assertEquals(30, intVector.get(2));
+    }
+
+    @Test
+    public void testWriteNullFromArrayData() {
+        IntWriter<ArrayData> writer = IntWriter.forArray(intVector);
+
+        GenericArrayData array = new GenericArrayData(new Integer[] {10, null, 
30});
+
+        writer.write(array, 0);
+        writer.write(array, 1);
+        writer.write(array, 2);
+        writer.finish();
+
+        assertEquals(3, intVector.getValueCount());
+        assertEquals(10, intVector.get(0));
+        assertTrue(intVector.isNull(1));
+        assertEquals(30, intVector.get(2));
+    }
+
+    @Test
+    public void testResetAndWriteNewBatch() {
+        IntWriter<RowData> writer = IntWriter.forRow(intVector);
+
+        // First batch
+        writer.write(GenericRowData.of(1), 0);
+        writer.write(GenericRowData.of(2), 0);
+        writer.finish();
+
+        assertEquals(2, intVector.getValueCount());
+        assertEquals(1, intVector.get(0));
+        assertEquals(2, intVector.get(1));
+
+        // Reset
+        writer.reset();
+
+        assertEquals(0, intVector.getValueCount());
+
+        // Second batch
+        writer.write(GenericRowData.of(100), 0);
+        writer.finish();
+
+        assertEquals(1, intVector.getValueCount());
+        assertEquals(100, intVector.get(0));
+    }
+}
diff --git 
a/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/writers/NonNumericWritersTest.java
 
b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/writers/NonNumericWritersTest.java
new file mode 100644
index 00000000..0f9cb425
--- /dev/null
+++ 
b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/writers/NonNumericWritersTest.java
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.writers;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import java.math.BigDecimal;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.DateDayVector;
+import org.apache.arrow.vector.DecimalVector;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+
+/** Unit tests for the non-numeric type writers. */
+public class NonNumericWritersTest {
+
+    private BufferAllocator allocator;
+
+    @BeforeEach
+    public void setUp() {
+        allocator = new RootAllocator(Long.MAX_VALUE);
+    }
+
+    @AfterEach
+    public void tearDown() {
+        allocator.close();
+    }
+
+    // ---- VarCharWriter ----
+
+    @Nested
+    class VarCharWriterTests {
+
+        private VarCharVector varCharVector;
+
+        @BeforeEach
+        public void setUp() {
+            varCharVector = new VarCharVector("test_varchar", allocator);
+            varCharVector.allocateNew();
+        }
+
+        @AfterEach
+        public void tearDown() {
+            varCharVector.close();
+        }
+
+        @Test
+        public void testWriteValuesFromRowData() {
+            VarCharWriter<RowData> writer = 
VarCharWriter.forRow(varCharVector);
+
+            writer.write(GenericRowData.of(StringData.fromString("hello")), 0);
+            writer.write(GenericRowData.of(StringData.fromString("world")), 0);
+            writer.write(GenericRowData.of(StringData.fromString("")), 0);
+            writer.finish();
+
+            assertEquals(3, varCharVector.getValueCount());
+            assertArrayEquals("hello".getBytes(), varCharVector.get(0));
+            assertArrayEquals("world".getBytes(), varCharVector.get(1));
+            assertArrayEquals("".getBytes(), varCharVector.get(2));
+        }
+
+        @Test
+        public void testWriteNullFromRowData() {
+            VarCharWriter<RowData> writer = 
VarCharWriter.forRow(varCharVector);
+
+            writer.write(GenericRowData.of(StringData.fromString("hello")), 0);
+            writer.write(GenericRowData.of((Object) null), 0);
+            writer.write(GenericRowData.of(StringData.fromString("world")), 0);
+            writer.finish();
+
+            assertEquals(3, varCharVector.getValueCount());
+            assertFalse(varCharVector.isNull(0));
+            assertArrayEquals("hello".getBytes(), varCharVector.get(0));
+            assertTrue(varCharVector.isNull(1));
+            assertFalse(varCharVector.isNull(2));
+            assertArrayEquals("world".getBytes(), varCharVector.get(2));
+        }
+
+        @Test
+        public void testWriteValuesFromArrayData() {
+            VarCharWriter<ArrayData> writer = 
VarCharWriter.forArray(varCharVector);
+
+            GenericArrayData array = new GenericArrayData(new StringData[] {
+                StringData.fromString("foo"), StringData.fromString("bar"), 
StringData.fromString("baz")
+            });
+
+            writer.write(array, 0);
+            writer.write(array, 1);
+            writer.write(array, 2);
+            writer.finish();
+
+            assertEquals(3, varCharVector.getValueCount());
+            assertArrayEquals("foo".getBytes(), varCharVector.get(0));
+            assertArrayEquals("bar".getBytes(), varCharVector.get(1));
+            assertArrayEquals("baz".getBytes(), varCharVector.get(2));
+        }
+    }
+
+    // ---- VarBinaryWriter ----
+
+    @Nested
+    class VarBinaryWriterTests {
+
+        private VarBinaryVector varBinaryVector;
+
+        @BeforeEach
+        public void setUp() {
+            varBinaryVector = new VarBinaryVector("test_varbinary", allocator);
+            varBinaryVector.allocateNew();
+        }
+
+        @AfterEach
+        public void tearDown() {
+            varBinaryVector.close();
+        }
+
+        @Test
+        public void testWriteValuesFromRowData() {
+            VarBinaryWriter<RowData> writer = 
VarBinaryWriter.forRow(varBinaryVector);
+
+            writer.write(GenericRowData.of(new byte[] {1, 2, 3}), 0);
+            writer.write(GenericRowData.of(new byte[] {4, 5}), 0);
+            writer.write(GenericRowData.of(new byte[] {}), 0);
+            writer.finish();
+
+            assertEquals(3, varBinaryVector.getValueCount());
+            assertArrayEquals(new byte[] {1, 2, 3}, varBinaryVector.get(0));
+            assertArrayEquals(new byte[] {4, 5}, varBinaryVector.get(1));
+            assertArrayEquals(new byte[] {}, varBinaryVector.get(2));
+        }
+
+        @Test
+        public void testWriteNullFromRowData() {
+            VarBinaryWriter<RowData> writer = 
VarBinaryWriter.forRow(varBinaryVector);
+
+            writer.write(GenericRowData.of(new byte[] {1, 2, 3}), 0);
+            writer.write(GenericRowData.of((Object) null), 0);
+            writer.write(GenericRowData.of(new byte[] {7, 8}), 0);
+            writer.finish();
+
+            assertEquals(3, varBinaryVector.getValueCount());
+            assertFalse(varBinaryVector.isNull(0));
+            assertArrayEquals(new byte[] {1, 2, 3}, varBinaryVector.get(0));
+            assertTrue(varBinaryVector.isNull(1));
+            assertFalse(varBinaryVector.isNull(2));
+            assertArrayEquals(new byte[] {7, 8}, varBinaryVector.get(2));
+        }
+
+        @Test
+        public void testWriteValuesFromArrayData() {
+            VarBinaryWriter<ArrayData> writer = 
VarBinaryWriter.forArray(varBinaryVector);
+
+            GenericArrayData array =
+                    new GenericArrayData(new byte[][] {new byte[] {10, 20}, 
new byte[] {30, 40}, new byte[] {50}});
+
+            writer.write(array, 0);
+            writer.write(array, 1);
+            writer.write(array, 2);
+            writer.finish();
+
+            assertEquals(3, varBinaryVector.getValueCount());
+            assertArrayEquals(new byte[] {10, 20}, varBinaryVector.get(0));
+            assertArrayEquals(new byte[] {30, 40}, varBinaryVector.get(1));
+            assertArrayEquals(new byte[] {50}, varBinaryVector.get(2));
+        }
+    }
+
+    // ---- DecimalWriter ----
+
+    @Nested
+    class DecimalWriterTests {
+
+        private DecimalVector decimalVector;
+
+        @BeforeEach
+        public void setUp() {
+            decimalVector = new DecimalVector("test_decimal", allocator, 10, 
2);
+            decimalVector.allocateNew();
+        }
+
+        @AfterEach
+        public void tearDown() {
+            decimalVector.close();
+        }
+
+        @Test
+        public void testWriteValuesFromRowData() {
+            DecimalWriter<RowData> writer = 
DecimalWriter.forRow(decimalVector, 10, 2);
+
+            writer.write(GenericRowData.of(DecimalData.fromBigDecimal(new 
BigDecimal("123.45"), 10, 2)), 0);
+            writer.write(GenericRowData.of(DecimalData.fromBigDecimal(new 
BigDecimal("-678.90"), 10, 2)), 0);
+            writer.write(GenericRowData.of(DecimalData.fromBigDecimal(new 
BigDecimal("0.00"), 10, 2)), 0);
+            writer.finish();
+
+            assertEquals(3, decimalVector.getValueCount());
+            assertEquals(new BigDecimal("123.45"), decimalVector.getObject(0));
+            assertEquals(new BigDecimal("-678.90"), 
decimalVector.getObject(1));
+            assertEquals(new BigDecimal("0.00"), decimalVector.getObject(2));
+        }
+
+        @Test
+        public void testWriteNullFromRowData() {
+            DecimalWriter<RowData> writer = 
DecimalWriter.forRow(decimalVector, 10, 2);
+
+            writer.write(GenericRowData.of(DecimalData.fromBigDecimal(new 
BigDecimal("123.45"), 10, 2)), 0);
+            writer.write(GenericRowData.of((Object) null), 0);
+            writer.write(GenericRowData.of(DecimalData.fromBigDecimal(new 
BigDecimal("99.99"), 10, 2)), 0);
+            writer.finish();
+
+            assertEquals(3, decimalVector.getValueCount());
+            assertFalse(decimalVector.isNull(0));
+            assertEquals(new BigDecimal("123.45"), decimalVector.getObject(0));
+            assertTrue(decimalVector.isNull(1));
+            assertFalse(decimalVector.isNull(2));
+            assertEquals(new BigDecimal("99.99"), decimalVector.getObject(2));
+        }
+
+        @Test
+        public void testWriteValuesFromArrayData() {
+            DecimalWriter<ArrayData> writer = 
DecimalWriter.forArray(decimalVector, 10, 2);
+
+            GenericArrayData array = new GenericArrayData(new DecimalData[] {
+                DecimalData.fromBigDecimal(new BigDecimal("1.11"), 10, 2),
+                DecimalData.fromBigDecimal(new BigDecimal("2.22"), 10, 2),
+                DecimalData.fromBigDecimal(new BigDecimal("3.33"), 10, 2)
+            });
+
+            writer.write(array, 0);
+            writer.write(array, 1);
+            writer.write(array, 2);
+            writer.finish();
+
+            assertEquals(3, decimalVector.getValueCount());
+            assertEquals(new BigDecimal("1.11"), decimalVector.getObject(0));
+            assertEquals(new BigDecimal("2.22"), decimalVector.getObject(1));
+            assertEquals(new BigDecimal("3.33"), decimalVector.getObject(2));
+        }
+    }
+
+    // ---- DateWriter ----
+
+    @Nested
+    class DateWriterTests {
+
+        private DateDayVector dateDayVector;
+
+        @BeforeEach
+        public void setUp() {
+            dateDayVector = new DateDayVector("test_date", allocator);
+            dateDayVector.allocateNew();
+        }
+
+        @AfterEach
+        public void tearDown() {
+            dateDayVector.close();
+        }
+
+        @Test
+        public void testWriteValuesFromRowData() {
+            DateWriter<RowData> writer = DateWriter.forRow(dateDayVector);
+
+            // 19000 days since epoch = ~2022-01-06
+            writer.write(GenericRowData.of(19000), 0);
+            writer.write(GenericRowData.of(0), 0);
+            writer.write(GenericRowData.of(18628), 0);
+            writer.finish();
+
+            assertEquals(3, dateDayVector.getValueCount());
+            assertEquals(19000, dateDayVector.get(0));
+            assertEquals(0, dateDayVector.get(1));
+            assertEquals(18628, dateDayVector.get(2));
+        }
+
+        @Test
+        public void testWriteNullFromRowData() {
+            DateWriter<RowData> writer = DateWriter.forRow(dateDayVector);
+
+            writer.write(GenericRowData.of(19000), 0);
+            writer.write(GenericRowData.of((Object) null), 0);
+            writer.write(GenericRowData.of(18628), 0);
+            writer.finish();
+
+            assertEquals(3, dateDayVector.getValueCount());
+            assertFalse(dateDayVector.isNull(0));
+            assertEquals(19000, dateDayVector.get(0));
+            assertTrue(dateDayVector.isNull(1));
+            assertFalse(dateDayVector.isNull(2));
+            assertEquals(18628, dateDayVector.get(2));
+        }
+
+        @Test
+        public void testWriteValuesFromArrayData() {
+            DateWriter<ArrayData> writer = DateWriter.forArray(dateDayVector);
+
+            GenericArrayData array = new GenericArrayData(new int[] {19000, 0, 
18628});
+
+            writer.write(array, 0);
+            writer.write(array, 1);
+            writer.write(array, 2);
+            writer.finish();
+
+            assertEquals(3, dateDayVector.getValueCount());
+            assertEquals(19000, dateDayVector.get(0));
+            assertEquals(0, dateDayVector.get(1));
+            assertEquals(18628, dateDayVector.get(2));
+        }
+    }
+}

Reply via email to