This is an automated email from the ASF dual-hosted git repository.
zhangmang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git
The following commit(s) were added to refs/heads/master by this push:
new 64de43f3 [AURON #1850] Add ArrowFieldWriter and FlinkArrowWriter for
basic types (#2079)
64de43f3 is described below
commit 64de43f3ba3818aa387ec422d5580165e3633a8f
Author: xTong <[email protected]>
AuthorDate: Tue Mar 10 17:21:46 2026 +0800
[AURON #1850] Add ArrowFieldWriter and FlinkArrowWriter for basic types
(#2079)
# Which issue does this PR close?
Partially addresses #1850 (Part 2a of the Flink RowData to Arrow
conversion).
# Rationale for this change
Per AIP-1, the Flink integration data path requires converting Flink
`RowData` into Arrow `VectorSchemaRoot` for export to the native engine
(DataFusion/Rust). This PR implements the writer layer for basic types,
following Flink's official `flink-python` Arrow implementation as
requested during Part 1 review (#1959).
# What changes are included in this PR?
## Commit 1: ArrowFieldWriter base class + 12 type writers (16 files,
+2181 lines)
- **`ArrowFieldWriter<IN>`** — Generic abstract base class using
template method pattern (`write()` → `doWrite()` + count++), aligned
with Flink's `flink-python` `ArrowFieldWriter`.
- **12 concrete writers** in `writers/` sub-package, each with
`forRow()`/`forArray()` dual-mode factory methods:
- Numeric: `IntWriter`, `TinyIntWriter`, `SmallIntWriter`,
`BigIntWriter`, `FloatWriter`, `DoubleWriter`
- Non-numeric: `BooleanWriter`, `VarCharWriter`, `VarBinaryWriter`,
`DecimalWriter`, `DateWriter`, `NullWriter`
- **Key design**: Each writer (except `NullWriter`) has two `public
static final` inner classes (`XxxWriterForRow` / `XxxWriterForArray`)
because Flink's `RowData` and `ArrayData` have no common getter
interface.
- **Special cases**:
- `NullWriter`: No inner classes needed, `doWrite()` is empty
(NullVector values are inherently null)
- `DecimalWriter`: Takes precision/scale parameters, includes
`fitBigDecimal()` validation before writing (aligned with Flink's
`fromBigDecimal` logic)
- **Unit tests**: `IntWriterTest` (5), `BasicWritersTest` (20),
`NonNumericWritersTest` (12) — 37 tests
## Commit 2: FlinkArrowWriter orchestrator + factory methods (3 files,
+482 lines)
- **`FlinkArrowWriter`** — Orchestrates per-column
`ArrowFieldWriter<RowData>[]` to write Flink `RowData` into Arrow
`VectorSchemaRoot`. Lifecycle: `create()` → `write(row)*` → `finish()` →
`reset()`.
- **Factory methods in `FlinkArrowUtils`** —
`createArrowFieldWriterForRow()`/`createArrowFieldWriterForArray()`
dispatch writer creation based on Arrow vector type (instanceof chain).
Both are package-private.
- **Integration tests**: `FlinkArrowWriterTest` (7) — all-types write,
null handling, multi-row batches, reset, empty batch, zero columns,
unsupported type. Total: **53 tests, all passing**.
# Scope
This PR covers basic types only. Time, Timestamp, and complex types
(Array/Map/Row) will be in Part 2b.
# Are there any user-facing changes?
No. Internal API for Flink integration.
# How was this patch tested?
53 tests across 4 test classes:
```bash
./build/mvn test -Pflink-1.18 -Pspark-3.5 -Pscala-2.12 \
-pl auron-flink-extension/auron-flink-runtime -am -DskipBuildNative
```
Result: 53 pass, 0 failures.
---
.../apache/auron/flink/arrow/FlinkArrowUtils.java | 108 +++++
.../apache/auron/flink/arrow/FlinkArrowWriter.java | 97 ++++
.../flink/arrow/writers/ArrowFieldWriter.java | 79 ++++
.../auron/flink/arrow/writers/BigIntWriter.java | 94 ++++
.../auron/flink/arrow/writers/BooleanWriter.java | 94 ++++
.../auron/flink/arrow/writers/DateWriter.java | 94 ++++
.../auron/flink/arrow/writers/DecimalWriter.java | 116 +++++
.../auron/flink/arrow/writers/DoubleWriter.java | 94 ++++
.../auron/flink/arrow/writers/FloatWriter.java | 94 ++++
.../auron/flink/arrow/writers/IntWriter.java | 94 ++++
.../auron/flink/arrow/writers/NullWriter.java | 51 +++
.../auron/flink/arrow/writers/SmallIntWriter.java | 94 ++++
.../auron/flink/arrow/writers/TinyIntWriter.java | 94 ++++
.../auron/flink/arrow/writers/VarBinaryWriter.java | 95 ++++
.../auron/flink/arrow/writers/VarCharWriter.java | 96 ++++
.../auron/flink/arrow/FlinkArrowWriterTest.java | 277 +++++++++++
.../flink/arrow/writers/BasicWritersTest.java | 509 +++++++++++++++++++++
.../auron/flink/arrow/writers/IntWriterTest.java | 150 ++++++
.../flink/arrow/writers/NonNumericWritersTest.java | 333 ++++++++++++++
19 files changed, 2663 insertions(+)
diff --git
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java
index 0763a847..273c66c0 100644
---
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java
+++
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java
@@ -20,6 +20,19 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.DateDayVector;
+import org.apache.arrow.vector.DecimalVector;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.NullVector;
+import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.TinyIntVector;
+import org.apache.arrow.vector.ValueVector;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.complex.MapVector;
import org.apache.arrow.vector.types.DateUnit;
import org.apache.arrow.vector.types.FloatingPointPrecision;
@@ -28,6 +41,21 @@ import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.auron.flink.arrow.writers.ArrowFieldWriter;
+import org.apache.auron.flink.arrow.writers.BigIntWriter;
+import org.apache.auron.flink.arrow.writers.BooleanWriter;
+import org.apache.auron.flink.arrow.writers.DateWriter;
+import org.apache.auron.flink.arrow.writers.DecimalWriter;
+import org.apache.auron.flink.arrow.writers.DoubleWriter;
+import org.apache.auron.flink.arrow.writers.FloatWriter;
+import org.apache.auron.flink.arrow.writers.IntWriter;
+import org.apache.auron.flink.arrow.writers.NullWriter;
+import org.apache.auron.flink.arrow.writers.SmallIntWriter;
+import org.apache.auron.flink.arrow.writers.TinyIntWriter;
+import org.apache.auron.flink.arrow.writers.VarBinaryWriter;
+import org.apache.auron.flink.arrow.writers.VarCharWriter;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.BinaryType;
@@ -217,6 +245,86 @@ public final class FlinkArrowUtils {
return new Schema(fields);
}
+ /**
+ * Creates an {@link ArrowFieldWriter} for top-level {@link RowData}
fields.
+ *
+ * @param vector the Arrow vector to write into
+ * @param fieldType the Flink logical type of the field
+ * @return a writer that reads from RowData at a given ordinal
+ * @throws UnsupportedOperationException if the vector type is not
supported
+ */
+ static ArrowFieldWriter<RowData> createArrowFieldWriterForRow(ValueVector
vector, LogicalType fieldType) {
+ if (vector instanceof NullVector) {
+ return NullWriter.forRow((NullVector) vector);
+ } else if (vector instanceof BitVector) {
+ return BooleanWriter.forRow((BitVector) vector);
+ } else if (vector instanceof TinyIntVector) {
+ return TinyIntWriter.forRow((TinyIntVector) vector);
+ } else if (vector instanceof SmallIntVector) {
+ return SmallIntWriter.forRow((SmallIntVector) vector);
+ } else if (vector instanceof IntVector) {
+ return IntWriter.forRow((IntVector) vector);
+ } else if (vector instanceof BigIntVector) {
+ return BigIntWriter.forRow((BigIntVector) vector);
+ } else if (vector instanceof Float4Vector) {
+ return FloatWriter.forRow((Float4Vector) vector);
+ } else if (vector instanceof Float8Vector) {
+ return DoubleWriter.forRow((Float8Vector) vector);
+ } else if (vector instanceof VarCharVector) {
+ return VarCharWriter.forRow((VarCharVector) vector);
+ } else if (vector instanceof VarBinaryVector) {
+ return VarBinaryWriter.forRow((VarBinaryVector) vector);
+ } else if (vector instanceof DecimalVector) {
+ DecimalType decimalType = (DecimalType) fieldType;
+ return DecimalWriter.forRow((DecimalVector) vector,
decimalType.getPrecision(), decimalType.getScale());
+ } else if (vector instanceof DateDayVector) {
+ return DateWriter.forRow((DateDayVector) vector);
+ } else {
+ throw new UnsupportedOperationException(
+ "Unsupported vector type: " +
vector.getClass().getSimpleName());
+ }
+ }
+
+ /**
+ * Creates an {@link ArrowFieldWriter} for nested {@link ArrayData}
elements.
+ *
+ * @param vector the Arrow vector to write into
+ * @param fieldType the Flink logical type of the element
+ * @return a writer that reads from ArrayData at a given ordinal
+ * @throws UnsupportedOperationException if the vector type is not
supported
+ */
+ static ArrowFieldWriter<ArrayData>
createArrowFieldWriterForArray(ValueVector vector, LogicalType fieldType) {
+ if (vector instanceof NullVector) {
+ return NullWriter.forArray((NullVector) vector);
+ } else if (vector instanceof BitVector) {
+ return BooleanWriter.forArray((BitVector) vector);
+ } else if (vector instanceof TinyIntVector) {
+ return TinyIntWriter.forArray((TinyIntVector) vector);
+ } else if (vector instanceof SmallIntVector) {
+ return SmallIntWriter.forArray((SmallIntVector) vector);
+ } else if (vector instanceof IntVector) {
+ return IntWriter.forArray((IntVector) vector);
+ } else if (vector instanceof BigIntVector) {
+ return BigIntWriter.forArray((BigIntVector) vector);
+ } else if (vector instanceof Float4Vector) {
+ return FloatWriter.forArray((Float4Vector) vector);
+ } else if (vector instanceof Float8Vector) {
+ return DoubleWriter.forArray((Float8Vector) vector);
+ } else if (vector instanceof VarCharVector) {
+ return VarCharWriter.forArray((VarCharVector) vector);
+ } else if (vector instanceof VarBinaryVector) {
+ return VarBinaryWriter.forArray((VarBinaryVector) vector);
+ } else if (vector instanceof DecimalVector) {
+ DecimalType decimalType = (DecimalType) fieldType;
+ return DecimalWriter.forArray((DecimalVector) vector,
decimalType.getPrecision(), decimalType.getScale());
+ } else if (vector instanceof DateDayVector) {
+ return DateWriter.forArray((DateDayVector) vector);
+ } else {
+ throw new UnsupportedOperationException(
+ "Unsupported vector type: " +
vector.getClass().getSimpleName());
+ }
+ }
+
private FlinkArrowUtils() {
    // Utility class: static helpers only, never instantiated.
}
diff --git
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowWriter.java
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowWriter.java
new file mode 100644
index 00000000..a339fb7c
--- /dev/null
+++
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowWriter.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow;
+
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.auron.flink.arrow.writers.ArrowFieldWriter;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+
+/**
+ * Orchestrates per-column {@link ArrowFieldWriter}s to write {@link RowData}
into an Arrow {@link
+ * VectorSchemaRoot}.
+ *
+ * <p>Use {@link #create(VectorSchemaRoot, RowType)} to construct, then call
{@link #write(RowData)}
+ * for each row, {@link #finish()} to finalize the batch, and {@link #reset()}
before starting a new
+ * batch.
+ */
+public final class FlinkArrowWriter {
+
+ private final VectorSchemaRoot root;
+ private final ArrowFieldWriter<RowData>[] fieldWriters;
+
+ @SuppressWarnings("unchecked")
+ private FlinkArrowWriter(VectorSchemaRoot root,
ArrowFieldWriter<RowData>[] fieldWriters) {
+ this.root = root;
+ this.fieldWriters = fieldWriters;
+ }
+
+ /**
+ * Creates a writer from an existing {@link VectorSchemaRoot} and Flink
{@link RowType}.
+ *
+ * <p>Each vector in the root is allocated and a matching {@link
ArrowFieldWriter} is created via
+ * {@link FlinkArrowUtils#createArrowFieldWriterForRow}.
+ *
+ * @param root the Arrow vector schema root to write into
+ * @param rowType the Flink row type describing the schema
+ * @return a new writer instance
+ */
+ @SuppressWarnings("unchecked")
+ public static FlinkArrowWriter create(VectorSchemaRoot root, RowType
rowType) {
+ ArrowFieldWriter<RowData>[] fieldWriters =
+ new ArrowFieldWriter[root.getFieldVectors().size()];
+ for (int i = 0; i < fieldWriters.length; i++) {
+ FieldVector vector = root.getFieldVectors().get(i);
+ vector.allocateNew();
+ fieldWriters[i] =
FlinkArrowUtils.createArrowFieldWriterForRow(vector, rowType.getTypeAt(i));
+ }
+ return new FlinkArrowWriter(root, fieldWriters);
+ }
+
+ /**
+ * Writes a single {@link RowData} into the underlying Arrow vectors.
+ *
+ * @param row the row to write
+ */
+ public void write(RowData row) {
+ for (int i = 0; i < fieldWriters.length; i++) {
+ fieldWriters[i].write(row, i);
+ }
+ }
+
+ /** Finalizes the current batch by setting the row count and finishing
each field writer. */
+ public void finish() {
+ root.setRowCount(fieldWriters.length > 0 ? fieldWriters[0].getCount()
: 0);
+ for (ArrowFieldWriter<RowData> w : fieldWriters) {
+ w.finish();
+ }
+ }
+
+ /** Resets all vectors and field writers for a new batch. */
+ public void reset() {
+ root.setRowCount(0);
+ for (ArrowFieldWriter<RowData> w : fieldWriters) {
+ w.reset();
+ }
+ }
+
+ /** Returns the underlying {@link VectorSchemaRoot}. */
+ public VectorSchemaRoot getRoot() {
+ return root;
+ }
+}
diff --git
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/ArrowFieldWriter.java
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/ArrowFieldWriter.java
new file mode 100644
index 00000000..77122a37
--- /dev/null
+++
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/ArrowFieldWriter.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.writers;
+
+import java.util.Objects;
+import org.apache.arrow.vector.ValueVector;
+
+/**
+ * Base class for writing Flink data into Arrow vectors.
+ *
+ * <p>Uses a template method pattern: {@link #write(Object, int)} increments
the row count and
+ * delegates to {@link #doWrite(Object, int)} for type-specific logic.
+ *
+ * @param <IN> the input data type (e.g., RowData or ArrayData)
+ */
+public abstract class ArrowFieldWriter<IN> {
+
+ private final ValueVector valueVector;
+ private int count;
+
+ protected ArrowFieldWriter(ValueVector valueVector) {
+ this.valueVector = Objects.requireNonNull(valueVector, "valueVector
must not be null");
+ }
+
+ /** Returns the underlying Arrow vector. */
+ public ValueVector getValueVector() {
+ return valueVector;
+ }
+
+ /** Returns the number of values written since the last reset. */
+ public int getCount() {
+ return count;
+ }
+
+ /**
+ * Writes a value from the input at the given ordinal position.
+ *
+ * @param row the input data
+ * @param ordinal the field index within the input
+ */
+ public void write(IN row, int ordinal) {
+ doWrite(row, ordinal);
+ count++;
+ }
+
+ /**
+ * Type-specific write logic. Implementations read from the input and
write into the Arrow
+ * vector at position {@link #getCount()}.
+ *
+ * @param row the input data
+ * @param ordinal the field index within the input
+ */
+ public abstract void doWrite(IN row, int ordinal);
+
+ /** Finalizes the current batch by setting the value count on the vector.
*/
+ public void finish() {
+ valueVector.setValueCount(count);
+ }
+
+ /** Resets the vector and count for a new batch. */
+ public void reset() {
+ valueVector.reset();
+ count = 0;
+ }
+}
diff --git
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/BigIntWriter.java
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/BigIntWriter.java
new file mode 100644
index 00000000..b2b67f9f
--- /dev/null
+++
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/BigIntWriter.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.writers;
+
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * {@link ArrowFieldWriter} for 64-bit integers ({@link BigIntVector}).
+ *
+ * <p>Use {@link #forRow(BigIntVector)} when writing from {@link RowData} and
{@link
+ * #forArray(BigIntVector)} when writing from {@link ArrayData}.
+ *
+ * @param <T> the input data type
+ */
+public abstract class BigIntWriter<T> extends ArrowFieldWriter<T> {
+
+ /** Creates a BigIntWriter that reads from {@link RowData}. */
+ public static BigIntWriter<RowData> forRow(BigIntVector bigIntVector) {
+ return new BigIntWriterForRow(bigIntVector);
+ }
+
+ /** Creates a BigIntWriter that reads from {@link ArrayData}. */
+ public static BigIntWriter<ArrayData> forArray(BigIntVector bigIntVector) {
+ return new BigIntWriterForArray(bigIntVector);
+ }
+
+ private BigIntWriter(BigIntVector bigIntVector) {
+ super(bigIntVector);
+ }
+
+ abstract boolean isNullAt(T in, int ordinal);
+
+ abstract long readLong(T in, int ordinal);
+
+ @Override
+ public void doWrite(T in, int ordinal) {
+ BigIntVector vector = (BigIntVector) getValueVector();
+ if (isNullAt(in, ordinal)) {
+ vector.setNull(getCount());
+ } else {
+ vector.setSafe(getCount(), readLong(in, ordinal));
+ }
+ }
+
+ // ---- Inner classes for RowData and ArrayData ----
+
+ public static final class BigIntWriterForRow extends BigIntWriter<RowData>
{
+ private BigIntWriterForRow(BigIntVector bigIntVector) {
+ super(bigIntVector);
+ }
+
+ @Override
+ boolean isNullAt(RowData in, int ordinal) {
+ return in.isNullAt(ordinal);
+ }
+
+ @Override
+ long readLong(RowData in, int ordinal) {
+ return in.getLong(ordinal);
+ }
+ }
+
+ public static final class BigIntWriterForArray extends
BigIntWriter<ArrayData> {
+ private BigIntWriterForArray(BigIntVector bigIntVector) {
+ super(bigIntVector);
+ }
+
+ @Override
+ boolean isNullAt(ArrayData in, int ordinal) {
+ return in.isNullAt(ordinal);
+ }
+
+ @Override
+ long readLong(ArrayData in, int ordinal) {
+ return in.getLong(ordinal);
+ }
+ }
+}
diff --git
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/BooleanWriter.java
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/BooleanWriter.java
new file mode 100644
index 00000000..c7c53e59
--- /dev/null
+++
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/BooleanWriter.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.writers;
+
+import org.apache.arrow.vector.BitVector;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * {@link ArrowFieldWriter} for booleans ({@link BitVector}).
+ *
+ * <p>Use {@link #forRow(BitVector)} when writing from {@link RowData} and
{@link
+ * #forArray(BitVector)} when writing from {@link ArrayData}.
+ *
+ * @param <T> the input data type
+ */
+public abstract class BooleanWriter<T> extends ArrowFieldWriter<T> {
+
+ /** Creates a BooleanWriter that reads from {@link RowData}. */
+ public static BooleanWriter<RowData> forRow(BitVector bitVector) {
+ return new BooleanWriterForRow(bitVector);
+ }
+
+ /** Creates a BooleanWriter that reads from {@link ArrayData}. */
+ public static BooleanWriter<ArrayData> forArray(BitVector bitVector) {
+ return new BooleanWriterForArray(bitVector);
+ }
+
+ private BooleanWriter(BitVector bitVector) {
+ super(bitVector);
+ }
+
+ abstract boolean isNullAt(T in, int ordinal);
+
+ abstract boolean readBoolean(T in, int ordinal);
+
+ @Override
+ public void doWrite(T in, int ordinal) {
+ BitVector vector = (BitVector) getValueVector();
+ if (isNullAt(in, ordinal)) {
+ vector.setNull(getCount());
+ } else {
+ vector.setSafe(getCount(), readBoolean(in, ordinal) ? 1 : 0);
+ }
+ }
+
+ // ---- Inner classes for RowData and ArrayData ----
+
+ public static final class BooleanWriterForRow extends
BooleanWriter<RowData> {
+ private BooleanWriterForRow(BitVector bitVector) {
+ super(bitVector);
+ }
+
+ @Override
+ boolean isNullAt(RowData in, int ordinal) {
+ return in.isNullAt(ordinal);
+ }
+
+ @Override
+ boolean readBoolean(RowData in, int ordinal) {
+ return in.getBoolean(ordinal);
+ }
+ }
+
+ public static final class BooleanWriterForArray extends
BooleanWriter<ArrayData> {
+ private BooleanWriterForArray(BitVector bitVector) {
+ super(bitVector);
+ }
+
+ @Override
+ boolean isNullAt(ArrayData in, int ordinal) {
+ return in.isNullAt(ordinal);
+ }
+
+ @Override
+ boolean readBoolean(ArrayData in, int ordinal) {
+ return in.getBoolean(ordinal);
+ }
+ }
+}
diff --git
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/DateWriter.java
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/DateWriter.java
new file mode 100644
index 00000000..566f97f7
--- /dev/null
+++
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/DateWriter.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.writers;
+
+import org.apache.arrow.vector.DateDayVector;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * {@link ArrowFieldWriter} for dates stored as days since epoch ({@link
DateDayVector}).
+ *
+ * <p>Use {@link #forRow(DateDayVector)} when writing from {@link RowData} and
{@link
+ * #forArray(DateDayVector)} when writing from {@link ArrayData}.
+ *
+ * @param <T> the input data type
+ */
+public abstract class DateWriter<T> extends ArrowFieldWriter<T> {
+
+ /** Creates a DateWriter that reads from {@link RowData}. */
+ public static DateWriter<RowData> forRow(DateDayVector dateDayVector) {
+ return new DateWriterForRow(dateDayVector);
+ }
+
+ /** Creates a DateWriter that reads from {@link ArrayData}. */
+ public static DateWriter<ArrayData> forArray(DateDayVector dateDayVector) {
+ return new DateWriterForArray(dateDayVector);
+ }
+
+ private DateWriter(DateDayVector dateDayVector) {
+ super(dateDayVector);
+ }
+
+ abstract boolean isNullAt(T in, int ordinal);
+
+ abstract int readDate(T in, int ordinal);
+
+ @Override
+ public void doWrite(T in, int ordinal) {
+ DateDayVector vector = (DateDayVector) getValueVector();
+ if (isNullAt(in, ordinal)) {
+ vector.setNull(getCount());
+ } else {
+ vector.setSafe(getCount(), readDate(in, ordinal));
+ }
+ }
+
+ // ---- Inner classes for RowData and ArrayData ----
+
+ public static final class DateWriterForRow extends DateWriter<RowData> {
+ private DateWriterForRow(DateDayVector dateDayVector) {
+ super(dateDayVector);
+ }
+
+ @Override
+ boolean isNullAt(RowData in, int ordinal) {
+ return in.isNullAt(ordinal);
+ }
+
+ @Override
+ int readDate(RowData in, int ordinal) {
+ return in.getInt(ordinal);
+ }
+ }
+
+ public static final class DateWriterForArray extends DateWriter<ArrayData>
{
+ private DateWriterForArray(DateDayVector dateDayVector) {
+ super(dateDayVector);
+ }
+
+ @Override
+ boolean isNullAt(ArrayData in, int ordinal) {
+ return in.isNullAt(ordinal);
+ }
+
+ @Override
+ int readDate(ArrayData in, int ordinal) {
+ return in.getInt(ordinal);
+ }
+ }
+}
diff --git
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/DecimalWriter.java
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/DecimalWriter.java
new file mode 100644
index 00000000..b3c95c74
--- /dev/null
+++
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/DecimalWriter.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.writers;
+
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import org.apache.arrow.vector.DecimalVector;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * {@link ArrowFieldWriter} for decimals ({@link DecimalVector}).
+ *
+ * <p>Use {@link #forRow(DecimalVector, int, int)} when writing from {@link
RowData} and {@link
+ * #forArray(DecimalVector, int, int)} when writing from {@link ArrayData}.
+ *
+ * @param <T> the input data type
+ */
+public abstract class DecimalWriter<T> extends ArrowFieldWriter<T> {
+
+ protected final int precision;
+ protected final int scale;
+
+ /** Creates a DecimalWriter that reads from {@link RowData}. */
+ public static DecimalWriter<RowData> forRow(DecimalVector decimalVector,
int precision, int scale) {
+ return new DecimalWriterForRow(decimalVector, precision, scale);
+ }
+
+ /** Creates a DecimalWriter that reads from {@link ArrayData}. */
+ public static DecimalWriter<ArrayData> forArray(DecimalVector
decimalVector, int precision, int scale) {
+ return new DecimalWriterForArray(decimalVector, precision, scale);
+ }
+
+ private DecimalWriter(DecimalVector decimalVector, int precision, int
scale) {
+ super(decimalVector);
+ this.precision = precision;
+ this.scale = scale;
+ }
+
+ abstract boolean isNullAt(T in, int ordinal);
+
+ abstract DecimalData readDecimal(T in, int ordinal);
+
+ @Override
+ public void doWrite(T in, int ordinal) {
+ DecimalVector vector = (DecimalVector) getValueVector();
+ if (isNullAt(in, ordinal)) {
+ vector.setNull(getCount());
+ } else {
+ BigDecimal bigDecimal = readDecimal(in, ordinal).toBigDecimal();
+ bigDecimal = fitBigDecimal(bigDecimal, precision, scale);
+ if (bigDecimal == null) {
+ vector.setNull(getCount());
+ } else {
+ vector.setSafe(getCount(), bigDecimal);
+ }
+ }
+ }
+
+ private static BigDecimal fitBigDecimal(BigDecimal value, int precision,
int scale) {
+ value = value.setScale(scale, RoundingMode.HALF_UP);
+ if (value.precision() > precision) {
+ return null;
+ }
+ return value;
+ }
+
+ // ---- Inner classes for RowData and ArrayData ----
+
+ public static final class DecimalWriterForRow extends
DecimalWriter<RowData> {
+ private DecimalWriterForRow(DecimalVector decimalVector, int
precision, int scale) {
+ super(decimalVector, precision, scale);
+ }
+
+ @Override
+ boolean isNullAt(RowData in, int ordinal) {
+ return in.isNullAt(ordinal);
+ }
+
+ @Override
+ DecimalData readDecimal(RowData in, int ordinal) {
+ return in.getDecimal(ordinal, precision, scale);
+ }
+ }
+
+ public static final class DecimalWriterForArray extends
DecimalWriter<ArrayData> {
+ private DecimalWriterForArray(DecimalVector decimalVector, int
precision, int scale) {
+ super(decimalVector, precision, scale);
+ }
+
+ @Override
+ boolean isNullAt(ArrayData in, int ordinal) {
+ return in.isNullAt(ordinal);
+ }
+
+ @Override
+ DecimalData readDecimal(ArrayData in, int ordinal) {
+ return in.getDecimal(ordinal, precision, scale);
+ }
+ }
+}
diff --git
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/DoubleWriter.java
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/DoubleWriter.java
new file mode 100644
index 00000000..c23abdd9
--- /dev/null
+++
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/DoubleWriter.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.writers;
+
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * {@link ArrowFieldWriter} for 64-bit doubles ({@link Float8Vector}).
+ *
+ * <p>Use {@link #forRow(Float8Vector)} when writing from {@link RowData} and
{@link
+ * #forArray(Float8Vector)} when writing from {@link ArrayData}.
+ *
+ * @param <T> the input data type
+ */
+public abstract class DoubleWriter<T> extends ArrowFieldWriter<T> {
+
+ /** Creates a DoubleWriter that reads from {@link RowData}. */
+ public static DoubleWriter<RowData> forRow(Float8Vector float8Vector) {
+ return new DoubleWriterForRow(float8Vector);
+ }
+
+ /** Creates a DoubleWriter that reads from {@link ArrayData}. */
+ public static DoubleWriter<ArrayData> forArray(Float8Vector float8Vector) {
+ return new DoubleWriterForArray(float8Vector);
+ }
+
+ private DoubleWriter(Float8Vector float8Vector) {
+ super(float8Vector);
+ }
+
+ abstract boolean isNullAt(T in, int ordinal);
+
+ abstract double readDouble(T in, int ordinal);
+
+ @Override
+ public void doWrite(T in, int ordinal) {
+ Float8Vector vector = (Float8Vector) getValueVector();
+ if (isNullAt(in, ordinal)) {
+ vector.setNull(getCount());
+ } else {
+ vector.setSafe(getCount(), readDouble(in, ordinal));
+ }
+ }
+
+ // ---- Inner classes for RowData and ArrayData ----
+
+ public static final class DoubleWriterForRow extends DoubleWriter<RowData>
{
+ private DoubleWriterForRow(Float8Vector float8Vector) {
+ super(float8Vector);
+ }
+
+ @Override
+ boolean isNullAt(RowData in, int ordinal) {
+ return in.isNullAt(ordinal);
+ }
+
+ @Override
+ double readDouble(RowData in, int ordinal) {
+ return in.getDouble(ordinal);
+ }
+ }
+
+ public static final class DoubleWriterForArray extends
DoubleWriter<ArrayData> {
+ private DoubleWriterForArray(Float8Vector float8Vector) {
+ super(float8Vector);
+ }
+
+ @Override
+ boolean isNullAt(ArrayData in, int ordinal) {
+ return in.isNullAt(ordinal);
+ }
+
+ @Override
+ double readDouble(ArrayData in, int ordinal) {
+ return in.getDouble(ordinal);
+ }
+ }
+}
diff --git
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/FloatWriter.java
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/FloatWriter.java
new file mode 100644
index 00000000..e6fe2a72
--- /dev/null
+++
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/FloatWriter.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.writers;
+
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * {@link ArrowFieldWriter} for 32-bit floats ({@link Float4Vector}).
+ *
+ * <p>Use {@link #forRow(Float4Vector)} when writing from {@link RowData} and
{@link
+ * #forArray(Float4Vector)} when writing from {@link ArrayData}.
+ *
+ * @param <T> the input data type
+ */
+public abstract class FloatWriter<T> extends ArrowFieldWriter<T> {
+
+ /** Creates a FloatWriter that reads from {@link RowData}. */
+ public static FloatWriter<RowData> forRow(Float4Vector float4Vector) {
+ return new FloatWriterForRow(float4Vector);
+ }
+
+ /** Creates a FloatWriter that reads from {@link ArrayData}. */
+ public static FloatWriter<ArrayData> forArray(Float4Vector float4Vector) {
+ return new FloatWriterForArray(float4Vector);
+ }
+
+ private FloatWriter(Float4Vector float4Vector) {
+ super(float4Vector);
+ }
+
+ abstract boolean isNullAt(T in, int ordinal);
+
+ abstract float readFloat(T in, int ordinal);
+
+ @Override
+ public void doWrite(T in, int ordinal) {
+ Float4Vector vector = (Float4Vector) getValueVector();
+ if (isNullAt(in, ordinal)) {
+ vector.setNull(getCount());
+ } else {
+ vector.setSafe(getCount(), readFloat(in, ordinal));
+ }
+ }
+
+ // ---- Inner classes for RowData and ArrayData ----
+
+ public static final class FloatWriterForRow extends FloatWriter<RowData> {
+ private FloatWriterForRow(Float4Vector float4Vector) {
+ super(float4Vector);
+ }
+
+ @Override
+ boolean isNullAt(RowData in, int ordinal) {
+ return in.isNullAt(ordinal);
+ }
+
+ @Override
+ float readFloat(RowData in, int ordinal) {
+ return in.getFloat(ordinal);
+ }
+ }
+
+ public static final class FloatWriterForArray extends
FloatWriter<ArrayData> {
+ private FloatWriterForArray(Float4Vector float4Vector) {
+ super(float4Vector);
+ }
+
+ @Override
+ boolean isNullAt(ArrayData in, int ordinal) {
+ return in.isNullAt(ordinal);
+ }
+
+ @Override
+ float readFloat(ArrayData in, int ordinal) {
+ return in.getFloat(ordinal);
+ }
+ }
+}
diff --git
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/IntWriter.java
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/IntWriter.java
new file mode 100644
index 00000000..11e02122
--- /dev/null
+++
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/IntWriter.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.writers;
+
+import org.apache.arrow.vector.IntVector;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * {@link ArrowFieldWriter} for 32-bit integers ({@link IntVector}).
+ *
+ * <p>Use {@link #forRow(IntVector)} when writing from {@link RowData} and
{@link
+ * #forArray(IntVector)} when writing from {@link ArrayData}.
+ *
+ * @param <T> the input data type
+ */
+public abstract class IntWriter<T> extends ArrowFieldWriter<T> {
+
+ /** Creates an IntWriter that reads from {@link RowData}. */
+ public static IntWriter<RowData> forRow(IntVector intVector) {
+ return new IntWriterForRow(intVector);
+ }
+
+ /** Creates an IntWriter that reads from {@link ArrayData}. */
+ public static IntWriter<ArrayData> forArray(IntVector intVector) {
+ return new IntWriterForArray(intVector);
+ }
+
+ private IntWriter(IntVector intVector) {
+ super(intVector);
+ }
+
+ abstract boolean isNullAt(T in, int ordinal);
+
+ abstract int readInt(T in, int ordinal);
+
+ @Override
+ public void doWrite(T in, int ordinal) {
+ IntVector vector = (IntVector) getValueVector();
+ if (isNullAt(in, ordinal)) {
+ vector.setNull(getCount());
+ } else {
+ vector.setSafe(getCount(), readInt(in, ordinal));
+ }
+ }
+
+ // ---- Inner classes for RowData and ArrayData ----
+
+ public static final class IntWriterForRow extends IntWriter<RowData> {
+ private IntWriterForRow(IntVector intVector) {
+ super(intVector);
+ }
+
+ @Override
+ boolean isNullAt(RowData in, int ordinal) {
+ return in.isNullAt(ordinal);
+ }
+
+ @Override
+ int readInt(RowData in, int ordinal) {
+ return in.getInt(ordinal);
+ }
+ }
+
+ public static final class IntWriterForArray extends IntWriter<ArrayData> {
+ private IntWriterForArray(IntVector intVector) {
+ super(intVector);
+ }
+
+ @Override
+ boolean isNullAt(ArrayData in, int ordinal) {
+ return in.isNullAt(ordinal);
+ }
+
+ @Override
+ int readInt(ArrayData in, int ordinal) {
+ return in.getInt(ordinal);
+ }
+ }
+}
diff --git
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/NullWriter.java
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/NullWriter.java
new file mode 100644
index 00000000..59dfe15b
--- /dev/null
+++
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/NullWriter.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.writers;
+
+import org.apache.arrow.vector.NullVector;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * {@link ArrowFieldWriter} for null type ({@link NullVector}).
+ *
+ * <p>Every value written is null. Use {@link #forRow(NullVector)} when
writing from {@link RowData}
+ * and {@link #forArray(NullVector)} when writing from {@link ArrayData}.
+ *
+ * @param <T> the input data type
+ */
+public class NullWriter<T> extends ArrowFieldWriter<T> {
+
+ /** Creates a NullWriter that reads from {@link RowData}. */
+ public static NullWriter<RowData> forRow(NullVector nullVector) {
+ return new NullWriter<>(nullVector);
+ }
+
+ /** Creates a NullWriter that reads from {@link ArrayData}. */
+ public static NullWriter<ArrayData> forArray(NullVector nullVector) {
+ return new NullWriter<>(nullVector);
+ }
+
+ private NullWriter(NullVector nullVector) {
+ super(nullVector);
+ }
+
+ @Override
+ public void doWrite(T in, int ordinal) {
+ // NullVector values are inherently null; no explicit write needed.
+ }
+}
diff --git
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/SmallIntWriter.java
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/SmallIntWriter.java
new file mode 100644
index 00000000..ff0c11c1
--- /dev/null
+++
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/SmallIntWriter.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.writers;
+
+import org.apache.arrow.vector.SmallIntVector;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * {@link ArrowFieldWriter} for 16-bit integers ({@link SmallIntVector}).
+ *
+ * <p>Use {@link #forRow(SmallIntVector)} when writing from {@link RowData}
and {@link
+ * #forArray(SmallIntVector)} when writing from {@link ArrayData}.
+ *
+ * @param <T> the input data type
+ */
+public abstract class SmallIntWriter<T> extends ArrowFieldWriter<T> {
+
+ /** Creates a SmallIntWriter that reads from {@link RowData}. */
+ public static SmallIntWriter<RowData> forRow(SmallIntVector
smallIntVector) {
+ return new SmallIntWriterForRow(smallIntVector);
+ }
+
+ /** Creates a SmallIntWriter that reads from {@link ArrayData}. */
+ public static SmallIntWriter<ArrayData> forArray(SmallIntVector
smallIntVector) {
+ return new SmallIntWriterForArray(smallIntVector);
+ }
+
+ private SmallIntWriter(SmallIntVector smallIntVector) {
+ super(smallIntVector);
+ }
+
+ abstract boolean isNullAt(T in, int ordinal);
+
+ abstract short readShort(T in, int ordinal);
+
+ @Override
+ public void doWrite(T in, int ordinal) {
+ SmallIntVector vector = (SmallIntVector) getValueVector();
+ if (isNullAt(in, ordinal)) {
+ vector.setNull(getCount());
+ } else {
+ vector.setSafe(getCount(), readShort(in, ordinal));
+ }
+ }
+
+ // ---- Inner classes for RowData and ArrayData ----
+
+ public static final class SmallIntWriterForRow extends
SmallIntWriter<RowData> {
+ private SmallIntWriterForRow(SmallIntVector smallIntVector) {
+ super(smallIntVector);
+ }
+
+ @Override
+ boolean isNullAt(RowData in, int ordinal) {
+ return in.isNullAt(ordinal);
+ }
+
+ @Override
+ short readShort(RowData in, int ordinal) {
+ return in.getShort(ordinal);
+ }
+ }
+
+ public static final class SmallIntWriterForArray extends
SmallIntWriter<ArrayData> {
+ private SmallIntWriterForArray(SmallIntVector smallIntVector) {
+ super(smallIntVector);
+ }
+
+ @Override
+ boolean isNullAt(ArrayData in, int ordinal) {
+ return in.isNullAt(ordinal);
+ }
+
+ @Override
+ short readShort(ArrayData in, int ordinal) {
+ return in.getShort(ordinal);
+ }
+ }
+}
diff --git
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/TinyIntWriter.java
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/TinyIntWriter.java
new file mode 100644
index 00000000..f1995072
--- /dev/null
+++
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/TinyIntWriter.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.writers;
+
+import org.apache.arrow.vector.TinyIntVector;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * {@link ArrowFieldWriter} for 8-bit integers ({@link TinyIntVector}).
+ *
+ * <p>Use {@link #forRow(TinyIntVector)} when writing from {@link RowData} and
{@link
+ * #forArray(TinyIntVector)} when writing from {@link ArrayData}.
+ *
+ * @param <T> the input data type
+ */
+public abstract class TinyIntWriter<T> extends ArrowFieldWriter<T> {
+
+ /** Creates a TinyIntWriter that reads from {@link RowData}. */
+ public static TinyIntWriter<RowData> forRow(TinyIntVector tinyIntVector) {
+ return new TinyIntWriterForRow(tinyIntVector);
+ }
+
+ /** Creates a TinyIntWriter that reads from {@link ArrayData}. */
+ public static TinyIntWriter<ArrayData> forArray(TinyIntVector
tinyIntVector) {
+ return new TinyIntWriterForArray(tinyIntVector);
+ }
+
+ private TinyIntWriter(TinyIntVector tinyIntVector) {
+ super(tinyIntVector);
+ }
+
+ abstract boolean isNullAt(T in, int ordinal);
+
+ abstract byte readByte(T in, int ordinal);
+
+ @Override
+ public void doWrite(T in, int ordinal) {
+ TinyIntVector vector = (TinyIntVector) getValueVector();
+ if (isNullAt(in, ordinal)) {
+ vector.setNull(getCount());
+ } else {
+ vector.setSafe(getCount(), readByte(in, ordinal));
+ }
+ }
+
+ // ---- Inner classes for RowData and ArrayData ----
+
+ public static final class TinyIntWriterForRow extends
TinyIntWriter<RowData> {
+ private TinyIntWriterForRow(TinyIntVector tinyIntVector) {
+ super(tinyIntVector);
+ }
+
+ @Override
+ boolean isNullAt(RowData in, int ordinal) {
+ return in.isNullAt(ordinal);
+ }
+
+ @Override
+ byte readByte(RowData in, int ordinal) {
+ return in.getByte(ordinal);
+ }
+ }
+
+ public static final class TinyIntWriterForArray extends
TinyIntWriter<ArrayData> {
+ private TinyIntWriterForArray(TinyIntVector tinyIntVector) {
+ super(tinyIntVector);
+ }
+
+ @Override
+ boolean isNullAt(ArrayData in, int ordinal) {
+ return in.isNullAt(ordinal);
+ }
+
+ @Override
+ byte readByte(ArrayData in, int ordinal) {
+ return in.getByte(ordinal);
+ }
+ }
+}
diff --git
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/VarBinaryWriter.java
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/VarBinaryWriter.java
new file mode 100644
index 00000000..06e9a1a9
--- /dev/null
+++
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/VarBinaryWriter.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.writers;
+
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * {@link ArrowFieldWriter} for variable-length binary data ({@link
VarBinaryVector}).
+ *
+ * <p>Use {@link #forRow(VarBinaryVector)} when writing from {@link RowData}
and {@link
+ * #forArray(VarBinaryVector)} when writing from {@link ArrayData}.
+ *
+ * @param <T> the input data type
+ */
+public abstract class VarBinaryWriter<T> extends ArrowFieldWriter<T> {
+
+ /** Creates a VarBinaryWriter that reads from {@link RowData}. */
+ public static VarBinaryWriter<RowData> forRow(VarBinaryVector
varBinaryVector) {
+ return new VarBinaryWriterForRow(varBinaryVector);
+ }
+
+ /** Creates a VarBinaryWriter that reads from {@link ArrayData}. */
+ public static VarBinaryWriter<ArrayData> forArray(VarBinaryVector
varBinaryVector) {
+ return new VarBinaryWriterForArray(varBinaryVector);
+ }
+
+ private VarBinaryWriter(VarBinaryVector varBinaryVector) {
+ super(varBinaryVector);
+ }
+
+ abstract boolean isNullAt(T in, int ordinal);
+
+ abstract byte[] readBinary(T in, int ordinal);
+
+ @Override
+ public void doWrite(T in, int ordinal) {
+ VarBinaryVector vector = (VarBinaryVector) getValueVector();
+ if (isNullAt(in, ordinal)) {
+ vector.setNull(getCount());
+ } else {
+ byte[] bytes = readBinary(in, ordinal);
+ vector.setSafe(getCount(), bytes);
+ }
+ }
+
+ // ---- Inner classes for RowData and ArrayData ----
+
+ public static final class VarBinaryWriterForRow extends
VarBinaryWriter<RowData> {
+ private VarBinaryWriterForRow(VarBinaryVector varBinaryVector) {
+ super(varBinaryVector);
+ }
+
+ @Override
+ boolean isNullAt(RowData in, int ordinal) {
+ return in.isNullAt(ordinal);
+ }
+
+ @Override
+ byte[] readBinary(RowData in, int ordinal) {
+ return in.getBinary(ordinal);
+ }
+ }
+
+ public static final class VarBinaryWriterForArray extends
VarBinaryWriter<ArrayData> {
+ private VarBinaryWriterForArray(VarBinaryVector varBinaryVector) {
+ super(varBinaryVector);
+ }
+
+ @Override
+ boolean isNullAt(ArrayData in, int ordinal) {
+ return in.isNullAt(ordinal);
+ }
+
+ @Override
+ byte[] readBinary(ArrayData in, int ordinal) {
+ return in.getBinary(ordinal);
+ }
+ }
+}
diff --git
a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/VarCharWriter.java
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/VarCharWriter.java
new file mode 100644
index 00000000..6963e6a6
--- /dev/null
+++
b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/writers/VarCharWriter.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.writers;
+
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+
+/**
+ * {@link ArrowFieldWriter} for variable-length strings ({@link
VarCharVector}).
+ *
+ * <p>Use {@link #forRow(VarCharVector)} when writing from {@link RowData} and
{@link
+ * #forArray(VarCharVector)} when writing from {@link ArrayData}.
+ *
+ * @param <T> the input data type
+ */
+public abstract class VarCharWriter<T> extends ArrowFieldWriter<T> {
+
+ /** Creates a VarCharWriter that reads from {@link RowData}. */
+ public static VarCharWriter<RowData> forRow(VarCharVector varCharVector) {
+ return new VarCharWriterForRow(varCharVector);
+ }
+
+ /** Creates a VarCharWriter that reads from {@link ArrayData}. */
+ public static VarCharWriter<ArrayData> forArray(VarCharVector
varCharVector) {
+ return new VarCharWriterForArray(varCharVector);
+ }
+
+ private VarCharWriter(VarCharVector varCharVector) {
+ super(varCharVector);
+ }
+
+ abstract boolean isNullAt(T in, int ordinal);
+
+ abstract StringData readString(T in, int ordinal);
+
+ @Override
+ public void doWrite(T in, int ordinal) {
+ VarCharVector vector = (VarCharVector) getValueVector();
+ if (isNullAt(in, ordinal)) {
+ vector.setNull(getCount());
+ } else {
+ byte[] bytes = readString(in, ordinal).toBytes();
+ vector.setSafe(getCount(), bytes);
+ }
+ }
+
+ // ---- Inner classes for RowData and ArrayData ----
+
+ public static final class VarCharWriterForRow extends
VarCharWriter<RowData> {
+ private VarCharWriterForRow(VarCharVector varCharVector) {
+ super(varCharVector);
+ }
+
+ @Override
+ boolean isNullAt(RowData in, int ordinal) {
+ return in.isNullAt(ordinal);
+ }
+
+ @Override
+ StringData readString(RowData in, int ordinal) {
+ return in.getString(ordinal);
+ }
+ }
+
+ public static final class VarCharWriterForArray extends
VarCharWriter<ArrayData> {
+ private VarCharWriterForArray(VarCharVector varCharVector) {
+ super(varCharVector);
+ }
+
+ @Override
+ boolean isNullAt(ArrayData in, int ordinal) {
+ return in.isNullAt(ordinal);
+ }
+
+ @Override
+ StringData readString(ArrayData in, int ordinal) {
+ return in.getString(ordinal);
+ }
+ }
+}
diff --git
a/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowWriterTest.java
b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowWriterTest.java
new file mode 100644
index 00000000..042a4794
--- /dev/null
+++
b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowWriterTest.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.DateDayVector;
+import org.apache.arrow.vector.DecimalVector;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.NullVector;
+import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.TinyIntVector;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.NullType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
/** Unit tests for {@link FlinkArrowWriter}. */
public class FlinkArrowWriterTest {

    // Fresh allocator per test; tearDown() close verifies no buffers leaked.
    private BufferAllocator allocator;

    @BeforeEach
    public void setUp() {
        allocator = new RootAllocator(Long.MAX_VALUE);
    }

    @AfterEach
    public void tearDown() {
        allocator.close();
    }

    /** Writes one row containing all 12 basic types and verifies each vector. */
    @Test
    public void testWriteAllBasicTypes() {
        RowType rowType = RowType.of(
                new LogicalType[] {
                    new NullType(),
                    new BooleanType(),
                    new TinyIntType(),
                    new SmallIntType(),
                    new IntType(),
                    new BigIntType(),
                    new FloatType(),
                    new DoubleType(),
                    new VarCharType(100),
                    new VarBinaryType(100),
                    new DecimalType(10, 2),
                    new DateType()
                },
                new String[] {
                    "f_null", "f_bool", "f_tinyint", "f_smallint", "f_int", "f_bigint",
                    "f_float", "f_double", "f_varchar", "f_varbinary", "f_decimal", "f_date"
                });

        Schema schema = FlinkArrowUtils.toArrowSchema(rowType);

        try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
            FlinkArrowWriter writer = FlinkArrowWriter.create(root, rowType);

            GenericRowData row = new GenericRowData(12);
            row.setField(0, null);
            row.setField(1, true);
            row.setField(2, (byte) 42);
            row.setField(3, (short) 1000);
            row.setField(4, 123456);
            row.setField(5, 9876543210L);
            row.setField(6, 3.14f);
            row.setField(7, 2.71828d);
            row.setField(8, StringData.fromString("hello"));
            row.setField(9, new byte[] {1, 2, 3});
            row.setField(10, DecimalData.fromBigDecimal(new BigDecimal("123.45"), 10, 2));
            row.setField(11, 19000); // days since epoch

            writer.write(row);
            writer.finish();

            assertEquals(1, root.getRowCount());

            // Null
            assertTrue(((NullVector) root.getVector("f_null")).isNull(0));
            // Boolean (BitVector exposes booleans as 0/1 ints)
            assertEquals(1, ((BitVector) root.getVector("f_bool")).get(0));
            // TinyInt
            assertEquals(42, ((TinyIntVector) root.getVector("f_tinyint")).get(0));
            // SmallInt
            assertEquals(1000, ((SmallIntVector) root.getVector("f_smallint")).get(0));
            // Int
            assertEquals(123456, ((IntVector) root.getVector("f_int")).get(0));
            // BigInt
            assertEquals(9876543210L, ((BigIntVector) root.getVector("f_bigint")).get(0));
            // Float
            assertEquals(3.14f, ((Float4Vector) root.getVector("f_float")).get(0), 0.001f);
            // Double
            assertEquals(2.71828d, ((Float8Vector) root.getVector("f_double")).get(0), 0.00001d);
            // VarChar
            assertEquals(
                    "hello", new String(((VarCharVector) root.getVector("f_varchar")).get(0), StandardCharsets.UTF_8));
            // VarBinary
            assertArrayEquals(new byte[] {1, 2, 3}, ((VarBinaryVector) root.getVector("f_varbinary")).get(0));
            // Decimal
            assertEquals(new BigDecimal("123.45"), ((DecimalVector) root.getVector("f_decimal")).getObject(0));
            // Date
            assertEquals(19000, ((DateDayVector) root.getVector("f_date")).get(0));
        }
    }

    /** Writes a row where all nullable fields are null. */
    @Test
    public void testWriteNullValues() {
        RowType rowType = RowType.of(
                new LogicalType[] {new BooleanType(), new IntType(), new VarCharType(100), new DecimalType(10, 2)},
                new String[] {"f_bool", "f_int", "f_varchar", "f_decimal"});

        Schema schema = FlinkArrowUtils.toArrowSchema(rowType);

        try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
            FlinkArrowWriter writer = FlinkArrowWriter.create(root, rowType);

            GenericRowData row = new GenericRowData(4);
            row.setField(0, null);
            row.setField(1, null);
            row.setField(2, null);
            row.setField(3, null);

            writer.write(row);
            writer.finish();

            // Null rows still count toward the row count; every vector slot is null.
            assertEquals(1, root.getRowCount());
            assertTrue(((BitVector) root.getVector("f_bool")).isNull(0));
            assertTrue(((IntVector) root.getVector("f_int")).isNull(0));
            assertTrue(((VarCharVector) root.getVector("f_varchar")).isNull(0));
            assertTrue(((DecimalVector) root.getVector("f_decimal")).isNull(0));
        }
    }

    /** Writes multiple rows, then resets and writes again. */
    @Test
    public void testWriteMultipleRowsAndReset() {
        RowType rowType =
                RowType.of(new LogicalType[] {new IntType(), new VarCharType(100)}, new String[] {"id", "name"});

        Schema schema = FlinkArrowUtils.toArrowSchema(rowType);

        try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
            FlinkArrowWriter writer = FlinkArrowWriter.create(root, rowType);

            // First batch: 3 rows
            writer.write(GenericRowData.of(1, StringData.fromString("alice")));
            writer.write(GenericRowData.of(2, StringData.fromString("bob")));
            writer.write(GenericRowData.of(3, StringData.fromString("carol")));
            writer.finish();

            assertEquals(3, root.getRowCount());
            IntVector idVector = (IntVector) root.getVector("id");
            VarCharVector nameVector = (VarCharVector) root.getVector("name");
            assertEquals(1, idVector.get(0));
            assertEquals(2, idVector.get(1));
            assertEquals(3, idVector.get(2));
            assertEquals("alice", new String(nameVector.get(0), StandardCharsets.UTF_8));
            assertEquals("bob", new String(nameVector.get(1), StandardCharsets.UTF_8));
            assertEquals("carol", new String(nameVector.get(2), StandardCharsets.UTF_8));

            // Reset and write second batch: 2 rows — reset() must clear the first batch.
            writer.reset();
            writer.write(GenericRowData.of(10, StringData.fromString("dave")));
            writer.write(GenericRowData.of(20, StringData.fromString("eve")));
            writer.finish();

            assertEquals(2, root.getRowCount());
            assertEquals(10, idVector.get(0));
            assertEquals(20, idVector.get(1));
            assertEquals("dave", new String(nameVector.get(0), StandardCharsets.UTF_8));
            assertEquals("eve", new String(nameVector.get(1), StandardCharsets.UTF_8));
        }
    }

    /** Finish without writing any rows produces an empty batch. */
    @Test
    public void testEmptyBatch() {
        RowType rowType = RowType.of(new LogicalType[] {new IntType()}, new String[] {"id"});

        Schema schema = FlinkArrowUtils.toArrowSchema(rowType);

        try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
            FlinkArrowWriter writer = FlinkArrowWriter.create(root, rowType);

            writer.finish();

            assertEquals(0, root.getRowCount());
        }
    }

    /** Finish on a schema with zero columns produces an empty batch. */
    @Test
    public void testEmptyBatchZeroColumns() {
        RowType rowType = RowType.of(new LogicalType[] {}, new String[] {});

        Schema schema = FlinkArrowUtils.toArrowSchema(rowType);

        try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
            FlinkArrowWriter writer = FlinkArrowWriter.create(root, rowType);

            writer.finish();

            assertEquals(0, root.getRowCount());
        }
    }

    /** Unsupported vector types (e.g., from ArrayType) throw UnsupportedOperationException. */
    @Test
    public void testUnsupportedTypeThrows() {
        RowType rowType = RowType.of(new LogicalType[] {new ArrayType(new IntType())}, new String[] {"f_array"});

        Schema schema = FlinkArrowUtils.toArrowSchema(rowType);

        try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
            // The failure happens at writer creation, not at write time.
            assertThrows(UnsupportedOperationException.class, () -> FlinkArrowWriter.create(root, rowType));
        }
    }

    /** The root returned by getRoot() is the same instance passed to create(). */
    @Test
    public void testGetRoot() {
        RowType rowType = RowType.of(new LogicalType[] {new IntType()}, new String[] {"id"});

        Schema schema = FlinkArrowUtils.toArrowSchema(rowType);

        try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
            FlinkArrowWriter writer = FlinkArrowWriter.create(root, rowType);
            assertSame(root, writer.getRoot());
        }
    }
}
diff --git
a/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/writers/BasicWritersTest.java
b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/writers/BasicWritersTest.java
new file mode 100644
index 00000000..38343b4d
--- /dev/null
+++
b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/writers/BasicWritersTest.java
@@ -0,0 +1,509 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.writers;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.NullVector;
+import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.TinyIntVector;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+
/** Unit tests for the basic numeric type writers. */
public class BasicWritersTest {

    // Fresh allocator per test; tearDown() close verifies no buffers leaked.
    private BufferAllocator allocator;

    @BeforeEach
    public void setUp() {
        allocator = new RootAllocator(Long.MAX_VALUE);
    }

    @AfterEach
    public void tearDown() {
        allocator.close();
    }

    // ---- NullWriter ----

    /** Tests for {@link NullWriter}: every written slot is null by construction. */
    @Nested
    class NullWriterTests {

        private NullVector nullVector;

        @BeforeEach
        public void setUp() {
            // NullVector holds no buffers, so it takes no allocator.
            nullVector = new NullVector("test_null");
        }

        @AfterEach
        public void tearDown() {
            nullVector.close();
        }

        @Test
        public void testWriteFromRowData() {
            NullWriter<RowData> writer = NullWriter.forRow(nullVector);

            writer.write(GenericRowData.of((Object) null), 0);
            writer.write(GenericRowData.of((Object) null), 0);
            writer.write(GenericRowData.of((Object) null), 0);
            writer.finish();

            assertEquals(3, nullVector.getValueCount());
            assertTrue(nullVector.isNull(0));
            assertTrue(nullVector.isNull(1));
            assertTrue(nullVector.isNull(2));
        }

        @Test
        public void testWriteFromArrayData() {
            NullWriter<ArrayData> writer = NullWriter.forArray(nullVector);

            GenericArrayData array = new GenericArrayData(new Object[] {null, null});

            writer.write(array, 0);
            writer.write(array, 1);
            writer.finish();

            assertEquals(2, nullVector.getValueCount());
            assertTrue(nullVector.isNull(0));
            assertTrue(nullVector.isNull(1));
        }
    }

    // ---- BooleanWriter ----

    /** Tests for {@link BooleanWriter}: BitVector exposes booleans as 0/1 ints. */
    @Nested
    class BooleanWriterTests {

        private BitVector bitVector;

        @BeforeEach
        public void setUp() {
            bitVector = new BitVector("test_boolean", allocator);
            bitVector.allocateNew();
        }

        @AfterEach
        public void tearDown() {
            bitVector.close();
        }

        @Test
        public void testWriteValuesFromRowData() {
            BooleanWriter<RowData> writer = BooleanWriter.forRow(bitVector);

            writer.write(GenericRowData.of(true), 0);
            writer.write(GenericRowData.of(false), 0);
            writer.write(GenericRowData.of(true), 0);
            writer.finish();

            assertEquals(3, bitVector.getValueCount());
            assertEquals(1, bitVector.get(0));
            assertEquals(0, bitVector.get(1));
            assertEquals(1, bitVector.get(2));
        }

        @Test
        public void testWriteNullFromRowData() {
            BooleanWriter<RowData> writer = BooleanWriter.forRow(bitVector);

            writer.write(GenericRowData.of(true), 0);
            writer.write(GenericRowData.of((Object) null), 0);
            writer.write(GenericRowData.of(false), 0);
            writer.finish();

            assertEquals(3, bitVector.getValueCount());
            assertFalse(bitVector.isNull(0));
            assertEquals(1, bitVector.get(0));
            assertTrue(bitVector.isNull(1));
            assertFalse(bitVector.isNull(2));
            assertEquals(0, bitVector.get(2));
        }

        @Test
        public void testWriteValuesFromArrayData() {
            BooleanWriter<ArrayData> writer = BooleanWriter.forArray(bitVector);

            GenericArrayData array = new GenericArrayData(new boolean[] {true, false, true});

            writer.write(array, 0);
            writer.write(array, 1);
            writer.write(array, 2);
            writer.finish();

            assertEquals(3, bitVector.getValueCount());
            assertEquals(1, bitVector.get(0));
            assertEquals(0, bitVector.get(1));
            assertEquals(1, bitVector.get(2));
        }
    }

    // ---- TinyIntWriter ----

    /** Tests for {@link TinyIntWriter}, including the byte boundary values. */
    @Nested
    class TinyIntWriterTests {

        private TinyIntVector tinyIntVector;

        @BeforeEach
        public void setUp() {
            tinyIntVector = new TinyIntVector("test_tinyint", allocator);
            tinyIntVector.allocateNew();
        }

        @AfterEach
        public void tearDown() {
            tinyIntVector.close();
        }

        @Test
        public void testWriteValuesFromRowData() {
            TinyIntWriter<RowData> writer = TinyIntWriter.forRow(tinyIntVector);

            writer.write(GenericRowData.of((byte) 1), 0);
            writer.write(GenericRowData.of((byte) -128), 0);
            writer.write(GenericRowData.of((byte) 127), 0);
            writer.finish();

            assertEquals(3, tinyIntVector.getValueCount());
            assertEquals(1, tinyIntVector.get(0));
            assertEquals(-128, tinyIntVector.get(1));
            assertEquals(127, tinyIntVector.get(2));
        }

        @Test
        public void testWriteNullFromRowData() {
            TinyIntWriter<RowData> writer = TinyIntWriter.forRow(tinyIntVector);

            writer.write(GenericRowData.of((byte) 42), 0);
            writer.write(GenericRowData.of((Object) null), 0);
            writer.write(GenericRowData.of((byte) 7), 0);
            writer.finish();

            assertEquals(3, tinyIntVector.getValueCount());
            assertFalse(tinyIntVector.isNull(0));
            assertEquals(42, tinyIntVector.get(0));
            assertTrue(tinyIntVector.isNull(1));
            assertFalse(tinyIntVector.isNull(2));
            assertEquals(7, tinyIntVector.get(2));
        }

        @Test
        public void testWriteValuesFromArrayData() {
            TinyIntWriter<ArrayData> writer = TinyIntWriter.forArray(tinyIntVector);

            GenericArrayData array = new GenericArrayData(new byte[] {10, 20, 30});

            writer.write(array, 0);
            writer.write(array, 1);
            writer.write(array, 2);
            writer.finish();

            assertEquals(3, tinyIntVector.getValueCount());
            assertEquals(10, tinyIntVector.get(0));
            assertEquals(20, tinyIntVector.get(1));
            assertEquals(30, tinyIntVector.get(2));
        }
    }

    // ---- SmallIntWriter ----

    /** Tests for {@link SmallIntWriter}, including the short boundary values. */
    @Nested
    class SmallIntWriterTests {

        private SmallIntVector smallIntVector;

        @BeforeEach
        public void setUp() {
            smallIntVector = new SmallIntVector("test_smallint", allocator);
            smallIntVector.allocateNew();
        }

        @AfterEach
        public void tearDown() {
            smallIntVector.close();
        }

        @Test
        public void testWriteValuesFromRowData() {
            SmallIntWriter<RowData> writer = SmallIntWriter.forRow(smallIntVector);

            writer.write(GenericRowData.of((short) 100), 0);
            writer.write(GenericRowData.of((short) -32768), 0);
            writer.write(GenericRowData.of((short) 32767), 0);
            writer.finish();

            assertEquals(3, smallIntVector.getValueCount());
            assertEquals(100, smallIntVector.get(0));
            assertEquals(-32768, smallIntVector.get(1));
            assertEquals(32767, smallIntVector.get(2));
        }

        @Test
        public void testWriteNullFromRowData() {
            SmallIntWriter<RowData> writer = SmallIntWriter.forRow(smallIntVector);

            writer.write(GenericRowData.of((short) 42), 0);
            writer.write(GenericRowData.of((Object) null), 0);
            writer.write(GenericRowData.of((short) 99), 0);
            writer.finish();

            assertEquals(3, smallIntVector.getValueCount());
            assertFalse(smallIntVector.isNull(0));
            assertEquals(42, smallIntVector.get(0));
            assertTrue(smallIntVector.isNull(1));
            assertFalse(smallIntVector.isNull(2));
            assertEquals(99, smallIntVector.get(2));
        }

        @Test
        public void testWriteValuesFromArrayData() {
            SmallIntWriter<ArrayData> writer = SmallIntWriter.forArray(smallIntVector);

            GenericArrayData array = new GenericArrayData(new short[] {10, 20, 30});

            writer.write(array, 0);
            writer.write(array, 1);
            writer.write(array, 2);
            writer.finish();

            assertEquals(3, smallIntVector.getValueCount());
            assertEquals(10, smallIntVector.get(0));
            assertEquals(20, smallIntVector.get(1));
            assertEquals(30, smallIntVector.get(2));
        }
    }

    // ---- BigIntWriter ----

    /** Tests for {@link BigIntWriter}, including the long boundary values. */
    @Nested
    class BigIntWriterTests {

        private BigIntVector bigIntVector;

        @BeforeEach
        public void setUp() {
            bigIntVector = new BigIntVector("test_bigint", allocator);
            bigIntVector.allocateNew();
        }

        @AfterEach
        public void tearDown() {
            bigIntVector.close();
        }

        @Test
        public void testWriteValuesFromRowData() {
            BigIntWriter<RowData> writer = BigIntWriter.forRow(bigIntVector);

            writer.write(GenericRowData.of(100L), 0);
            writer.write(GenericRowData.of(Long.MIN_VALUE), 0);
            writer.write(GenericRowData.of(Long.MAX_VALUE), 0);
            writer.finish();

            assertEquals(3, bigIntVector.getValueCount());
            assertEquals(100L, bigIntVector.get(0));
            assertEquals(Long.MIN_VALUE, bigIntVector.get(1));
            assertEquals(Long.MAX_VALUE, bigIntVector.get(2));
        }

        @Test
        public void testWriteNullFromRowData() {
            BigIntWriter<RowData> writer = BigIntWriter.forRow(bigIntVector);

            writer.write(GenericRowData.of(42L), 0);
            writer.write(GenericRowData.of((Object) null), 0);
            writer.write(GenericRowData.of(99L), 0);
            writer.finish();

            assertEquals(3, bigIntVector.getValueCount());
            assertFalse(bigIntVector.isNull(0));
            assertEquals(42L, bigIntVector.get(0));
            assertTrue(bigIntVector.isNull(1));
            assertFalse(bigIntVector.isNull(2));
            assertEquals(99L, bigIntVector.get(2));
        }

        @Test
        public void testWriteValuesFromArrayData() {
            BigIntWriter<ArrayData> writer = BigIntWriter.forArray(bigIntVector);

            GenericArrayData array = new GenericArrayData(new long[] {10L, 20L, 30L});

            writer.write(array, 0);
            writer.write(array, 1);
            writer.write(array, 2);
            writer.finish();

            assertEquals(3, bigIntVector.getValueCount());
            assertEquals(10L, bigIntVector.get(0));
            assertEquals(20L, bigIntVector.get(1));
            assertEquals(30L, bigIntVector.get(2));
        }
    }

    // ---- FloatWriter ----

    /** Tests for {@link FloatWriter}; float comparisons use an explicit delta. */
    @Nested
    class FloatWriterTests {

        private Float4Vector float4Vector;

        @BeforeEach
        public void setUp() {
            float4Vector = new Float4Vector("test_float", allocator);
            float4Vector.allocateNew();
        }

        @AfterEach
        public void tearDown() {
            float4Vector.close();
        }

        @Test
        public void testWriteValuesFromRowData() {
            FloatWriter<RowData> writer = FloatWriter.forRow(float4Vector);

            writer.write(GenericRowData.of(1.5f), 0);
            writer.write(GenericRowData.of(-3.14f), 0);
            writer.write(GenericRowData.of(0.0f), 0);
            writer.finish();

            assertEquals(3, float4Vector.getValueCount());
            assertEquals(1.5f, float4Vector.get(0), 0.0001f);
            assertEquals(-3.14f, float4Vector.get(1), 0.0001f);
            assertEquals(0.0f, float4Vector.get(2), 0.0001f);
        }

        @Test
        public void testWriteNullFromRowData() {
            FloatWriter<RowData> writer = FloatWriter.forRow(float4Vector);

            writer.write(GenericRowData.of(1.5f), 0);
            writer.write(GenericRowData.of((Object) null), 0);
            writer.write(GenericRowData.of(2.5f), 0);
            writer.finish();

            assertEquals(3, float4Vector.getValueCount());
            assertFalse(float4Vector.isNull(0));
            assertEquals(1.5f, float4Vector.get(0), 0.0001f);
            assertTrue(float4Vector.isNull(1));
            assertFalse(float4Vector.isNull(2));
            assertEquals(2.5f, float4Vector.get(2), 0.0001f);
        }

        @Test
        public void testWriteValuesFromArrayData() {
            FloatWriter<ArrayData> writer = FloatWriter.forArray(float4Vector);

            GenericArrayData array = new GenericArrayData(new float[] {1.0f, 2.0f, 3.0f});

            writer.write(array, 0);
            writer.write(array, 1);
            writer.write(array, 2);
            writer.finish();

            assertEquals(3, float4Vector.getValueCount());
            assertEquals(1.0f, float4Vector.get(0), 0.0001f);
            assertEquals(2.0f, float4Vector.get(1), 0.0001f);
            assertEquals(3.0f, float4Vector.get(2), 0.0001f);
        }
    }

    // ---- DoubleWriter ----

    /** Tests for {@link DoubleWriter}; double comparisons use an explicit delta. */
    @Nested
    class DoubleWriterTests {

        private Float8Vector float8Vector;

        @BeforeEach
        public void setUp() {
            float8Vector = new Float8Vector("test_double", allocator);
            float8Vector.allocateNew();
        }

        @AfterEach
        public void tearDown() {
            float8Vector.close();
        }

        @Test
        public void testWriteValuesFromRowData() {
            DoubleWriter<RowData> writer = DoubleWriter.forRow(float8Vector);

            writer.write(GenericRowData.of(1.5d), 0);
            writer.write(GenericRowData.of(-3.14d), 0);
            writer.write(GenericRowData.of(0.0d), 0);
            writer.finish();

            assertEquals(3, float8Vector.getValueCount());
            assertEquals(1.5d, float8Vector.get(0), 0.0001d);
            assertEquals(-3.14d, float8Vector.get(1), 0.0001d);
            assertEquals(0.0d, float8Vector.get(2), 0.0001d);
        }

        @Test
        public void testWriteNullFromRowData() {
            DoubleWriter<RowData> writer = DoubleWriter.forRow(float8Vector);

            writer.write(GenericRowData.of(1.5d), 0);
            writer.write(GenericRowData.of((Object) null), 0);
            writer.write(GenericRowData.of(2.5d), 0);
            writer.finish();

            assertEquals(3, float8Vector.getValueCount());
            assertFalse(float8Vector.isNull(0));
            assertEquals(1.5d, float8Vector.get(0), 0.0001d);
            assertTrue(float8Vector.isNull(1));
            assertFalse(float8Vector.isNull(2));
            assertEquals(2.5d, float8Vector.get(2), 0.0001d);
        }

        @Test
        public void testWriteValuesFromArrayData() {
            DoubleWriter<ArrayData> writer = DoubleWriter.forArray(float8Vector);

            GenericArrayData array = new GenericArrayData(new double[] {1.0d, 2.0d, 3.0d});

            writer.write(array, 0);
            writer.write(array, 1);
            writer.write(array, 2);
            writer.finish();

            assertEquals(3, float8Vector.getValueCount());
            assertEquals(1.0d, float8Vector.get(0), 0.0001d);
            assertEquals(2.0d, float8Vector.get(1), 0.0001d);
            assertEquals(3.0d, float8Vector.get(2), 0.0001d);
        }
    }
}
diff --git
a/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/writers/IntWriterTest.java
b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/writers/IntWriterTest.java
new file mode 100644
index 00000000..9280f0b2
--- /dev/null
+++
b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/writers/IntWriterTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.writers;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.IntVector;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
/** Unit tests for {@link IntWriter}. */
public class IntWriterTest {

    // Fresh allocator and vector per test; tearDown() close verifies no leaks.
    private BufferAllocator allocator;
    private IntVector intVector;

    @BeforeEach
    public void setUp() {
        allocator = new RootAllocator(Long.MAX_VALUE);
        intVector = new IntVector("test_int", allocator);
        intVector.allocateNew();
    }

    @AfterEach
    public void tearDown() {
        // Close the vector before its allocator.
        intVector.close();
        allocator.close();
    }

    /** Writes plain int values from RowData and verifies them positionally. */
    @Test
    public void testWriteValuesFromRowData() {
        IntWriter<RowData> writer = IntWriter.forRow(intVector);

        GenericRowData row0 = GenericRowData.of(42);
        GenericRowData row1 = GenericRowData.of(100);
        GenericRowData row2 = GenericRowData.of(-7);

        writer.write(row0, 0);
        writer.write(row1, 0);
        writer.write(row2, 0);
        writer.finish();

        assertEquals(3, intVector.getValueCount());
        assertEquals(42, intVector.get(0));
        assertEquals(100, intVector.get(1));
        assertEquals(-7, intVector.get(2));
    }

    /** A null RowData field becomes a null slot in the vector. */
    @Test
    public void testWriteNullFromRowData() {
        IntWriter<RowData> writer = IntWriter.forRow(intVector);

        GenericRowData row0 = GenericRowData.of(42);
        GenericRowData row1 = GenericRowData.of((Object) null);
        GenericRowData row2 = GenericRowData.of(99);

        writer.write(row0, 0);
        writer.write(row1, 0);
        writer.write(row2, 0);
        writer.finish();

        assertEquals(3, intVector.getValueCount());
        assertFalse(intVector.isNull(0));
        assertEquals(42, intVector.get(0));
        assertTrue(intVector.isNull(1));
        assertFalse(intVector.isNull(2));
        assertEquals(99, intVector.get(2));
    }

    /** Writes values from successive ArrayData ordinals. */
    @Test
    public void testWriteValuesFromArrayData() {
        IntWriter<ArrayData> writer = IntWriter.forArray(intVector);

        GenericArrayData array = new GenericArrayData(new int[] {10, 20, 30});

        writer.write(array, 0);
        writer.write(array, 1);
        writer.write(array, 2);
        writer.finish();

        assertEquals(3, intVector.getValueCount());
        assertEquals(10, intVector.get(0));
        assertEquals(20, intVector.get(1));
        assertEquals(30, intVector.get(2));
    }

    /** A null ArrayData element (boxed Integer[]) becomes a null slot. */
    @Test
    public void testWriteNullFromArrayData() {
        IntWriter<ArrayData> writer = IntWriter.forArray(intVector);

        GenericArrayData array = new GenericArrayData(new Integer[] {10, null, 30});

        writer.write(array, 0);
        writer.write(array, 1);
        writer.write(array, 2);
        writer.finish();

        assertEquals(3, intVector.getValueCount());
        assertEquals(10, intVector.get(0));
        assertTrue(intVector.isNull(1));
        assertEquals(30, intVector.get(2));
    }

    /** reset() clears the vector so a second batch starts from position 0. */
    @Test
    public void testResetAndWriteNewBatch() {
        IntWriter<RowData> writer = IntWriter.forRow(intVector);

        // First batch
        writer.write(GenericRowData.of(1), 0);
        writer.write(GenericRowData.of(2), 0);
        writer.finish();

        assertEquals(2, intVector.getValueCount());
        assertEquals(1, intVector.get(0));
        assertEquals(2, intVector.get(1));

        // Reset
        writer.reset();

        assertEquals(0, intVector.getValueCount());

        // Second batch
        writer.write(GenericRowData.of(100), 0);
        writer.finish();

        assertEquals(1, intVector.getValueCount());
        assertEquals(100, intVector.get(0));
    }
}
diff --git
a/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/writers/NonNumericWritersTest.java
b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/writers/NonNumericWritersTest.java
new file mode 100644
index 00000000..0f9cb425
--- /dev/null
+++
b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/writers/NonNumericWritersTest.java
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow.writers;
+
import static org.junit.jupiter.api.Assertions.*;

import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.DateDayVector;
import org.apache.arrow.vector.DecimalVector;
import org.apache.arrow.vector.VarBinaryVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
+
+/** Unit tests for the non-numeric type writers. */
+public class NonNumericWritersTest {
+
+ private BufferAllocator allocator;
+
+ @BeforeEach
+ public void setUp() {
+ allocator = new RootAllocator(Long.MAX_VALUE);
+ }
+
+ @AfterEach
+ public void tearDown() {
+ allocator.close();
+ }
+
+ // ---- VarCharWriter ----
+
+ @Nested
+ class VarCharWriterTests {
+
+ private VarCharVector varCharVector;
+
+ @BeforeEach
+ public void setUp() {
+ varCharVector = new VarCharVector("test_varchar", allocator);
+ varCharVector.allocateNew();
+ }
+
+ @AfterEach
+ public void tearDown() {
+ varCharVector.close();
+ }
+
+ @Test
+ public void testWriteValuesFromRowData() {
+ VarCharWriter<RowData> writer =
VarCharWriter.forRow(varCharVector);
+
+ writer.write(GenericRowData.of(StringData.fromString("hello")), 0);
+ writer.write(GenericRowData.of(StringData.fromString("world")), 0);
+ writer.write(GenericRowData.of(StringData.fromString("")), 0);
+ writer.finish();
+
+ assertEquals(3, varCharVector.getValueCount());
+ assertArrayEquals("hello".getBytes(), varCharVector.get(0));
+ assertArrayEquals("world".getBytes(), varCharVector.get(1));
+ assertArrayEquals("".getBytes(), varCharVector.get(2));
+ }
+
+ @Test
+ public void testWriteNullFromRowData() {
+ VarCharWriter<RowData> writer =
VarCharWriter.forRow(varCharVector);
+
+ writer.write(GenericRowData.of(StringData.fromString("hello")), 0);
+ writer.write(GenericRowData.of((Object) null), 0);
+ writer.write(GenericRowData.of(StringData.fromString("world")), 0);
+ writer.finish();
+
+ assertEquals(3, varCharVector.getValueCount());
+ assertFalse(varCharVector.isNull(0));
+ assertArrayEquals("hello".getBytes(), varCharVector.get(0));
+ assertTrue(varCharVector.isNull(1));
+ assertFalse(varCharVector.isNull(2));
+ assertArrayEquals("world".getBytes(), varCharVector.get(2));
+ }
+
+ @Test
+ public void testWriteValuesFromArrayData() {
+ VarCharWriter<ArrayData> writer =
VarCharWriter.forArray(varCharVector);
+
+ GenericArrayData array = new GenericArrayData(new StringData[] {
+ StringData.fromString("foo"), StringData.fromString("bar"),
StringData.fromString("baz")
+ });
+
+ writer.write(array, 0);
+ writer.write(array, 1);
+ writer.write(array, 2);
+ writer.finish();
+
+ assertEquals(3, varCharVector.getValueCount());
+ assertArrayEquals("foo".getBytes(), varCharVector.get(0));
+ assertArrayEquals("bar".getBytes(), varCharVector.get(1));
+ assertArrayEquals("baz".getBytes(), varCharVector.get(2));
+ }
+ }
+
+ // ---- VarBinaryWriter ----
+
+ @Nested
+ class VarBinaryWriterTests {
+
+ private VarBinaryVector varBinaryVector;
+
+ @BeforeEach
+ public void setUp() {
+ varBinaryVector = new VarBinaryVector("test_varbinary", allocator);
+ varBinaryVector.allocateNew();
+ }
+
+ @AfterEach
+ public void tearDown() {
+ varBinaryVector.close();
+ }
+
+ @Test
+ public void testWriteValuesFromRowData() {
+ VarBinaryWriter<RowData> writer =
VarBinaryWriter.forRow(varBinaryVector);
+
+ writer.write(GenericRowData.of(new byte[] {1, 2, 3}), 0);
+ writer.write(GenericRowData.of(new byte[] {4, 5}), 0);
+ writer.write(GenericRowData.of(new byte[] {}), 0);
+ writer.finish();
+
+ assertEquals(3, varBinaryVector.getValueCount());
+ assertArrayEquals(new byte[] {1, 2, 3}, varBinaryVector.get(0));
+ assertArrayEquals(new byte[] {4, 5}, varBinaryVector.get(1));
+ assertArrayEquals(new byte[] {}, varBinaryVector.get(2));
+ }
+
+ @Test
+ public void testWriteNullFromRowData() {
+ VarBinaryWriter<RowData> writer =
VarBinaryWriter.forRow(varBinaryVector);
+
+ writer.write(GenericRowData.of(new byte[] {1, 2, 3}), 0);
+ writer.write(GenericRowData.of((Object) null), 0);
+ writer.write(GenericRowData.of(new byte[] {7, 8}), 0);
+ writer.finish();
+
+ assertEquals(3, varBinaryVector.getValueCount());
+ assertFalse(varBinaryVector.isNull(0));
+ assertArrayEquals(new byte[] {1, 2, 3}, varBinaryVector.get(0));
+ assertTrue(varBinaryVector.isNull(1));
+ assertFalse(varBinaryVector.isNull(2));
+ assertArrayEquals(new byte[] {7, 8}, varBinaryVector.get(2));
+ }
+
+ @Test
+ public void testWriteValuesFromArrayData() {
+ VarBinaryWriter<ArrayData> writer =
VarBinaryWriter.forArray(varBinaryVector);
+
+ GenericArrayData array =
+ new GenericArrayData(new byte[][] {new byte[] {10, 20},
new byte[] {30, 40}, new byte[] {50}});
+
+ writer.write(array, 0);
+ writer.write(array, 1);
+ writer.write(array, 2);
+ writer.finish();
+
+ assertEquals(3, varBinaryVector.getValueCount());
+ assertArrayEquals(new byte[] {10, 20}, varBinaryVector.get(0));
+ assertArrayEquals(new byte[] {30, 40}, varBinaryVector.get(1));
+ assertArrayEquals(new byte[] {50}, varBinaryVector.get(2));
+ }
+ }
+
+ // ---- DecimalWriter ----
+
+ @Nested
+ class DecimalWriterTests {
+
+ private DecimalVector decimalVector;
+
+ @BeforeEach
+ public void setUp() {
+ decimalVector = new DecimalVector("test_decimal", allocator, 10,
2);
+ decimalVector.allocateNew();
+ }
+
+ @AfterEach
+ public void tearDown() {
+ decimalVector.close();
+ }
+
+ @Test
+ public void testWriteValuesFromRowData() {
+ DecimalWriter<RowData> writer =
DecimalWriter.forRow(decimalVector, 10, 2);
+
+ writer.write(GenericRowData.of(DecimalData.fromBigDecimal(new
BigDecimal("123.45"), 10, 2)), 0);
+ writer.write(GenericRowData.of(DecimalData.fromBigDecimal(new
BigDecimal("-678.90"), 10, 2)), 0);
+ writer.write(GenericRowData.of(DecimalData.fromBigDecimal(new
BigDecimal("0.00"), 10, 2)), 0);
+ writer.finish();
+
+ assertEquals(3, decimalVector.getValueCount());
+ assertEquals(new BigDecimal("123.45"), decimalVector.getObject(0));
+ assertEquals(new BigDecimal("-678.90"),
decimalVector.getObject(1));
+ assertEquals(new BigDecimal("0.00"), decimalVector.getObject(2));
+ }
+
+ @Test
+ public void testWriteNullFromRowData() {
+ DecimalWriter<RowData> writer =
DecimalWriter.forRow(decimalVector, 10, 2);
+
+ writer.write(GenericRowData.of(DecimalData.fromBigDecimal(new
BigDecimal("123.45"), 10, 2)), 0);
+ writer.write(GenericRowData.of((Object) null), 0);
+ writer.write(GenericRowData.of(DecimalData.fromBigDecimal(new
BigDecimal("99.99"), 10, 2)), 0);
+ writer.finish();
+
+ assertEquals(3, decimalVector.getValueCount());
+ assertFalse(decimalVector.isNull(0));
+ assertEquals(new BigDecimal("123.45"), decimalVector.getObject(0));
+ assertTrue(decimalVector.isNull(1));
+ assertFalse(decimalVector.isNull(2));
+ assertEquals(new BigDecimal("99.99"), decimalVector.getObject(2));
+ }
+
+ @Test
+ public void testWriteValuesFromArrayData() {
+ DecimalWriter<ArrayData> writer =
DecimalWriter.forArray(decimalVector, 10, 2);
+
+ GenericArrayData array = new GenericArrayData(new DecimalData[] {
+ DecimalData.fromBigDecimal(new BigDecimal("1.11"), 10, 2),
+ DecimalData.fromBigDecimal(new BigDecimal("2.22"), 10, 2),
+ DecimalData.fromBigDecimal(new BigDecimal("3.33"), 10, 2)
+ });
+
+ writer.write(array, 0);
+ writer.write(array, 1);
+ writer.write(array, 2);
+ writer.finish();
+
+ assertEquals(3, decimalVector.getValueCount());
+ assertEquals(new BigDecimal("1.11"), decimalVector.getObject(0));
+ assertEquals(new BigDecimal("2.22"), decimalVector.getObject(1));
+ assertEquals(new BigDecimal("3.33"), decimalVector.getObject(2));
+ }
+ }
+
+ // ---- DateWriter ----
+
+ @Nested
+ class DateWriterTests {
+
+ private DateDayVector dateDayVector;
+
+ @BeforeEach
+ public void setUp() {
+ dateDayVector = new DateDayVector("test_date", allocator);
+ dateDayVector.allocateNew();
+ }
+
+ @AfterEach
+ public void tearDown() {
+ dateDayVector.close();
+ }
+
+ @Test
+ public void testWriteValuesFromRowData() {
+ DateWriter<RowData> writer = DateWriter.forRow(dateDayVector);
+
+ // 19000 days since epoch = ~2022-01-06
+ writer.write(GenericRowData.of(19000), 0);
+ writer.write(GenericRowData.of(0), 0);
+ writer.write(GenericRowData.of(18628), 0);
+ writer.finish();
+
+ assertEquals(3, dateDayVector.getValueCount());
+ assertEquals(19000, dateDayVector.get(0));
+ assertEquals(0, dateDayVector.get(1));
+ assertEquals(18628, dateDayVector.get(2));
+ }
+
+ @Test
+ public void testWriteNullFromRowData() {
+ DateWriter<RowData> writer = DateWriter.forRow(dateDayVector);
+
+ writer.write(GenericRowData.of(19000), 0);
+ writer.write(GenericRowData.of((Object) null), 0);
+ writer.write(GenericRowData.of(18628), 0);
+ writer.finish();
+
+ assertEquals(3, dateDayVector.getValueCount());
+ assertFalse(dateDayVector.isNull(0));
+ assertEquals(19000, dateDayVector.get(0));
+ assertTrue(dateDayVector.isNull(1));
+ assertFalse(dateDayVector.isNull(2));
+ assertEquals(18628, dateDayVector.get(2));
+ }
+
+ @Test
+ public void testWriteValuesFromArrayData() {
+ DateWriter<ArrayData> writer = DateWriter.forArray(dateDayVector);
+
+ GenericArrayData array = new GenericArrayData(new int[] {19000, 0,
18628});
+
+ writer.write(array, 0);
+ writer.write(array, 1);
+ writer.write(array, 2);
+ writer.finish();
+
+ assertEquals(3, dateDayVector.getValueCount());
+ assertEquals(19000, dateDayVector.get(0));
+ assertEquals(0, dateDayVector.get(1));
+ assertEquals(18628, dateDayVector.get(2));
+ }
+ }
+}