This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch ci-array-flink
in repository https://gitbox.apache.org/repos/asf/fluss.git

commit 6de9f65b485e847a6232f9f2f5dd19a9d278d331
Author: forwardxu <[email protected]>
AuthorDate: Thu Nov 27 14:51:46 2025 +0800

    [flink] Support Array type in Flink connector
---
 .../apache/fluss/record/DefaultLogRecordBatch.java |  25 ++-
 .../org/apache/fluss/record/LogRecordBatch.java    |   8 +
 .../apache/fluss/record/LogRecordReadContext.java  |  26 +++
 .../org/apache/fluss/row/arrow/ArrowReader.java    |   3 +-
 .../org/apache/fluss/row/arrow/ArrowWriter.java    |  22 +-
 .../fluss/row/arrow/writers/ArrowArrayWriter.java  |  29 +++
 .../fluss/row/arrow/writers/ArrowFieldWriter.java  |   5 +
 .../java/org/apache/fluss/utils/ArrowUtils.java    |  92 +++++++-
 .../fluss/row/arrow/ArrowReaderWriterTest.java     |   9 +-
 .../flink/sink/Flink118ComplexTypeITCase.java      |  21 ++
 .../flink/sink/Flink119ComplexTypeITCase.java      |  21 ++
 .../flink/sink/Flink120ComplexTypeITCase.java      |  21 ++
 .../fluss/flink/sink/Flink21ComplexTypeITCase.java |  21 ++
 .../apache/fluss/flink/utils/FlinkConversions.java |   8 +-
 .../fluss/flink/utils/FlinkSchemaValidator.java    | 122 +++++++++++
 .../fluss/flink/sink/FlinkComplexTypeITCase.java   | 231 +++++++++++++++++++++
 .../flink/utils/FlinkSchemaValidatorTest.java      | 229 ++++++++++++++++++++
 17 files changed, 878 insertions(+), 15 deletions(-)

diff --git 
a/fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java 
b/fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java
index eaacc48a6..2c6bce911 100644
--- 
a/fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java
+++ 
b/fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java
@@ -223,7 +223,8 @@ public class DefaultLogRecordBatch implements 
LogRecordBatch {
                         rowType,
                         context.getVectorSchemaRoot(schemaId),
                         context.getBufferAllocator(),
-                        timestamp);
+                        timestamp,
+                        context);
             case INDEXED:
                 return rowRecordIterator(rowType, timestamp);
             default:
@@ -279,7 +280,11 @@ public class DefaultLogRecordBatch implements 
LogRecordBatch {
     }
 
     private CloseableIterator<LogRecord> columnRecordIterator(
-            RowType rowType, VectorSchemaRoot root, BufferAllocator allocator, 
long timestamp) {
+            RowType rowType,
+            VectorSchemaRoot root,
+            BufferAllocator allocator,
+            long timestamp,
+            ReadContext readContext) {
         boolean isAppendOnly = (attributes() & APPEND_ONLY_FLAG_MASK) > 0;
         if (isAppendOnly) {
             // append only batch, no change type vector,
@@ -289,7 +294,13 @@ public class DefaultLogRecordBatch implements 
LogRecordBatch {
             int arrowLength = sizeInBytes() - recordBatchHeaderSize;
             ArrowReader reader =
                     ArrowUtils.createArrowReader(
-                            segment, arrowOffset, arrowLength, root, 
allocator, rowType);
+                            segment,
+                            arrowOffset,
+                            arrowLength,
+                            root,
+                            allocator,
+                            rowType,
+                            readContext);
             return new ArrowLogRecordIterator(reader, timestamp) {
                 @Override
                 protected ChangeType getChangeType(int rowId) {
@@ -307,7 +318,13 @@ public class DefaultLogRecordBatch implements 
LogRecordBatch {
                     sizeInBytes() - arrowChangeTypeOffset(magic) - 
changeTypeVector.sizeInBytes();
             ArrowReader reader =
                     ArrowUtils.createArrowReader(
-                            segment, arrowOffset, arrowLength, root, 
allocator, rowType);
+                            segment,
+                            arrowOffset,
+                            arrowLength,
+                            root,
+                            allocator,
+                            rowType,
+                            readContext);
             return new ArrowLogRecordIterator(reader, timestamp) {
                 @Override
                 protected ChangeType getChangeType(int rowId) {
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatch.java 
b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatch.java
index cadc1ef9d..c226f25a4 100644
--- a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatch.java
+++ b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatch.java
@@ -195,5 +195,13 @@ public interface LogRecordBatch {
 
         /** Gets the buffer allocator. */
         BufferAllocator getBufferAllocator();
+
+        /**
+         * Registers a batch VectorSchemaRoot to be closed when this context 
is closed. This is used
+         * to track VectorSchemaRoots created for individual batches.
+         *
+         * @param batchRoot the batch VectorSchemaRoot to register
+         */
+        void registerBatchRoot(VectorSchemaRoot batchRoot);
     }
 }
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java 
b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java
index 1158dbc35..08ef69037 100644
--- 
a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java
+++ 
b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java
@@ -32,6 +32,7 @@ import org.apache.fluss.utils.Projection;
 
 import javax.annotation.Nullable;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.IntStream;
 
@@ -55,6 +56,8 @@ public class LogRecordReadContext implements 
LogRecordBatch.ReadContext, AutoClo
     private final FieldGetter[] selectedFieldGetters;
     // whether the projection is push downed to the server side and the 
returned data is pruned.
     private final boolean projectionPushDowned;
+    // track all batch VectorSchemaRoots created for this context
+    private final List<VectorSchemaRoot> batchRoots;
 
     /**
      * Creates a LogRecordReadContext for the given table information and 
projection information.
@@ -164,6 +167,7 @@ public class LogRecordReadContext implements 
LogRecordBatch.ReadContext, AutoClo
         this.bufferAllocator = bufferAllocator;
         this.selectedFieldGetters = selectedFieldGetters;
         this.projectionPushDowned = projectionPushDowned;
+        this.batchRoots = new ArrayList<>();
     }
 
     @Override
@@ -215,7 +219,29 @@ public class LogRecordReadContext implements 
LogRecordBatch.ReadContext, AutoClo
         return bufferAllocator;
     }
 
+    /**
+     * Registers a batch VectorSchemaRoot to be closed when this context is 
closed. This is used to
+     * track VectorSchemaRoots created for individual batches.
+     */
+    @Override
+    public void registerBatchRoot(VectorSchemaRoot batchRoot) {
+        if (batchRoot != null) {
+            batchRoots.add(batchRoot);
+        }
+    }
+
     public void close() {
+        // Close all batch roots first
+        for (VectorSchemaRoot root : batchRoots) {
+            try {
+                root.close();
+            } catch (Exception e) {
+                // Continue closing other roots even if one fails
+            }
+        }
+        batchRoots.clear();
+
+        // Then close the shared schema root and allocator
         if (vectorSchemaRoot != null) {
             vectorSchemaRoot.close();
         }
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/row/arrow/ArrowReader.java 
b/fluss-common/src/main/java/org/apache/fluss/row/arrow/ArrowReader.java
index ea5afe930..d50eb8fe0 100644
--- a/fluss-common/src/main/java/org/apache/fluss/row/arrow/ArrowReader.java
+++ b/fluss-common/src/main/java/org/apache/fluss/row/arrow/ArrowReader.java
@@ -58,6 +58,7 @@ public class ArrowReader {
     }
 
     public void close() {
-        root.close();
+        // Do not close the VectorSchemaRoot here.
+        // The root will be closed when LogRecordReadContext closes.
     }
 }
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/row/arrow/ArrowWriter.java 
b/fluss-common/src/main/java/org/apache/fluss/row/arrow/ArrowWriter.java
index 9b0ab0081..8c1ba501b 100644
--- a/fluss-common/src/main/java/org/apache/fluss/row/arrow/ArrowWriter.java
+++ b/fluss-common/src/main/java/org/apache/fluss/row/arrow/ArrowWriter.java
@@ -30,6 +30,7 @@ import 
org.apache.fluss.shaded.arrow.org.apache.arrow.vector.BaseVariableWidthVe
 import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.FieldVector;
 import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.VectorUnloader;
+import 
org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.ListVector;
 import 
org.apache.fluss.shaded.arrow.org.apache.arrow.vector.compression.CompressionCodec;
 import 
org.apache.fluss.shaded.arrow.org.apache.arrow.vector.compression.CompressionUtil;
 import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.ipc.WriteChannel;
@@ -181,7 +182,10 @@ public class ArrowWriter implements AutoCloseable {
         for (int i = 0; i < fieldWriters.length; i++) {
             FieldVector fieldVector = root.getVector(i);
             initFieldVector(fieldVector);
-            fieldWriters[i] = ArrowUtils.createArrowFieldWriter(fieldVector, 
schema.getTypeAt(i));
+        }
+        // Reset field writers to clear their offset counters (for ArrayWriter)
+        for (ArrowFieldWriter fieldWriter : fieldWriters) {
+            resetFieldWriter(fieldWriter);
         }
         root.setRowCount(0);
         recordsCount = 0;
@@ -285,6 +289,10 @@ public class ArrowWriter implements AutoCloseable {
         if (this.epoch == epoch) {
             root.clear();
             recordsCount = 0;
+            // Reset array writers when recycling
+            for (ArrowFieldWriter fieldWriter : fieldWriters) {
+                resetFieldWriter(fieldWriter);
+            }
             provider.recycleWriter(this);
         }
     }
@@ -302,11 +310,23 @@ public class ArrowWriter implements AutoCloseable {
             ((BaseFixedWidthVector) fieldVector).allocateNew(INITIAL_CAPACITY);
         } else if (fieldVector instanceof BaseVariableWidthVector) {
             ((BaseVariableWidthVector) 
fieldVector).allocateNew(INITIAL_CAPACITY);
+        } else if (fieldVector instanceof ListVector) {
+            ListVector listVector = (ListVector) fieldVector;
+            listVector.allocateNew();
+            FieldVector dataVector = listVector.getDataVector();
+            if (dataVector != null) {
+                initFieldVector(dataVector);
+            }
         } else {
             fieldVector.allocateNew();
         }
     }
 
+    /** Resets field writers to clear their state for reuse. */
+    private void resetFieldWriter(ArrowFieldWriter fieldWriter) {
+        fieldWriter.reset();
+    }
+
     private int estimatedBytesWritten(int currentBytes) {
         if (compressionCodec.getCodecType() == 
CompressionUtil.CodecType.NO_COMPRESSION) {
             return currentBytes;
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/row/arrow/writers/ArrowArrayWriter.java
 
b/fluss-common/src/main/java/org/apache/fluss/row/arrow/writers/ArrowArrayWriter.java
index 61427cb06..c5001af56 100644
--- 
a/fluss-common/src/main/java/org/apache/fluss/row/arrow/writers/ArrowArrayWriter.java
+++ 
b/fluss-common/src/main/java/org/apache/fluss/row/arrow/writers/ArrowArrayWriter.java
@@ -45,5 +45,34 @@ public class ArrowArrayWriter extends ArrowFieldWriter {
         }
         offset += array.size();
         listVector.endValue(rowIndex, array.size());
+
+        // Fix dataVector valueCount after writing each row
+        // This ensures VectorUnloader has correct valueCount for all vectors
+        FieldVector dataVector = listVector.getDataVector();
+        if (dataVector != null && offset > 0) {
+            dataVector.setValueCount(offset);
+            // Recursively fix nested ListVectors
+            fixNestedListVectorValueCount(dataVector, offset);
+        }
+    }
+
+    private void fixNestedListVectorValueCount(FieldVector vector, int 
parentCount) {
+        if (vector instanceof ListVector && parentCount > 0) {
+            ListVector listVector = (ListVector) vector;
+            int dataVectorValueCount = 
listVector.getElementEndIndex(parentCount - 1);
+            FieldVector dataVector = listVector.getDataVector();
+            if (dataVector != null && dataVectorValueCount > 0) {
+                dataVector.setValueCount(dataVectorValueCount);
+                fixNestedListVectorValueCount(dataVector, 
dataVectorValueCount);
+            }
+        }
+    }
+
+    /** Resets the offset counter for reuse. */
+    @Override
+    public void reset() {
+        super.reset();
+        elementWriter.reset();
+        offset = 0;
     }
 }
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/row/arrow/writers/ArrowFieldWriter.java
 
b/fluss-common/src/main/java/org/apache/fluss/row/arrow/writers/ArrowFieldWriter.java
index bb020196f..8dd16c64b 100644
--- 
a/fluss-common/src/main/java/org/apache/fluss/row/arrow/writers/ArrowFieldWriter.java
+++ 
b/fluss-common/src/main/java/org/apache/fluss/row/arrow/writers/ArrowFieldWriter.java
@@ -59,4 +59,9 @@ public abstract class ArrowFieldWriter {
             doWrite(rowIndex, getters, ordinal, handleSafe);
         }
     }
+
+    /** Resets the state of the writer to write the next batch of fields. */
+    public void reset() {
+        fieldVector.reset();
+    }
 }
diff --git a/fluss-common/src/main/java/org/apache/fluss/utils/ArrowUtils.java 
b/fluss-common/src/main/java/org/apache/fluss/utils/ArrowUtils.java
index 72da2556c..8e9ab3e19 100644
--- a/fluss-common/src/main/java/org/apache/fluss/utils/ArrowUtils.java
+++ b/fluss-common/src/main/java/org/apache/fluss/utils/ArrowUtils.java
@@ -159,28 +159,110 @@ public class ArrowUtils {
             MemorySegment segment,
             int arrowOffset,
             int arrowLength,
-            VectorSchemaRoot schemaRoot,
+            VectorSchemaRoot sharedSchemaRoot,
             BufferAllocator allocator,
-            RowType rowType) {
+            RowType rowType,
+            org.apache.fluss.record.LogRecordBatch.ReadContext readContext) {
         ByteBuffer arrowBatchBuffer = segment.wrap(arrowOffset, arrowLength);
+
         try (ReadChannel channel =
                         new ReadChannel(new 
ByteBufferReadableChannel(arrowBatchBuffer));
                 ArrowRecordBatch batch = deserializeRecordBatch(channel, 
allocator)) {
+
+            // Create a new VectorSchemaRoot for each batch to avoid data 
contamination
+            // when the shared root is reused for subsequent batches
+            VectorSchemaRoot batchSchemaRoot =
+                    VectorSchemaRoot.create(sharedSchemaRoot.getSchema(), 
allocator);
+
             VectorLoader vectorLoader =
-                    new VectorLoader(schemaRoot, 
ArrowCompressionFactory.INSTANCE);
+                    new VectorLoader(batchSchemaRoot, 
ArrowCompressionFactory.INSTANCE);
             vectorLoader.load(batch);
+
+            // Fix buffer writerIndex after VectorLoader.load
+            // VectorLoader.load() sets the capacity but not the writerIndex 
for buffers.
+            // This is especially critical for Array types (ListVector), where 
we need to:
+            // 1. Fix writerIndex for all buffers to match their capacity
+            // 2. Fix nested dataVector's valueCount based on the last offset
+            // This was not needed before Array type support, but is now 
essential
+            // for correct Arrow data reading with complex nested types.
+            fixVectorBuffers(batchSchemaRoot);
+
             List<ColumnVector> columnVectors = new ArrayList<>();
-            List<FieldVector> fieldVectors = schemaRoot.getFieldVectors();
+            List<FieldVector> fieldVectors = batchSchemaRoot.getFieldVectors();
             for (int i = 0; i < fieldVectors.size(); i++) {
                 columnVectors.add(
                         createArrowColumnVector(fieldVectors.get(i), 
rowType.getTypeAt(i)));
             }
-            return new ArrowReader(schemaRoot, columnVectors.toArray(new 
ColumnVector[0]));
+
+            // Register the batch root with the read context so it can be 
closed later
+            if (readContext != null) {
+                readContext.registerBatchRoot(batchSchemaRoot);
+            }
+
+            return new ArrowReader(batchSchemaRoot, columnVectors.toArray(new 
ColumnVector[0]));
         } catch (IOException e) {
             throw new RuntimeException("Failed to deserialize 
ArrowRecordBatch.", e);
         }
     }
 
+    /**
+     * Fixes writerIndex for all buffers in all vectors after 
VectorLoader.load().
+     *
+     * <p>VectorLoader.load() sets the capacity but not the writerIndex for 
buffers, which can cause
+     * issues when reading data. This method recursively fixes all vectors:
+     *
+     * <ul>
+     *   <li>For all vectors: sets writerIndex to match capacity for proper 
buffer reading
+     *   <li>For ListVector (Array type): additionally fixes the nested 
dataVector's valueCount
+     *       based on the last offset, and recursively fixes nested structures
+     * </ul>
+     *
+     * <p>This is especially critical for Array types introduced in the 
system, where nested
+     * dataVectors need correct valueCount to avoid data corruption or reading 
errors.
+     */
+    private static void fixVectorBuffers(VectorSchemaRoot schemaRoot) {
+        for (FieldVector fieldVector : schemaRoot.getFieldVectors()) {
+            fixVectorBuffersRecursive(fieldVector);
+        }
+    }
+
+    private static void fixVectorBuffersRecursive(FieldVector vector) {
+        // Fix all buffers in this vector
+        for (ArrowBuf buf : vector.getFieldBuffers()) {
+            if (buf.capacity() > 0 && buf.writerIndex() < buf.capacity()) {
+                buf.writerIndex((int) buf.capacity());
+            }
+        }
+
+        // Special handling for ListVector: fix dataVector valueCount and 
buffers
+        // This is critical for Array type support. Without this, the nested 
dataVector
+        // may have incorrect valueCount, leading to data corruption or 
reading errors.
+        if (vector instanceof ListVector) {
+            ListVector listVector = (ListVector) vector;
+            int vectorValueCount = listVector.getValueCount();
+
+            if (vectorValueCount > 0) {
+                FieldVector dataVector = listVector.getDataVector();
+                if (dataVector != null) {
+                    // Calculate the correct valueCount for dataVector from 
the last offset
+                    // For array [a,b,c] followed by [d,e], offsets are 
[0,3,5], so dataVector
+                    // needs valueCount=5 (not the parent's valueCount=2)
+                    int dataVectorValueCount = 
listVector.getElementEndIndex(vectorValueCount - 1);
+                    if (dataVectorValueCount > 0) {
+                        dataVector.setValueCount(dataVectorValueCount);
+                    }
+                    // Recursively fix the dataVector (needed for nested 
arrays)
+                    fixVectorBuffersRecursive(dataVector);
+                }
+            }
+        } else if (vector.getChildrenFromFields() != null) {
+            // Recursively fix other child vectors
+            for (FieldVector child : vector.getChildrenFromFields()) {
+                fixVectorBuffersRecursive(child);
+            }
+        }
+    }
+
     /**
      * Serialize metadata of a {@link ArrowRecordBatch} into write channel. 
This avoids to create an
      * instance of {@link ArrowRecordBatch}.
diff --git 
a/fluss-common/src/test/java/org/apache/fluss/row/arrow/ArrowReaderWriterTest.java
 
b/fluss-common/src/test/java/org/apache/fluss/row/arrow/ArrowReaderWriterTest.java
index 039032a1e..295d6272a 100644
--- 
a/fluss-common/src/test/java/org/apache/fluss/row/arrow/ArrowReaderWriterTest.java
+++ 
b/fluss-common/src/test/java/org/apache/fluss/row/arrow/ArrowReaderWriterTest.java
@@ -21,6 +21,7 @@ import org.apache.fluss.memory.AbstractPagedOutputView;
 import org.apache.fluss.memory.ManagedPagedOutputView;
 import org.apache.fluss.memory.MemorySegment;
 import org.apache.fluss.memory.TestingMemorySegmentPool;
+import org.apache.fluss.record.LogRecordReadContext;
 import org.apache.fluss.row.Decimal;
 import org.apache.fluss.row.GenericArray;
 import org.apache.fluss.row.GenericRow;
@@ -177,7 +178,9 @@ class ArrowReaderWriterTest {
                 ArrowWriterPool provider = new ArrowWriterPool(allocator);
                 ArrowWriter writer =
                         provider.getOrCreateWriter(
-                                1L, 1, Integer.MAX_VALUE, rowType, 
NO_COMPRESSION)) {
+                                1L, 1, Integer.MAX_VALUE, rowType, 
NO_COMPRESSION);
+                LogRecordReadContext readContext =
+                        LogRecordReadContext.createArrowReadContext(rowType, 
0)) {
             for (InternalRow row : TEST_DATA) {
                 writer.writeRow(row);
             }
@@ -197,7 +200,8 @@ class ArrowReaderWriterTest {
             
firstSegment.copyTo(arrowChangeTypeOffset(CURRENT_LOG_MAGIC_VALUE), segment, 0, 
size);
 
             ArrowReader reader =
-                    ArrowUtils.createArrowReader(segment, 0, size, root, 
allocator, rowType);
+                    ArrowUtils.createArrowReader(
+                            segment, 0, size, root, allocator, rowType, 
readContext);
             int rowCount = reader.getRowCount();
             for (int i = 0; i < rowCount; i++) {
                 ColumnarRow row = reader.read(i);
@@ -238,7 +242,6 @@ class ArrowReaderWriterTest {
                 assertThat(row.getTimestampLtz(20, 
6)).isEqualTo(rowData.getTimestampLtz(20, 6));
                 assertThat(row.getTimestampLtz(21, 
9)).isEqualTo(rowData.getTimestampLtz(21, 9));
             }
-            reader.close();
         }
     }
 
diff --git 
a/fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/sink/Flink118ComplexTypeITCase.java
 
b/fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/sink/Flink118ComplexTypeITCase.java
new file mode 100644
index 000000000..946b4abb6
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/sink/Flink118ComplexTypeITCase.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.sink;
+
+/** Integration tests for Array type support in Flink 1.18. */
+public class Flink118ComplexTypeITCase extends FlinkComplexTypeITCase {}
diff --git 
a/fluss-flink/fluss-flink-1.19/src/test/java/org/apache/fluss/flink/sink/Flink119ComplexTypeITCase.java
 
b/fluss-flink/fluss-flink-1.19/src/test/java/org/apache/fluss/flink/sink/Flink119ComplexTypeITCase.java
new file mode 100644
index 000000000..f25e943d9
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-1.19/src/test/java/org/apache/fluss/flink/sink/Flink119ComplexTypeITCase.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.sink;
+
+/** Integration tests for Array type support in Flink 1.19. */
+public class Flink119ComplexTypeITCase extends FlinkComplexTypeITCase {}
diff --git 
a/fluss-flink/fluss-flink-1.20/src/test/java/org/apache/fluss/flink/sink/Flink120ComplexTypeITCase.java
 
b/fluss-flink/fluss-flink-1.20/src/test/java/org/apache/fluss/flink/sink/Flink120ComplexTypeITCase.java
new file mode 100644
index 000000000..86781efaf
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-1.20/src/test/java/org/apache/fluss/flink/sink/Flink120ComplexTypeITCase.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.sink;
+
+/** Integration tests for Array type support in Flink 1.20. */
+public class Flink120ComplexTypeITCase extends FlinkComplexTypeITCase {}
diff --git 
a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/sink/Flink21ComplexTypeITCase.java
 
b/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/sink/Flink21ComplexTypeITCase.java
new file mode 100644
index 000000000..7b2ed44b2
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/sink/Flink21ComplexTypeITCase.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.sink;
+
+/** Integration tests for Array type support in Flink 2.1. */
+public class Flink21ComplexTypeITCase extends FlinkComplexTypeITCase {}
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java
index 3256b3f52..1517fd0d5 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java
@@ -190,7 +190,9 @@ public class FlinkConversions {
         // now, build Fluss's table
         Schema.Builder schemBuilder = Schema.newBuilder();
         if (resolvedSchema.getPrimaryKey().isPresent()) {
-            
schemBuilder.primaryKey(resolvedSchema.getPrimaryKey().get().getColumns());
+            List<String> primaryKeyColumns = 
resolvedSchema.getPrimaryKey().get().getColumns();
+            FlinkSchemaValidator.validatePrimaryKeyColumns(resolvedSchema, 
primaryKeyColumns);
+            schemBuilder.primaryKey(primaryKeyColumns);
         }
 
         // first build schema with physical columns
@@ -223,6 +225,8 @@ public class FlinkConversions {
                         ? ((ResolvedCatalogTable) 
catalogBaseTable).getPartitionKeys()
                         : ((ResolvedCatalogMaterializedTable) 
catalogBaseTable).getPartitionKeys();
 
+        FlinkSchemaValidator.validatePartitionKeyColumns(resolvedSchema, 
partitionKeys);
+
         Map<String, String> customProperties = flinkTableConf.toMap();
         CatalogPropertiesUtils.serializeComputedColumns(
                 customProperties, resolvedSchema.getColumns());
@@ -253,6 +257,8 @@ public class FlinkConversions {
                     Arrays.stream(flinkTableConf.get(BUCKET_KEY).split(","))
                             .map(String::trim)
                             .collect(Collectors.toList());
+
+            FlinkSchemaValidator.validateBucketKeyColumns(resolvedSchema, 
bucketKey);
         } else {
             // use primary keys - partition keys
             bucketKey =
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkSchemaValidator.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkSchemaValidator.java
new file mode 100644
index 000000000..68cbb1b65
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkSchemaValidator.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.utils;
+
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.util.List;
+
+/** Validator for Flink schema constraints. */
+public class FlinkSchemaValidator {
+
+    private FlinkSchemaValidator() {}
+
+    /**
+     * Validates that primary key columns do not contain ARRAY type.
+     *
+     * @param resolvedSchema the resolved schema
+     * @param primaryKeyColumns the list of primary key column names
+     * @throws CatalogException if a primary key column is not found in schema
+     * @throws UnsupportedOperationException if a primary key column is of 
ARRAY type
+     */
+    public static void validatePrimaryKeyColumns(
+            ResolvedSchema resolvedSchema, List<String> primaryKeyColumns) {
+        for (String pkColumn : primaryKeyColumns) {
+            Column column =
+                    resolvedSchema
+                            .getColumn(pkColumn)
+                            .orElseThrow(
+                                    () ->
+                                            new CatalogException(
+                                                    "Primary key column "
+                                                            + pkColumn
+                                                            + " not found in 
schema."));
+            LogicalType logicalType = column.getDataType().getLogicalType();
+            if (logicalType instanceof ArrayType) {
+                throw new UnsupportedOperationException(
+                        String.format(
+                                "Column '%s' of ARRAY type is not supported as 
primary key.",
+                                pkColumn));
+            }
+        }
+    }
+
+    /**
+     * Validates that partition key columns do not contain ARRAY type.
+     *
+     * @param resolvedSchema the resolved schema
+     * @param partitionKeyColumns the list of partition key column names
+     * @throws CatalogException if a partition key column is not found in 
schema
+     * @throws UnsupportedOperationException if a partition key column is of 
ARRAY type
+     */
+    public static void validatePartitionKeyColumns(
+            ResolvedSchema resolvedSchema, List<String> partitionKeyColumns) {
+        for (String partitionKey : partitionKeyColumns) {
+            Column column =
+                    resolvedSchema
+                            .getColumn(partitionKey)
+                            .orElseThrow(
+                                    () ->
+                                            new CatalogException(
+                                                    "Partition key column "
+                                                            + partitionKey
+                                                            + " not found in 
schema."));
+            LogicalType logicalType = column.getDataType().getLogicalType();
+            if (logicalType instanceof ArrayType) {
+                throw new UnsupportedOperationException(
+                        String.format(
+                                "Column '%s' of ARRAY type is not supported as 
partition key.",
+                                partitionKey));
+            }
+        }
+    }
+
+    /**
+     * Validates that bucket key columns do not contain ARRAY type.
+     *
+     * @param resolvedSchema the resolved schema
+     * @param bucketKeyColumns the list of bucket key column names
+     * @throws CatalogException if a bucket key column is not found in schema
+     * @throws UnsupportedOperationException if a bucket key column is of 
ARRAY type
+     */
+    public static void validateBucketKeyColumns(
+            ResolvedSchema resolvedSchema, List<String> bucketKeyColumns) {
+        for (String bkColumn : bucketKeyColumns) {
+            Column column =
+                    resolvedSchema
+                            .getColumn(bkColumn)
+                            .orElseThrow(
+                                    () ->
+                                            new CatalogException(
+                                                    "Bucket key column "
+                                                            + bkColumn
+                                                            + " not found in 
schema."));
+            LogicalType logicalType = column.getDataType().getLogicalType();
+            if (logicalType instanceof ArrayType) {
+                throw new UnsupportedOperationException(
+                        String.format(
+                                "Column '%s' of ARRAY type is not supported as 
bucket key.",
+                                bkColumn));
+            }
+        }
+    }
+}
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkComplexTypeITCase.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkComplexTypeITCase.java
new file mode 100644
index 000000000..13d91bcd9
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkComplexTypeITCase.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.sink;
+
+import org.apache.fluss.server.testutils.FlussClusterExtension;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS;
+import static 
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder;
+import static 
org.apache.fluss.server.testutils.FlussClusterExtension.BUILTIN_DATABASE;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Integration tests for Array type support in Flink connector. */
/**
 * Integration tests for Array type support in Flink connector.
 *
 * <p>Covers writing/reading arrays (including nested and null-containing arrays) in both log and
 * primary-key tables, and verifies that ARRAY columns are rejected as primary, partition, and
 * bucket keys. Version-specific subclasses (Flink 1.18/1.19/1.20/2.1) inherit these cases.
 */
abstract class FlinkComplexTypeITCase extends AbstractTestBase {

    // Shared in-process Fluss cluster (3 tablet servers), reused across all tests of the class.
    @RegisterExtension
    public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
            FlussClusterExtension.builder().setNumOfTabletServers(3).build();

    static final String CATALOG_NAME = "testcatalog";
    static final String DEFAULT_DB = "defaultdb";

    protected StreamExecutionEnvironment env;
    protected StreamTableEnvironment tEnv;
    protected TableEnvironment tBatchEnv;

    /**
     * Registers the Fluss catalog in a streaming and a batch table environment and creates a
     * fresh test database used by each test.
     */
    @BeforeEach
    void before() {
        String bootstrapServers = FLUSS_CLUSTER_EXTENSION.getBootstrapServers();

        env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

        tEnv = StreamTableEnvironment.create(env);
        tEnv.executeSql(
                String.format(
                        "create catalog %s with ('type' = 'fluss', '%s' = '%s')",
                        CATALOG_NAME, BOOTSTRAP_SERVERS.key(), bootstrapServers));
        tEnv.executeSql("use catalog " + CATALOG_NAME);
        // Parallelism 2 so results arrive from multiple subtasks (order is not deterministic).
        tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);

        tBatchEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
        tBatchEnv.executeSql(
                String.format(
                        "create catalog %s with ('type' = 'fluss', '%s' = '%s')",
                        CATALOG_NAME, BOOTSTRAP_SERVERS.key(), bootstrapServers));
        tBatchEnv.executeSql("use catalog " + CATALOG_NAME);
        tBatchEnv
                .getConfig()
                .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);

        tEnv.executeSql("create database " + DEFAULT_DB);
        tEnv.useDatabase(DEFAULT_DB);
        tBatchEnv.useDatabase(DEFAULT_DB);
    }

    /** Drops the per-test database (cascade) so tests stay isolated. */
    @AfterEach
    void after() {
        tEnv.useDatabase(BUILTIN_DATABASE);
        tEnv.executeSql(String.format("drop database %s cascade", DEFAULT_DB));
    }

    /**
     * Log (append-only) table round-trip: arrays of every supported element type, including
     * null elements, null arrays, and arrays nested up to three levels deep.
     */
    @Test
    void testArrayTypesInLogTable() throws Exception {
        tEnv.executeSql(
                "create table array_log_test ("
                        + "id int, "
                        + "int_array array<int>, "
                        + "bigint_array array<bigint>, "
                        + "float_array array<float>, "
                        + "double_array array<double>, "
                        + "string_array array<string>, "
                        + "boolean_array array<boolean>, "
                        + "nested_int_array array<array<int>>, "
                        + "nested_string_array array<array<string>>, "
                        + "deeply_nested_array array<array<array<int>>>"
                        + ") with ('bucket.num' = '3')");

        tEnv.executeSql(
                        "INSERT INTO array_log_test VALUES "
                                + "(1, ARRAY[1, 2, CAST(NULL AS INT)], ARRAY[100, CAST(NULL AS BIGINT), 300], "
                                + "ARRAY[CAST(1.1 AS FLOAT), CAST(NULL AS FLOAT)], ARRAY[2.2, 3.3, CAST(NULL AS DOUBLE)], "
                                + "ARRAY['a', CAST(NULL AS STRING), 'c'], ARRAY[true, CAST(NULL AS BOOLEAN), false], "
                                + "ARRAY[ARRAY[1, 2], CAST(NULL AS ARRAY<INT>), ARRAY[3]], "
                                + "ARRAY[ARRAY['x'], ARRAY[CAST(NULL AS STRING), 'y']], "
                                + "ARRAY[ARRAY[ARRAY[1, 2]], ARRAY[ARRAY[3, 4, 5]]]), "
                                + "(2, CAST(NULL AS ARRAY<INT>), ARRAY[400, 500], "
                                + "ARRAY[CAST(4.4 AS FLOAT)], ARRAY[5.5], "
                                + "ARRAY['d', 'e'], ARRAY[true], "
                                + "ARRAY[ARRAY[6, 7, 8]], ARRAY[ARRAY['z']], "
                                + "ARRAY[ARRAY[ARRAY[9]]])")
                .await();

        CloseableIterator<Row> rowIter = tEnv.executeSql("select * from array_log_test").collect();
        List<String> expectedRows =
                Arrays.asList(
                        "+I[1, [1, 2, null], [100, null, 300], [1.1, null], [2.2, 3.3, null], [a, null, c], [true, null, false], [[1, 2], null, [3]], [[x], [null, y]], [[[1, 2]], [[3, 4, 5]]]]",
                        "+I[2, null, [400, 500], [4.4], [5.5], [d, e], [true], [[6, 7, 8]], [[z]], [[[9]]]]");
        assertResultsIgnoreOrder(rowIter, expectedRows, true);
    }

    /**
     * Primary-key table round-trip: initial inserts produce +I rows; a second insert with an
     * existing key produces the changelog pair -U/+U (update) plus +I for the new key.
     */
    @Test
    void testArrayTypesInPrimaryKeyTable() throws Exception {
        tEnv.executeSql(
                "create table array_pk_test ("
                        + "id int, "
                        + "int_array array<int>, "
                        + "bigint_array array<bigint>, "
                        + "float_array array<float>, "
                        + "double_array array<double>, "
                        + "string_array array<string>, "
                        + "boolean_array array<boolean>, "
                        + "nested_int_array array<array<int>>, "
                        + "nested_string_array array<array<string>>, "
                        + "primary key(id) not enforced"
                        + ") with ('bucket.num' = '3')");

        tEnv.executeSql(
                        "INSERT INTO array_pk_test VALUES "
                                + "(1, ARRAY[1, 2], ARRAY[100, 300], ARRAY[CAST(1.1 AS FLOAT)], ARRAY[2.2, 3.3], "
                                + "ARRAY['a', CAST(NULL AS STRING), 'c'], ARRAY[true, false], "
                                + "ARRAY[ARRAY[1, 2], CAST(NULL AS ARRAY<INT>), ARRAY[3]], "
                                + "ARRAY[ARRAY['x'], ARRAY[CAST(NULL AS STRING), 'y']]), "
                                + "(2, CAST(NULL AS ARRAY<INT>), ARRAY[400, 500], ARRAY[CAST(4.4 AS FLOAT)], ARRAY[5.5], "
                                + "ARRAY['d', 'e'], ARRAY[true], ARRAY[ARRAY[6, 7, 8]], ARRAY[ARRAY['z']]), "
                                + "(3, ARRAY[10], ARRAY[600], ARRAY[CAST(7.7 AS FLOAT)], ARRAY[8.8], "
                                + "ARRAY['f'], ARRAY[false], ARRAY[ARRAY[9]], ARRAY[ARRAY['w']])")
                .await();

        CloseableIterator<Row> rowIter = tEnv.executeSql("select * from array_pk_test").collect();
        List<String> expectedRows =
                Arrays.asList(
                        "+I[1, [1, 2], [100, 300], [1.1], [2.2, 3.3], [a, null, c], [true, false], [[1, 2], null, [3]], [[x], [null, y]]]",
                        "+I[2, null, [400, 500], [4.4], [5.5], [d, e], [true], [[6, 7, 8]], [[z]]]",
                        "+I[3, [10], [600], [7.7], [8.8], [f], [false], [[9]], [[w]]]");
        // Keep the iterator open (third arg false): the same stream is asserted again below
        // after the upsert produces the changelog rows.
        assertResultsIgnoreOrder(rowIter, expectedRows, false);

        tEnv.executeSql(
                        "INSERT INTO array_pk_test VALUES "
                                + "(1, ARRAY[100, 200], ARRAY[1000], ARRAY[CAST(10.1 AS FLOAT)], ARRAY[11.1], "
                                + "ARRAY['updated'], ARRAY[false], ARRAY[ARRAY[100]], ARRAY[ARRAY['updated']]), "
                                + "(4, ARRAY[20, 30], ARRAY[2000, 3000], ARRAY[CAST(20.2 AS FLOAT)], ARRAY[30.3], "
                                + "ARRAY['new'], ARRAY[true], ARRAY[ARRAY[200], ARRAY[300]], ARRAY[ARRAY['new1'], ARRAY['new2']])")
                .await();

        expectedRows =
                Arrays.asList(
                        "-U[1, [1, 2], [100, 300], [1.1], [2.2, 3.3], [a, null, c], [true, false], [[1, 2], null, [3]], [[x], [null, y]]]",
                        "+U[1, [100, 200], [1000], [10.1], [11.1], [updated], [false], [[100]], [[updated]]]",
                        "+I[4, [20, 30], [2000, 3000], [20.2], [30.3], [new], [true], [[200], [300]], [[new1], [new2]]]");
        assertResultsIgnoreOrder(rowIter, expectedRows, true);
    }

    /** DDL with an ARRAY column as partition key must be rejected by the catalog validation. */
    @Test
    void testArrayTypeAsPartitionKeyThrowsException() {
        assertThatThrownBy(
                        () ->
                                tEnv.executeSql(
                                        "create table array_partition_test ("
                                                + "id int, "
                                                + "data string, "
                                                + "tags array<string>, "
                                                + "primary key(id) not enforced"
                                                + ") partitioned by (tags)"))
                .cause()
                .isInstanceOf(UnsupportedOperationException.class)
                .hasMessageContaining("is not supported");
    }

    /** DDL with an ARRAY column as primary key must be rejected by the catalog validation. */
    @Test
    void testArrayTypeAsPrimaryKeyThrowsException() {
        assertThatThrownBy(
                        () ->
                                tEnv.executeSql(
                                        "create table array_pk_invalid ("
                                                + "id int, "
                                                + "data array<string>, "
                                                + "primary key(data) not enforced"
                                                + ")"))
                .cause()
                .isInstanceOf(UnsupportedOperationException.class)
                .hasMessageContaining("is not supported");
    }

    /** DDL with an ARRAY column as bucket key must be rejected by the catalog validation. */
    @Test
    void testArrayTypeAsBucketKeyThrowsException() {
        assertThatThrownBy(
                        () ->
                                tEnv.executeSql(
                                        "create table array_bucket_test ("
                                                + "id int, "
                                                + "data array<string>, "
                                                + "primary key(id) not enforced"
                                                + ") with ('bucket.key' = 'data', 'bucket.num' = '3')"))
                .cause()
                .isInstanceOf(UnsupportedOperationException.class)
                .hasMessageContaining("is not supported");
    }
}
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkSchemaValidatorTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkSchemaValidatorTest.java
new file mode 100644
index 000000000..50e9f91b6
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkSchemaValidatorTest.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.utils;
+
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.table.api.DataTypes.ARRAY;
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link FlinkSchemaValidator}. */
/**
 * Test for {@link FlinkSchemaValidator}.
 *
 * <p>Pure unit tests (no cluster): each case builds a {@link ResolvedSchema} directly and checks
 * that ARRAY-typed key columns are rejected with {@link UnsupportedOperationException}, that
 * unknown column names raise {@link CatalogException}, and that valid schemas pass silently.
 */
public class FlinkSchemaValidatorTest {

    /** Non-array primary key columns pass validation without throwing. */
    @Test
    void testValidatePrimaryKeyColumnsWithValidTypes() {
        ResolvedSchema schema =
                new ResolvedSchema(
                        Arrays.asList(
                                Column.physical("id", INT().notNull()),
                                Column.physical("name", STRING())),
                        Collections.emptyList(),
                        UniqueConstraint.primaryKey("PK_id", Collections.singletonList("id")));

        List<String> primaryKeyColumns = Collections.singletonList("id");
        FlinkSchemaValidator.validatePrimaryKeyColumns(schema, primaryKeyColumns);
    }

    /** An ARRAY-typed column used as primary key is rejected. */
    @Test
    void testValidatePrimaryKeyColumnsWithArrayType() {
        ResolvedSchema schema =
                new ResolvedSchema(
                        Arrays.asList(
                                Column.physical("id", ARRAY(INT()).notNull()),
                                Column.physical("name", STRING())),
                        Collections.emptyList(),
                        UniqueConstraint.primaryKey("PK_id", Collections.singletonList("id")));

        List<String> primaryKeyColumns = Collections.singletonList("id");
        assertThatThrownBy(
                        () ->
                                FlinkSchemaValidator.validatePrimaryKeyColumns(
                                        schema, primaryKeyColumns))
                .isInstanceOf(UnsupportedOperationException.class)
                .hasMessageContaining("Column 'id' of ARRAY type is not supported as primary key.");
    }

    /** A primary key column that is missing from the schema raises a CatalogException. */
    @Test
    void testValidatePrimaryKeyColumnsWithMissingColumn() {
        ResolvedSchema schema =
                new ResolvedSchema(
                        Collections.singletonList(Column.physical("name", STRING())),
                        Collections.emptyList(),
                        null);

        List<String> primaryKeyColumns = Collections.singletonList("id");
        assertThatThrownBy(
                        () ->
                                FlinkSchemaValidator.validatePrimaryKeyColumns(
                                        schema, primaryKeyColumns))
                .isInstanceOf(CatalogException.class)
                .hasMessageContaining("Primary key column id not found in schema.");
    }

    /** Non-array partition key columns pass validation without throwing. */
    @Test
    void testValidatePartitionKeyColumnsWithValidTypes() {
        ResolvedSchema schema =
                new ResolvedSchema(
                        Arrays.asList(
                                Column.physical("id", INT().notNull()),
                                Column.physical("name", STRING()),
                                Column.physical("date", STRING())),
                        Collections.emptyList(),
                        null);

        List<String> partitionKeyColumns = Arrays.asList("name", "date");
        FlinkSchemaValidator.validatePartitionKeyColumns(schema, partitionKeyColumns);
    }

    /** An ARRAY-typed column used as partition key is rejected. */
    @Test
    void testValidatePartitionKeyColumnsWithArrayType() {
        ResolvedSchema schema =
                new ResolvedSchema(
                        Arrays.asList(
                                Column.physical("id", INT().notNull()),
                                Column.physical("tags", ARRAY(STRING())),
                                Column.physical("name", STRING())),
                        Collections.emptyList(),
                        null);

        List<String> partitionKeyColumns = Collections.singletonList("tags");
        assertThatThrownBy(
                        () ->
                                FlinkSchemaValidator.validatePartitionKeyColumns(
                                        schema, partitionKeyColumns))
                .isInstanceOf(UnsupportedOperationException.class)
                .hasMessageContaining(
                        "Column 'tags' of ARRAY type is not supported as partition key.");
    }

    /** A partition key column that is missing from the schema raises a CatalogException. */
    @Test
    void testValidatePartitionKeyColumnsWithMissingColumn() {
        ResolvedSchema schema =
                new ResolvedSchema(
                        Collections.singletonList(Column.physical("name", STRING())),
                        Collections.emptyList(),
                        null);

        List<String> partitionKeyColumns = Collections.singletonList("date");
        assertThatThrownBy(
                        () ->
                                FlinkSchemaValidator.validatePartitionKeyColumns(
                                        schema, partitionKeyColumns))
                .isInstanceOf(CatalogException.class)
                .hasMessageContaining("Partition key column date not found in schema.");
    }

    /** Non-array bucket key columns pass validation without throwing. */
    @Test
    void testValidateBucketKeyColumnsWithValidTypes() {
        ResolvedSchema schema =
                new ResolvedSchema(
                        Arrays.asList(
                                Column.physical("id", INT().notNull()),
                                Column.physical("name", STRING()),
                                Column.physical("category", STRING())),
                        Collections.emptyList(),
                        null);

        List<String> bucketKeyColumns = Arrays.asList("id", "name");
        FlinkSchemaValidator.validateBucketKeyColumns(schema, bucketKeyColumns);
    }

    /** An ARRAY-typed column used as bucket key is rejected. */
    @Test
    void testValidateBucketKeyColumnsWithArrayType() {
        ResolvedSchema schema =
                new ResolvedSchema(
                        Arrays.asList(
                                Column.physical("id", INT().notNull()),
                                Column.physical("tags", ARRAY(STRING())),
                                Column.physical("name", STRING())),
                        Collections.emptyList(),
                        null);

        List<String> bucketKeyColumns = Collections.singletonList("tags");
        assertThatThrownBy(
                        () ->
                                FlinkSchemaValidator.validateBucketKeyColumns(
                                        schema, bucketKeyColumns))
                .isInstanceOf(UnsupportedOperationException.class)
                .hasMessageContaining(
                        "Column 'tags' of ARRAY type is not supported as bucket key.");
    }

    /** A bucket key column that is missing from the schema raises a CatalogException. */
    @Test
    void testValidateBucketKeyColumnsWithMissingColumn() {
        ResolvedSchema schema =
                new ResolvedSchema(
                        Collections.singletonList(Column.physical("name", STRING())),
                        Collections.emptyList(),
                        null);

        List<String> bucketKeyColumns = Collections.singletonList("bucket_col");
        assertThatThrownBy(
                        () ->
                                FlinkSchemaValidator.validateBucketKeyColumns(
                                        schema, bucketKeyColumns))
                .isInstanceOf(CatalogException.class)
                .hasMessageContaining("Bucket key column bucket_col not found in schema.");
    }

    /** A composite primary key with only scalar columns passes, even if the schema has arrays. */
    @Test
    void testValidatePrimaryKeyColumnsWithMultipleColumns() {
        ResolvedSchema schema =
                new ResolvedSchema(
                        Arrays.asList(
                                Column.physical("id", INT().notNull()),
                                Column.physical("name", STRING().notNull()),
                                Column.physical("tags", ARRAY(STRING()))),
                        Collections.emptyList(),
                        UniqueConstraint.primaryKey("PK_id_name", Arrays.asList("id", "name")));

        List<String> primaryKeyColumns = Arrays.asList("id", "name");
        FlinkSchemaValidator.validatePrimaryKeyColumns(schema, primaryKeyColumns);
    }

    /** A composite primary key is rejected as soon as any one of its columns is an ARRAY. */
    @Test
    void testValidatePrimaryKeyColumnsWithMultipleColumnsOneArray() {
        ResolvedSchema schema =
                new ResolvedSchema(
                        Arrays.asList(
                                Column.physical("id", INT().notNull()),
                                Column.physical("name", STRING().notNull()),
                                Column.physical("tags", ARRAY(STRING()).notNull())),
                        Collections.emptyList(),
                        UniqueConstraint.primaryKey("PK_id_tags", Arrays.asList("id", "tags")));

        List<String> primaryKeyColumns = Arrays.asList("id", "tags");
        assertThatThrownBy(
                        () ->
                                FlinkSchemaValidator.validatePrimaryKeyColumns(
                                        schema, primaryKeyColumns))
                .isInstanceOf(UnsupportedOperationException.class)
                .hasMessageContaining(
                        "Column 'tags' of ARRAY type is not supported as primary key.");
    }
}

Reply via email to