This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch ci-array-flink
in repository https://gitbox.apache.org/repos/asf/fluss.git

commit 3986379efbff70a3f1b83222c4d08138837847a3
Author: Jark Wu <[email protected]>
AuthorDate: Mon Dec 1 22:47:46 2025 +0800

    [flink] Remove unnecessary fixVectorBuffers and add more ArrayType IT cases
---
 .../apache/fluss/record/DefaultLogRecordBatch.java |  27 +--
 .../org/apache/fluss/record/LogRecordBatch.java    |   8 -
 .../apache/fluss/record/LogRecordReadContext.java  |  26 ---
 .../org/apache/fluss/row/arrow/ArrowReader.java    |  16 +-
 .../org/apache/fluss/row/arrow/ArrowWriter.java    |  12 +-
 .../fluss/row/arrow/writers/ArrowArrayWriter.java  |  21 --
 .../java/org/apache/fluss/utils/ArrowUtils.java    |  93 +--------
 .../fluss/row/arrow/ArrowReaderWriterTest.java     |   8 +-
 .../apache/fluss/flink/utils/FlinkConversions.java |   8 +-
 .../fluss/flink/utils/FlinkSchemaValidator.java    | 122 -----------
 .../fluss/flink/sink/FlinkComplexTypeITCase.java   | 137 +++++++++---
 .../flink/utils/FlinkSchemaValidatorTest.java      | 229 ---------------------
 .../server/utils/TableDescriptorValidation.java    |  41 +++-
 13 files changed, 166 insertions(+), 582 deletions(-)

diff --git 
a/fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java 
b/fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java
index 2c6bce911..178ce8977 100644
--- 
a/fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java
+++ 
b/fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java
@@ -223,8 +223,7 @@ public class DefaultLogRecordBatch implements 
LogRecordBatch {
                         rowType,
                         context.getVectorSchemaRoot(schemaId),
                         context.getBufferAllocator(),
-                        timestamp,
-                        context);
+                        timestamp);
             case INDEXED:
                 return rowRecordIterator(rowType, timestamp);
             default:
@@ -280,11 +279,7 @@ public class DefaultLogRecordBatch implements 
LogRecordBatch {
     }
 
     private CloseableIterator<LogRecord> columnRecordIterator(
-            RowType rowType,
-            VectorSchemaRoot root,
-            BufferAllocator allocator,
-            long timestamp,
-            ReadContext readContext) {
+            RowType rowType, VectorSchemaRoot root, BufferAllocator allocator, 
long timestamp) {
         boolean isAppendOnly = (attributes() & APPEND_ONLY_FLAG_MASK) > 0;
         if (isAppendOnly) {
             // append only batch, no change type vector,
@@ -294,13 +289,7 @@ public class DefaultLogRecordBatch implements 
LogRecordBatch {
             int arrowLength = sizeInBytes() - recordBatchHeaderSize;
             ArrowReader reader =
                     ArrowUtils.createArrowReader(
-                            segment,
-                            arrowOffset,
-                            arrowLength,
-                            root,
-                            allocator,
-                            rowType,
-                            readContext);
+                            segment, arrowOffset, arrowLength, root, 
allocator, rowType);
             return new ArrowLogRecordIterator(reader, timestamp) {
                 @Override
                 protected ChangeType getChangeType(int rowId) {
@@ -318,13 +307,7 @@ public class DefaultLogRecordBatch implements 
LogRecordBatch {
                     sizeInBytes() - arrowChangeTypeOffset(magic) - 
changeTypeVector.sizeInBytes();
             ArrowReader reader =
                     ArrowUtils.createArrowReader(
-                            segment,
-                            arrowOffset,
-                            arrowLength,
-                            root,
-                            allocator,
-                            rowType,
-                            readContext);
+                            segment, arrowOffset, arrowLength, root, 
allocator, rowType);
             return new ArrowLogRecordIterator(reader, timestamp) {
                 @Override
                 protected ChangeType getChangeType(int rowId) {
@@ -371,7 +354,7 @@ public class DefaultLogRecordBatch implements 
LogRecordBatch {
 
         @Override
         public void close() {
-            reader.close();
+            // reader has no resources to release
         }
     }
 
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatch.java 
b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatch.java
index c226f25a4..cadc1ef9d 100644
--- a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatch.java
+++ b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatch.java
@@ -195,13 +195,5 @@ public interface LogRecordBatch {
 
         /** Gets the buffer allocator. */
         BufferAllocator getBufferAllocator();
-
-        /**
-         * Registers a batch VectorSchemaRoot to be closed when this context 
is closed. This is used
-         * to track VectorSchemaRoots created for individual batches.
-         *
-         * @param batchRoot the batch VectorSchemaRoot to register
-         */
-        void registerBatchRoot(VectorSchemaRoot batchRoot);
     }
 }
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java 
b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java
index 08ef69037..1158dbc35 100644
--- 
a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java
+++ 
b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java
@@ -32,7 +32,6 @@ import org.apache.fluss.utils.Projection;
 
 import javax.annotation.Nullable;
 
-import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.IntStream;
 
@@ -56,8 +55,6 @@ public class LogRecordReadContext implements 
LogRecordBatch.ReadContext, AutoClo
     private final FieldGetter[] selectedFieldGetters;
     // whether the projection is push downed to the server side and the 
returned data is pruned.
     private final boolean projectionPushDowned;
-    // track all batch VectorSchemaRoots created for this context
-    private final List<VectorSchemaRoot> batchRoots;
 
     /**
      * Creates a LogRecordReadContext for the given table information and 
projection information.
@@ -167,7 +164,6 @@ public class LogRecordReadContext implements 
LogRecordBatch.ReadContext, AutoClo
         this.bufferAllocator = bufferAllocator;
         this.selectedFieldGetters = selectedFieldGetters;
         this.projectionPushDowned = projectionPushDowned;
-        this.batchRoots = new ArrayList<>();
     }
 
     @Override
@@ -219,29 +215,7 @@ public class LogRecordReadContext implements 
LogRecordBatch.ReadContext, AutoClo
         return bufferAllocator;
     }
 
-    /**
-     * Registers a batch VectorSchemaRoot to be closed when this context is 
closed. This is used to
-     * track VectorSchemaRoots created for individual batches.
-     */
-    @Override
-    public void registerBatchRoot(VectorSchemaRoot batchRoot) {
-        if (batchRoot != null) {
-            batchRoots.add(batchRoot);
-        }
-    }
-
     public void close() {
-        // Close all batch roots first
-        for (VectorSchemaRoot root : batchRoots) {
-            try {
-                root.close();
-            } catch (Exception e) {
-                // Continue closing other roots even if one fails
-            }
-        }
-        batchRoots.clear();
-
-        // Then close the shared schema root and allocator
         if (vectorSchemaRoot != null) {
             vectorSchemaRoot.close();
         }
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/row/arrow/ArrowReader.java 
b/fluss-common/src/main/java/org/apache/fluss/row/arrow/ArrowReader.java
index d50eb8fe0..36a37b44d 100644
--- a/fluss-common/src/main/java/org/apache/fluss/row/arrow/ArrowReader.java
+++ b/fluss-common/src/main/java/org/apache/fluss/row/arrow/ArrowReader.java
@@ -22,7 +22,6 @@ import org.apache.fluss.row.InternalRow;
 import org.apache.fluss.row.columnar.ColumnVector;
 import org.apache.fluss.row.columnar.ColumnarRow;
 import org.apache.fluss.row.columnar.VectorizedColumnBatch;
-import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.VectorSchemaRoot;
 
 import static org.apache.fluss.utils.Preconditions.checkNotNull;
 
@@ -30,11 +29,6 @@ import static 
org.apache.fluss.utils.Preconditions.checkNotNull;
 @Internal
 public class ArrowReader {
 
-    /**
-     * The arrow root which holds vector resources and should be released when 
the reader is closed.
-     */
-    private final VectorSchemaRoot root;
-
     /**
      * An array of vectors which are responsible for the deserialization of 
each column of the rows.
      */
@@ -42,10 +36,9 @@ public class ArrowReader {
 
     private final int rowCount;
 
-    public ArrowReader(VectorSchemaRoot root, ColumnVector[] columnVectors) {
-        this.root = root;
+    public ArrowReader(ColumnVector[] columnVectors, int rowCount) {
         this.columnVectors = checkNotNull(columnVectors);
-        this.rowCount = root.getRowCount();
+        this.rowCount = rowCount;
     }
 
     public int getRowCount() {
@@ -56,9 +49,4 @@ public class ArrowReader {
     public ColumnarRow read(int rowId) {
         return new ColumnarRow(new VectorizedColumnBatch(columnVectors), 
rowId);
     }
-
-    public void close() {
-        // Do not close the VectorSchemaRoot here.
-        // The root will be closed when LogRecordReadContext closes.
-    }
 }
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/row/arrow/ArrowWriter.java 
b/fluss-common/src/main/java/org/apache/fluss/row/arrow/ArrowWriter.java
index 8c1ba501b..98b7ed6e5 100644
--- a/fluss-common/src/main/java/org/apache/fluss/row/arrow/ArrowWriter.java
+++ b/fluss-common/src/main/java/org/apache/fluss/row/arrow/ArrowWriter.java
@@ -88,8 +88,6 @@ public class ArrowWriter implements AutoCloseable {
      */
     private final int metadataLength;
 
-    private final RowType schema;
-
     private final CompressionCodec compressionCodec;
     private final ArrowCompressionRatioEstimator compressionRatioEstimator;
 
@@ -113,7 +111,6 @@ public class ArrowWriter implements AutoCloseable {
             ArrowCompressionInfo compressionInfo,
             ArrowCompressionRatioEstimator compressionRatioEstimator) {
         this.writerKey = writerKey;
-        this.schema = schema;
         this.root = VectorSchemaRoot.create(ArrowUtils.toArrowSchema(schema), 
allocator);
         this.provider = checkNotNull(provider);
         this.compressionCodec = compressionInfo.createCompressionCodec();
@@ -185,7 +182,7 @@ public class ArrowWriter implements AutoCloseable {
         }
         // Reset field writers to clear their offset counters (for ArrayWriter)
         for (ArrowFieldWriter fieldWriter : fieldWriters) {
-            resetFieldWriter(fieldWriter);
+            fieldWriter.reset();
         }
         root.setRowCount(0);
         recordsCount = 0;
@@ -291,7 +288,7 @@ public class ArrowWriter implements AutoCloseable {
             recordsCount = 0;
             // Reset array writers when recycling
             for (ArrowFieldWriter fieldWriter : fieldWriters) {
-                resetFieldWriter(fieldWriter);
+                fieldWriter.reset();
             }
             provider.recycleWriter(this);
         }
@@ -322,11 +319,6 @@ public class ArrowWriter implements AutoCloseable {
         }
     }
 
-    /** Resets field writers to clear their state for reuse. */
-    private void resetFieldWriter(ArrowFieldWriter fieldWriter) {
-        fieldWriter.reset();
-    }
-
     private int estimatedBytesWritten(int currentBytes) {
         if (compressionCodec.getCodecType() == 
CompressionUtil.CodecType.NO_COMPRESSION) {
             return currentBytes;
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/row/arrow/writers/ArrowArrayWriter.java
 
b/fluss-common/src/main/java/org/apache/fluss/row/arrow/writers/ArrowArrayWriter.java
index c5001af56..548c4e956 100644
--- 
a/fluss-common/src/main/java/org/apache/fluss/row/arrow/writers/ArrowArrayWriter.java
+++ 
b/fluss-common/src/main/java/org/apache/fluss/row/arrow/writers/ArrowArrayWriter.java
@@ -45,27 +45,6 @@ public class ArrowArrayWriter extends ArrowFieldWriter {
         }
         offset += array.size();
         listVector.endValue(rowIndex, array.size());
-
-        // Fix dataVector valueCount after writing each row
-        // This ensures VectorUnloader has correct valueCount for all vectors
-        FieldVector dataVector = listVector.getDataVector();
-        if (dataVector != null && offset > 0) {
-            dataVector.setValueCount(offset);
-            // Recursively fix nested ListVectors
-            fixNestedListVectorValueCount(dataVector, offset);
-        }
-    }
-
-    private void fixNestedListVectorValueCount(FieldVector vector, int 
parentCount) {
-        if (vector instanceof ListVector && parentCount > 0) {
-            ListVector listVector = (ListVector) vector;
-            int dataVectorValueCount = 
listVector.getElementEndIndex(parentCount - 1);
-            FieldVector dataVector = listVector.getDataVector();
-            if (dataVector != null && dataVectorValueCount > 0) {
-                dataVector.setValueCount(dataVectorValueCount);
-                fixNestedListVectorValueCount(dataVector, 
dataVectorValueCount);
-            }
-        }
     }
 
     /** Resets the offset counter for reuse. */
diff --git a/fluss-common/src/main/java/org/apache/fluss/utils/ArrowUtils.java 
b/fluss-common/src/main/java/org/apache/fluss/utils/ArrowUtils.java
index 8e9ab3e19..06273fd09 100644
--- a/fluss-common/src/main/java/org/apache/fluss/utils/ArrowUtils.java
+++ b/fluss-common/src/main/java/org/apache/fluss/utils/ArrowUtils.java
@@ -159,110 +159,29 @@ public class ArrowUtils {
             MemorySegment segment,
             int arrowOffset,
             int arrowLength,
-            VectorSchemaRoot sharedSchemaRoot,
+            VectorSchemaRoot schemaRoot,
             BufferAllocator allocator,
-            RowType rowType,
-            org.apache.fluss.record.LogRecordBatch.ReadContext readContext) {
+            RowType rowType) {
         ByteBuffer arrowBatchBuffer = segment.wrap(arrowOffset, arrowLength);
-
         try (ReadChannel channel =
                         new ReadChannel(new 
ByteBufferReadableChannel(arrowBatchBuffer));
                 ArrowRecordBatch batch = deserializeRecordBatch(channel, 
allocator)) {
-
-            // Create a new VectorSchemaRoot for each batch to avoid data 
contamination
-            // when the shared root is reused for subsequent batches
-            VectorSchemaRoot batchSchemaRoot =
-                    VectorSchemaRoot.create(sharedSchemaRoot.getSchema(), 
allocator);
-
             VectorLoader vectorLoader =
-                    new VectorLoader(batchSchemaRoot, 
ArrowCompressionFactory.INSTANCE);
+                    new VectorLoader(schemaRoot, 
ArrowCompressionFactory.INSTANCE);
             vectorLoader.load(batch);
-
-            // Fix buffer writerIndex after VectorLoader.load
-            // VectorLoader.load() sets the capacity but not the writerIndex 
for buffers.
-            // This is especially critical for Array types (ListVector), where 
we need to:
-            // 1. Fix writerIndex for all buffers to match their capacity
-            // 2. Fix nested dataVector's valueCount based on the last offset
-            // This was not needed before Array type support, but is now 
essential
-            // for correct Arrow data reading with complex nested types.
-            fixVectorBuffers(batchSchemaRoot);
-
             List<ColumnVector> columnVectors = new ArrayList<>();
-            List<FieldVector> fieldVectors = batchSchemaRoot.getFieldVectors();
+            List<FieldVector> fieldVectors = schemaRoot.getFieldVectors();
             for (int i = 0; i < fieldVectors.size(); i++) {
                 columnVectors.add(
                         createArrowColumnVector(fieldVectors.get(i), 
rowType.getTypeAt(i)));
             }
-
-            // Register the batch root with the read context so it can be 
closed later
-            if (readContext != null) {
-                readContext.registerBatchRoot(batchSchemaRoot);
-            }
-
-            return new ArrowReader(batchSchemaRoot, columnVectors.toArray(new 
ColumnVector[0]));
+            return new ArrowReader(
+                    columnVectors.toArray(new ColumnVector[0]), 
schemaRoot.getRowCount());
         } catch (IOException e) {
             throw new RuntimeException("Failed to deserialize 
ArrowRecordBatch.", e);
         }
     }
 
-    /**
-     * Fixes writerIndex for all buffers in all vectors after 
VectorLoader.load().
-     *
-     * <p>VectorLoader.load() sets the capacity but not the writerIndex for 
buffers, which can cause
-     * issues when reading data. This method recursively fixes all vectors:
-     *
-     * <ul>
-     *   <li>For all vectors: sets writerIndex to match capacity for proper 
buffer reading
-     *   <li>For ListVector (Array type): additionally fixes the nested 
dataVector's valueCount
-     *       based on the last offset, and recursively fixes nested structures
-     * </ul>
-     *
-     * <p>This is especially critical for Array types introduced in the 
system, where nested
-     * dataVectors need correct valueCount to avoid data corruption or reading 
errors.
-     */
-    private static void fixVectorBuffers(VectorSchemaRoot schemaRoot) {
-        for (FieldVector fieldVector : schemaRoot.getFieldVectors()) {
-            fixVectorBuffersRecursive(fieldVector);
-        }
-    }
-
-    private static void fixVectorBuffersRecursive(FieldVector vector) {
-        // Fix all buffers in this vector
-        for (ArrowBuf buf : vector.getFieldBuffers()) {
-            if (buf.capacity() > 0 && buf.writerIndex() < buf.capacity()) {
-                buf.writerIndex((int) buf.capacity());
-            }
-        }
-
-        // Special handling for ListVector: fix dataVector valueCount and 
buffers
-        // This is critical for Array type support. Without this, the nested 
dataVector
-        // may have incorrect valueCount, leading to data corruption or 
reading errors.
-        if (vector instanceof ListVector) {
-            ListVector listVector = (ListVector) vector;
-            int vectorValueCount = listVector.getValueCount();
-
-            if (vectorValueCount > 0) {
-                FieldVector dataVector = listVector.getDataVector();
-                if (dataVector != null) {
-                    // Calculate the correct valueCount for dataVector from 
the last offset
-                    // For array [a,b,c] followed by [d,e], offsets are 
[0,3,5], so dataVector
-                    // needs valueCount=5 (not the parent's valueCount=2)
-                    int dataVectorValueCount = 
listVector.getElementEndIndex(vectorValueCount - 1);
-                    if (dataVectorValueCount > 0) {
-                        dataVector.setValueCount(dataVectorValueCount);
-                    }
-                    // Recursively fix the dataVector (needed for nested 
arrays)
-                    fixVectorBuffersRecursive(dataVector);
-                }
-            }
-        } else if (vector.getChildrenFromFields() != null) {
-            // Recursively fix other child vectors
-            for (FieldVector child : vector.getChildrenFromFields()) {
-                fixVectorBuffersRecursive(child);
-            }
-        }
-    }
-
     /**
      * Serialize metadata of a {@link ArrowRecordBatch} into write channel. 
This avoids to create an
      * instance of {@link ArrowRecordBatch}.
diff --git 
a/fluss-common/src/test/java/org/apache/fluss/row/arrow/ArrowReaderWriterTest.java
 
b/fluss-common/src/test/java/org/apache/fluss/row/arrow/ArrowReaderWriterTest.java
index 295d6272a..16eb4a775 100644
--- 
a/fluss-common/src/test/java/org/apache/fluss/row/arrow/ArrowReaderWriterTest.java
+++ 
b/fluss-common/src/test/java/org/apache/fluss/row/arrow/ArrowReaderWriterTest.java
@@ -21,7 +21,6 @@ import org.apache.fluss.memory.AbstractPagedOutputView;
 import org.apache.fluss.memory.ManagedPagedOutputView;
 import org.apache.fluss.memory.MemorySegment;
 import org.apache.fluss.memory.TestingMemorySegmentPool;
-import org.apache.fluss.record.LogRecordReadContext;
 import org.apache.fluss.row.Decimal;
 import org.apache.fluss.row.GenericArray;
 import org.apache.fluss.row.GenericRow;
@@ -178,9 +177,7 @@ class ArrowReaderWriterTest {
                 ArrowWriterPool provider = new ArrowWriterPool(allocator);
                 ArrowWriter writer =
                         provider.getOrCreateWriter(
-                                1L, 1, Integer.MAX_VALUE, rowType, 
NO_COMPRESSION);
-                LogRecordReadContext readContext =
-                        LogRecordReadContext.createArrowReadContext(rowType, 
0)) {
+                                1L, 1, Integer.MAX_VALUE, rowType, 
NO_COMPRESSION)) {
             for (InternalRow row : TEST_DATA) {
                 writer.writeRow(row);
             }
@@ -200,8 +197,7 @@ class ArrowReaderWriterTest {
             
firstSegment.copyTo(arrowChangeTypeOffset(CURRENT_LOG_MAGIC_VALUE), segment, 0, 
size);
 
             ArrowReader reader =
-                    ArrowUtils.createArrowReader(
-                            segment, 0, size, root, allocator, rowType, 
readContext);
+                    ArrowUtils.createArrowReader(segment, 0, size, root, 
allocator, rowType);
             int rowCount = reader.getRowCount();
             for (int i = 0; i < rowCount; i++) {
                 ColumnarRow row = reader.read(i);
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java
index 1517fd0d5..3256b3f52 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java
@@ -190,9 +190,7 @@ public class FlinkConversions {
         // now, build Fluss's table
         Schema.Builder schemBuilder = Schema.newBuilder();
         if (resolvedSchema.getPrimaryKey().isPresent()) {
-            List<String> primaryKeyColumns = 
resolvedSchema.getPrimaryKey().get().getColumns();
-            FlinkSchemaValidator.validatePrimaryKeyColumns(resolvedSchema, 
primaryKeyColumns);
-            schemBuilder.primaryKey(primaryKeyColumns);
+            
schemBuilder.primaryKey(resolvedSchema.getPrimaryKey().get().getColumns());
         }
 
         // first build schema with physical columns
@@ -225,8 +223,6 @@ public class FlinkConversions {
                         ? ((ResolvedCatalogTable) 
catalogBaseTable).getPartitionKeys()
                         : ((ResolvedCatalogMaterializedTable) 
catalogBaseTable).getPartitionKeys();
 
-        FlinkSchemaValidator.validatePartitionKeyColumns(resolvedSchema, 
partitionKeys);
-
         Map<String, String> customProperties = flinkTableConf.toMap();
         CatalogPropertiesUtils.serializeComputedColumns(
                 customProperties, resolvedSchema.getColumns());
@@ -257,8 +253,6 @@ public class FlinkConversions {
                     Arrays.stream(flinkTableConf.get(BUCKET_KEY).split(","))
                             .map(String::trim)
                             .collect(Collectors.toList());
-
-            FlinkSchemaValidator.validateBucketKeyColumns(resolvedSchema, 
bucketKey);
         } else {
             // use primary keys - partition keys
             bucketKey =
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkSchemaValidator.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkSchemaValidator.java
deleted file mode 100644
index 68cbb1b65..000000000
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkSchemaValidator.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.fluss.flink.utils;
-
-import org.apache.flink.table.catalog.Column;
-import org.apache.flink.table.catalog.ResolvedSchema;
-import org.apache.flink.table.catalog.exceptions.CatalogException;
-import org.apache.flink.table.types.logical.ArrayType;
-import org.apache.flink.table.types.logical.LogicalType;
-
-import java.util.List;
-
-/** Validator for Flink schema constraints. */
-public class FlinkSchemaValidator {
-
-    private FlinkSchemaValidator() {}
-
-    /**
-     * Validates that primary key columns do not contain ARRAY type.
-     *
-     * @param resolvedSchema the resolved schema
-     * @param primaryKeyColumns the list of primary key column names
-     * @throws CatalogException if a primary key column is not found in schema
-     * @throws UnsupportedOperationException if a primary key column is of 
ARRAY type
-     */
-    public static void validatePrimaryKeyColumns(
-            ResolvedSchema resolvedSchema, List<String> primaryKeyColumns) {
-        for (String pkColumn : primaryKeyColumns) {
-            Column column =
-                    resolvedSchema
-                            .getColumn(pkColumn)
-                            .orElseThrow(
-                                    () ->
-                                            new CatalogException(
-                                                    "Primary key column "
-                                                            + pkColumn
-                                                            + " not found in 
schema."));
-            LogicalType logicalType = column.getDataType().getLogicalType();
-            if (logicalType instanceof ArrayType) {
-                throw new UnsupportedOperationException(
-                        String.format(
-                                "Column '%s' of ARRAY type is not supported as 
primary key.",
-                                pkColumn));
-            }
-        }
-    }
-
-    /**
-     * Validates that partition key columns do not contain ARRAY type.
-     *
-     * @param resolvedSchema the resolved schema
-     * @param partitionKeyColumns the list of partition key column names
-     * @throws CatalogException if a partition key column is not found in 
schema
-     * @throws UnsupportedOperationException if a partition key column is of 
ARRAY type
-     */
-    public static void validatePartitionKeyColumns(
-            ResolvedSchema resolvedSchema, List<String> partitionKeyColumns) {
-        for (String partitionKey : partitionKeyColumns) {
-            Column column =
-                    resolvedSchema
-                            .getColumn(partitionKey)
-                            .orElseThrow(
-                                    () ->
-                                            new CatalogException(
-                                                    "Partition key column "
-                                                            + partitionKey
-                                                            + " not found in 
schema."));
-            LogicalType logicalType = column.getDataType().getLogicalType();
-            if (logicalType instanceof ArrayType) {
-                throw new UnsupportedOperationException(
-                        String.format(
-                                "Column '%s' of ARRAY type is not supported as 
partition key.",
-                                partitionKey));
-            }
-        }
-    }
-
-    /**
-     * Validates that bucket key columns do not contain ARRAY type.
-     *
-     * @param resolvedSchema the resolved schema
-     * @param bucketKeyColumns the list of bucket key column names
-     * @throws CatalogException if a bucket key column is not found in schema
-     * @throws UnsupportedOperationException if a bucket key column is of 
ARRAY type
-     */
-    public static void validateBucketKeyColumns(
-            ResolvedSchema resolvedSchema, List<String> bucketKeyColumns) {
-        for (String bkColumn : bucketKeyColumns) {
-            Column column =
-                    resolvedSchema
-                            .getColumn(bkColumn)
-                            .orElseThrow(
-                                    () ->
-                                            new CatalogException(
-                                                    "Bucket key column "
-                                                            + bkColumn
-                                                            + " not found in 
schema."));
-            LogicalType logicalType = column.getDataType().getLogicalType();
-            if (logicalType instanceof ArrayType) {
-                throw new UnsupportedOperationException(
-                        String.format(
-                                "Column '%s' of ARRAY type is not supported as 
bucket key.",
-                                bkColumn));
-            }
-        }
-    }
-}
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkComplexTypeITCase.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkComplexTypeITCase.java
index 13d91bcd9..efa408a7f 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkComplexTypeITCase.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkComplexTypeITCase.java
@@ -17,11 +17,18 @@
 
 package org.apache.fluss.flink.sink;
 
+import org.apache.fluss.exception.InvalidTableException;
 import org.apache.fluss.server.testutils.FlussClusterExtension;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
@@ -131,6 +138,47 @@ abstract class FlinkComplexTypeITCase extends AbstractTestBase {
         assertResultsIgnoreOrder(rowIter, expectedRows, true);
     }
 
+    @Test
+    void testArrayTypesInPartitionedLogTable() throws Exception {
+        tEnv.executeSql(
+                "create table array_log_test ("
+                        + "id int, "
+                        + "dt string, "
+                        + "int_array array<int>, "
+                        + "bigint_array array<bigint>, "
+                        + "float_array array<float>, "
+                        + "double_array array<double>, "
+                        + "string_array array<string>, "
+                        + "boolean_array array<boolean>, "
+                        + "nested_int_array array<array<int>>, "
+                        + "nested_string_array array<array<string>>, "
+                        + "deeply_nested_array array<array<array<int>>>"
+                        + ") PARTITIONED BY (dt) "
+                        + "with ('bucket.num' = '3')");
+
+        tEnv.executeSql(
+                        "INSERT INTO array_log_test VALUES "
+                                + "(1, '2014', ARRAY[1, 2, CAST(NULL AS INT)], 
ARRAY[100, CAST(NULL AS BIGINT), 300], "
+                                + "ARRAY[CAST(1.1 AS FLOAT), CAST(NULL AS 
FLOAT)], ARRAY[2.2, 3.3, CAST(NULL AS DOUBLE)], "
+                                + "ARRAY['a', CAST(NULL AS STRING), 'c'], 
ARRAY[true, CAST(NULL AS BOOLEAN), false], "
+                                + "ARRAY[ARRAY[1, 2], CAST(NULL AS 
ARRAY<INT>), ARRAY[3]], "
+                                + "ARRAY[ARRAY['x'], ARRAY[CAST(NULL AS 
STRING), 'y']], "
+                                + "ARRAY[ARRAY[ARRAY[1, 2]], ARRAY[ARRAY[3, 4, 
5]]]), "
+                                + "(2, '2013', CAST(NULL AS ARRAY<INT>), 
ARRAY[400, 500], "
+                                + "ARRAY[CAST(4.4 AS FLOAT)], ARRAY[5.5], "
+                                + "ARRAY['d', 'e'], ARRAY[true], "
+                                + "ARRAY[ARRAY[6, 7, 8]], ARRAY[ARRAY['z']], "
+                                + "ARRAY[ARRAY[ARRAY[9]]])")
+                .await();
+
+        CloseableIterator<Row> rowIter = tEnv.executeSql("select * from array_log_test").collect();
+        List<String> expectedRows =
+                Arrays.asList(
+                        "+I[1, 2014, [1, 2, null], [100, null, 300], [1.1, 
null], [2.2, 3.3, null], [a, null, c], [true, null, false], [[1, 2], null, 
[3]], [[x], [null, y]], [[[1, 2]], [[3, 4, 5]]]]",
+                        "+I[2, 2013, null, [400, 500], [4.4], [5.5], [d, e], 
[true], [[6, 7, 8]], [[z]], [[[9]]]]");
+        assertResultsIgnoreOrder(rowIter, expectedRows, true);
+    }
+
     @Test
     void testArrayTypesInPrimaryKeyTable() throws Exception {
         tEnv.executeSql(
@@ -180,11 +228,58 @@ abstract class FlinkComplexTypeITCase extends AbstractTestBase {
                         "-U[1, [1, 2], [100, 300], [1.1], [2.2, 3.3], [a, 
null, c], [true, false], [[1, 2], null, [3]], [[x], [null, y]]]",
                         "+U[1, [100, 200], [1000], [10.1], [11.1], [updated], 
[false], [[100]], [[updated]]]",
                         "+I[4, [20, 30], [2000, 3000], [20.2], [30.3], [new], 
[true], [[200], [300]], [[new1], [new2]]]");
+        assertResultsIgnoreOrder(rowIter, expectedRows, false);
+
+        // insert into with partial update test
+        tEnv.executeSql(
+                        "INSERT INTO array_pk_test (id, string_array, 
bigint_array) VALUES "
+                                + "(2, ARRAY['partially', 'updated'], 
ARRAY[9999])")
+                .await();
+
+        expectedRows =
+                Arrays.asList(
+                        "-U[2, null, [400, 500], [4.4], [5.5], [d, e], [true], 
[[6, 7, 8]], [[z]]]",
+                        "+U[2, null, [9999], [4.4], [5.5], [partially, 
updated], [true], [[6, 7, 8]], [[z]]]");
         assertResultsIgnoreOrder(rowIter, expectedRows, true);
+
+        // test lookup join with array type, test partitioned table
+        Schema srcSchema =
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT())
+                        .column("name", DataTypes.STRING())
+                        .column("c", DataTypes.INT())
+                        .columnByExpression("proc", "PROCTIME()")
+                        .build();
+        RowTypeInfo srcTestTypeInfo =
+                new RowTypeInfo(
+                        new TypeInformation[] {Types.INT, Types.STRING, 
Types.INT},
+                        new String[] {"a", "name", "c"});
+        List<Row> testData =
+                Arrays.asList(
+                        Row.of(1, "name1", 11),
+                        Row.of(2, "name2", 2),
+                        Row.of(3, "name33", 33),
+                        Row.of(10, "name0", 44));
+        DataStream<Row> srcDs = env.fromCollection(testData).returns(srcTestTypeInfo);
+        tEnv.dropTemporaryView("src");
+        tEnv.createTemporaryView("src", tEnv.fromDataStream(srcDs, srcSchema));
+        CloseableIterator<Row> collected =
+                tEnv.executeSql(
+                                "SELECT a, name, array_pk_test.* FROM src "
+                                        + "LEFT JOIN array_pk_test FOR 
SYSTEM_TIME AS OF src.proc "
+                                        + "ON src.a = array_pk_test.id")
+                        .collect();
+        List<String> expected =
+                Arrays.asList(
+                        "+I[1, name1, 1, [100, 200], [1000], [10.1], [11.1], 
[updated], [false], [[100]], [[updated]]]",
+                        "+I[2, name2, 2, null, [9999], [4.4], [5.5], 
[partially, updated], [true], [[6, 7, 8]], [[z]]]",
+                        "+I[3, name33, 3, [10], [600], [7.7], [8.8], [f], 
[false], [[9]], [[w]]]",
+                        "+I[10, name0, null, null, null, null, null, null, 
null, null, null]");
+        assertResultsIgnoreOrder(collected, expected, true);
     }
 
     @Test
-    void testArrayTypeAsPartitionKeyThrowsException() {
+    void testExceptionsForArrayTypeUsage() {
         assertThatThrownBy(
                         () ->
                                 tEnv.executeSql(
@@ -192,40 +287,24 @@ abstract class FlinkComplexTypeITCase extends AbstractTestBase {
                                                 + "id int, "
                                                 + "data string, "
                                                 + "tags array<string>, "
-                                                + "primary key(id) not 
enforced"
+                                                + "primary key(id, tags) not 
enforced"
                                                 + ") partitioned by (tags)"))
-                .cause()
-                .isInstanceOf(UnsupportedOperationException.class)
-                .hasMessageContaining("is not supported");
-    }
+                .hasRootCauseInstanceOf(InvalidTableException.class)
+                .hasRootCauseMessage(
+                        "Primary key column 'tags' has unsupported data type ARRAY<STRING> NOT NULL. "
+                                + "Currently, primary key column does not support types: [ARRAY, MAP, ROW].");
 
-    @Test
-    void testArrayTypeAsPrimaryKeyThrowsException() {
-        assertThatThrownBy(
-                        () ->
-                                tEnv.executeSql(
-                                        "create table array_pk_invalid ("
-                                                + "id int, "
-                                                + "data array<string>, "
-                                                + "primary key(data) not 
enforced"
-                                                + ")"))
-                .cause()
-                .isInstanceOf(UnsupportedOperationException.class)
-                .hasMessageContaining("is not supported");
-    }
-
-    @Test
-    void testArrayTypeAsBucketKeyThrowsException() {
         assertThatThrownBy(
                         () ->
                                 tEnv.executeSql(
                                         "create table array_bucket_test ("
                                                 + "id int, "
-                                                + "data array<string>, "
-                                                + "primary key(id) not 
enforced"
-                                                + ") with ('bucket.key' = 
'data', 'bucket.num' = '3')"))
-                .cause()
-                .isInstanceOf(UnsupportedOperationException.class)
-                .hasMessageContaining("is not supported");
+                                                + "data string, "
+                                                + "tags array<string> "
+                                                + ") with ('bucket.key' = 'tags')"))
+                .hasRootCauseInstanceOf(InvalidTableException.class)
+                .hasRootCauseMessage(
+                        "Bucket key column 'tags' has unsupported data type ARRAY<STRING>. "
+                                + "Currently, bucket key column does not support types: [ARRAY, MAP, ROW].");
     }
 }
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkSchemaValidatorTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkSchemaValidatorTest.java
deleted file mode 100644
index 50e9f91b6..000000000
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkSchemaValidatorTest.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.fluss.flink.utils;
-
-import org.apache.flink.table.catalog.Column;
-import org.apache.flink.table.catalog.ResolvedSchema;
-import org.apache.flink.table.catalog.UniqueConstraint;
-import org.apache.flink.table.catalog.exceptions.CatalogException;
-import org.junit.jupiter.api.Test;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-import static org.apache.flink.table.api.DataTypes.ARRAY;
-import static org.apache.flink.table.api.DataTypes.INT;
-import static org.apache.flink.table.api.DataTypes.STRING;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-
-/** Test for {@link FlinkSchemaValidator}. */
-public class FlinkSchemaValidatorTest {
-
-    @Test
-    void testValidatePrimaryKeyColumnsWithValidTypes() {
-        ResolvedSchema schema =
-                new ResolvedSchema(
-                        Arrays.asList(
-                                Column.physical("id", INT().notNull()),
-                                Column.physical("name", STRING())),
-                        Collections.emptyList(),
-                        UniqueConstraint.primaryKey("PK_id", 
Collections.singletonList("id")));
-
-        List<String> primaryKeyColumns = Collections.singletonList("id");
-        FlinkSchemaValidator.validatePrimaryKeyColumns(schema, 
primaryKeyColumns);
-    }
-
-    @Test
-    void testValidatePrimaryKeyColumnsWithArrayType() {
-        ResolvedSchema schema =
-                new ResolvedSchema(
-                        Arrays.asList(
-                                Column.physical("id", ARRAY(INT()).notNull()),
-                                Column.physical("name", STRING())),
-                        Collections.emptyList(),
-                        UniqueConstraint.primaryKey("PK_id", 
Collections.singletonList("id")));
-
-        List<String> primaryKeyColumns = Collections.singletonList("id");
-        assertThatThrownBy(
-                        () ->
-                                FlinkSchemaValidator.validatePrimaryKeyColumns(
-                                        schema, primaryKeyColumns))
-                .isInstanceOf(UnsupportedOperationException.class)
-                .hasMessageContaining("Column 'id' of ARRAY type is not 
supported as primary key.");
-    }
-
-    @Test
-    void testValidatePrimaryKeyColumnsWithMissingColumn() {
-        ResolvedSchema schema =
-                new ResolvedSchema(
-                        Collections.singletonList(Column.physical("name", 
STRING())),
-                        Collections.emptyList(),
-                        null);
-
-        List<String> primaryKeyColumns = Collections.singletonList("id");
-        assertThatThrownBy(
-                        () ->
-                                FlinkSchemaValidator.validatePrimaryKeyColumns(
-                                        schema, primaryKeyColumns))
-                .isInstanceOf(CatalogException.class)
-                .hasMessageContaining("Primary key column id not found in 
schema.");
-    }
-
-    @Test
-    void testValidatePartitionKeyColumnsWithValidTypes() {
-        ResolvedSchema schema =
-                new ResolvedSchema(
-                        Arrays.asList(
-                                Column.physical("id", INT().notNull()),
-                                Column.physical("name", STRING()),
-                                Column.physical("date", STRING())),
-                        Collections.emptyList(),
-                        null);
-
-        List<String> partitionKeyColumns = Arrays.asList("name", "date");
-        FlinkSchemaValidator.validatePartitionKeyColumns(schema, 
partitionKeyColumns);
-    }
-
-    @Test
-    void testValidatePartitionKeyColumnsWithArrayType() {
-        ResolvedSchema schema =
-                new ResolvedSchema(
-                        Arrays.asList(
-                                Column.physical("id", INT().notNull()),
-                                Column.physical("tags", ARRAY(STRING())),
-                                Column.physical("name", STRING())),
-                        Collections.emptyList(),
-                        null);
-
-        List<String> partitionKeyColumns = Collections.singletonList("tags");
-        assertThatThrownBy(
-                        () ->
-                                
FlinkSchemaValidator.validatePartitionKeyColumns(
-                                        schema, partitionKeyColumns))
-                .isInstanceOf(UnsupportedOperationException.class)
-                .hasMessageContaining(
-                        "Column 'tags' of ARRAY type is not supported as 
partition key.");
-    }
-
-    @Test
-    void testValidatePartitionKeyColumnsWithMissingColumn() {
-        ResolvedSchema schema =
-                new ResolvedSchema(
-                        Collections.singletonList(Column.physical("name", 
STRING())),
-                        Collections.emptyList(),
-                        null);
-
-        List<String> partitionKeyColumns = Collections.singletonList("date");
-        assertThatThrownBy(
-                        () ->
-                                
FlinkSchemaValidator.validatePartitionKeyColumns(
-                                        schema, partitionKeyColumns))
-                .isInstanceOf(CatalogException.class)
-                .hasMessageContaining("Partition key column date not found in 
schema.");
-    }
-
-    @Test
-    void testValidateBucketKeyColumnsWithValidTypes() {
-        ResolvedSchema schema =
-                new ResolvedSchema(
-                        Arrays.asList(
-                                Column.physical("id", INT().notNull()),
-                                Column.physical("name", STRING()),
-                                Column.physical("category", STRING())),
-                        Collections.emptyList(),
-                        null);
-
-        List<String> bucketKeyColumns = Arrays.asList("id", "name");
-        FlinkSchemaValidator.validateBucketKeyColumns(schema, 
bucketKeyColumns);
-    }
-
-    @Test
-    void testValidateBucketKeyColumnsWithArrayType() {
-        ResolvedSchema schema =
-                new ResolvedSchema(
-                        Arrays.asList(
-                                Column.physical("id", INT().notNull()),
-                                Column.physical("tags", ARRAY(STRING())),
-                                Column.physical("name", STRING())),
-                        Collections.emptyList(),
-                        null);
-
-        List<String> bucketKeyColumns = Collections.singletonList("tags");
-        assertThatThrownBy(
-                        () ->
-                                FlinkSchemaValidator.validateBucketKeyColumns(
-                                        schema, bucketKeyColumns))
-                .isInstanceOf(UnsupportedOperationException.class)
-                .hasMessageContaining(
-                        "Column 'tags' of ARRAY type is not supported as 
bucket key.");
-    }
-
-    @Test
-    void testValidateBucketKeyColumnsWithMissingColumn() {
-        ResolvedSchema schema =
-                new ResolvedSchema(
-                        Collections.singletonList(Column.physical("name", 
STRING())),
-                        Collections.emptyList(),
-                        null);
-
-        List<String> bucketKeyColumns = 
Collections.singletonList("bucket_col");
-        assertThatThrownBy(
-                        () ->
-                                FlinkSchemaValidator.validateBucketKeyColumns(
-                                        schema, bucketKeyColumns))
-                .isInstanceOf(CatalogException.class)
-                .hasMessageContaining("Bucket key column bucket_col not found 
in schema.");
-    }
-
-    @Test
-    void testValidatePrimaryKeyColumnsWithMultipleColumns() {
-        ResolvedSchema schema =
-                new ResolvedSchema(
-                        Arrays.asList(
-                                Column.physical("id", INT().notNull()),
-                                Column.physical("name", STRING().notNull()),
-                                Column.physical("tags", ARRAY(STRING()))),
-                        Collections.emptyList(),
-                        UniqueConstraint.primaryKey("PK_id_name", 
Arrays.asList("id", "name")));
-
-        List<String> primaryKeyColumns = Arrays.asList("id", "name");
-        FlinkSchemaValidator.validatePrimaryKeyColumns(schema, 
primaryKeyColumns);
-    }
-
-    @Test
-    void testValidatePrimaryKeyColumnsWithMultipleColumnsOneArray() {
-        ResolvedSchema schema =
-                new ResolvedSchema(
-                        Arrays.asList(
-                                Column.physical("id", INT().notNull()),
-                                Column.physical("name", STRING().notNull()),
-                                Column.physical("tags", 
ARRAY(STRING()).notNull())),
-                        Collections.emptyList(),
-                        UniqueConstraint.primaryKey("PK_id_tags", 
Arrays.asList("id", "tags")));
-
-        List<String> primaryKeyColumns = Arrays.asList("id", "tags");
-        assertThatThrownBy(
-                        () ->
-                                FlinkSchemaValidator.validatePrimaryKeyColumns(
-                                        schema, primaryKeyColumns))
-                .isInstanceOf(UnsupportedOperationException.class)
-                .hasMessageContaining(
-                        "Column 'tags' of ARRAY type is not supported as 
primary key.");
-    }
-}
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
index 446e8b505..250fc30da 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
@@ -30,6 +30,7 @@ import org.apache.fluss.metadata.DeleteBehavior;
 import org.apache.fluss.metadata.KvFormat;
 import org.apache.fluss.metadata.LogFormat;
 import org.apache.fluss.metadata.MergeEngineType;
+import org.apache.fluss.metadata.Schema;
 import org.apache.fluss.metadata.TableDescriptor;
 import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.types.DataType;
@@ -66,6 +67,9 @@ public class TableDescriptorValidation {
                                     TIMESTAMP_COLUMN_NAME,
                                     BUCKET_COLUMN_NAME)));
 
+    private static final List<DataTypeRoot> KEY_UNSUPPORTED_TYPES =
+            Arrays.asList(DataTypeRoot.ARRAY, DataTypeRoot.MAP, DataTypeRoot.ROW);
+
     /** Validate table descriptor to create is valid and contain all necessary information. */
     public static void validateTableDescriptor(TableDescriptor tableDescriptor, int maxBucketNum) {
         boolean hasPrimaryKey = tableDescriptor.getSchema().getPrimaryKey().isPresent();
@@ -95,7 +99,7 @@ public class TableDescriptorValidation {
 
         // check distribution
         checkDistribution(tableDescriptor, maxBucketNum);
-
+        checkPrimaryKey(tableDescriptor);
         // check individual options
         checkReplicationFactor(tableConf);
         checkLogFormat(tableConf, hasPrimaryKey);
@@ -162,6 +166,41 @@ public class TableDescriptorValidation {
                             "Bucket count %s exceeds the maximum limit %s.",
                             bucketCount, maxBucketNum));
         }
+        List<String> bucketKeys = tableDescriptor.getTableDistribution().get().getBucketKeys();
+        if (!bucketKeys.isEmpty()) {
+            // check bucket key type
+            RowType schema = tableDescriptor.getSchema().getRowType();
+            for (String bkColumn : bucketKeys) {
+                int bkIndex = schema.getFieldIndex(bkColumn);
+                DataType bkDataType = schema.getTypeAt(bkIndex);
+                if (KEY_UNSUPPORTED_TYPES.contains(bkDataType.getTypeRoot())) {
+                    throw new InvalidTableException(
+                            String.format(
+                                    "Bucket key column '%s' has unsupported data type %s. "
+                                            + "Currently, bucket key column does not support types: %s.",
+                                    bkColumn, bkDataType, KEY_UNSUPPORTED_TYPES));
+                }
+            }
+        }
+    }
+
+    private static void checkPrimaryKey(TableDescriptor tableDescriptor) {
+        if (tableDescriptor.hasPrimaryKey()) {
+            // check primary key type
+            RowType schema = tableDescriptor.getSchema().getRowType();
+            Schema.PrimaryKey primaryKey = tableDescriptor.getSchema().getPrimaryKey().get();
+            for (String pkColumn : primaryKey.getColumnNames()) {
+                int pkIndex = schema.getFieldIndex(pkColumn);
+                DataType pkDataType = schema.getTypeAt(pkIndex);
+                if (KEY_UNSUPPORTED_TYPES.contains(pkDataType.getTypeRoot())) {
+                    throw new InvalidTableException(
+                            String.format(
+                                    "Primary key column '%s' has unsupported data type %s. "
+                                            + "Currently, primary key column does not support types: %s.",
+                                    pkColumn, pkDataType, KEY_UNSUPPORTED_TYPES));
+                }
+            }
+        }
     }
 
     private static void checkReplicationFactor(Configuration tableConf) {

Reply via email to