This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch ci-array-flink in repository https://gitbox.apache.org/repos/asf/fluss.git
commit 3986379efbff70a3f1b83222c4d08138837847a3 Author: Jark Wu <[email protected]> AuthorDate: Mon Dec 1 22:47:46 2025 +0800 [flink] Remove unnecessary fixVectorBuffers and add more ArrayType IT cases --- .../apache/fluss/record/DefaultLogRecordBatch.java | 27 +-- .../org/apache/fluss/record/LogRecordBatch.java | 8 - .../apache/fluss/record/LogRecordReadContext.java | 26 --- .../org/apache/fluss/row/arrow/ArrowReader.java | 16 +- .../org/apache/fluss/row/arrow/ArrowWriter.java | 12 +- .../fluss/row/arrow/writers/ArrowArrayWriter.java | 21 -- .../java/org/apache/fluss/utils/ArrowUtils.java | 93 +-------- .../fluss/row/arrow/ArrowReaderWriterTest.java | 8 +- .../apache/fluss/flink/utils/FlinkConversions.java | 8 +- .../fluss/flink/utils/FlinkSchemaValidator.java | 122 ----------- .../fluss/flink/sink/FlinkComplexTypeITCase.java | 137 +++++++++--- .../flink/utils/FlinkSchemaValidatorTest.java | 229 --------------------- .../server/utils/TableDescriptorValidation.java | 41 +++- 13 files changed, 166 insertions(+), 582 deletions(-) diff --git a/fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java b/fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java index 2c6bce911..178ce8977 100644 --- a/fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java +++ b/fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java @@ -223,8 +223,7 @@ public class DefaultLogRecordBatch implements LogRecordBatch { rowType, context.getVectorSchemaRoot(schemaId), context.getBufferAllocator(), - timestamp, - context); + timestamp); case INDEXED: return rowRecordIterator(rowType, timestamp); default: @@ -280,11 +279,7 @@ public class DefaultLogRecordBatch implements LogRecordBatch { } private CloseableIterator<LogRecord> columnRecordIterator( - RowType rowType, - VectorSchemaRoot root, - BufferAllocator allocator, - long timestamp, - ReadContext readContext) { + RowType rowType, VectorSchemaRoot root, BufferAllocator allocator, long timestamp) { boolean isAppendOnly = (attributes() & APPEND_ONLY_FLAG_MASK) > 0; if (isAppendOnly) { // append only batch, no change type vector, @@ -294,13 +289,7 @@ public class DefaultLogRecordBatch implements LogRecordBatch { int arrowLength = sizeInBytes() - recordBatchHeaderSize; ArrowReader reader = ArrowUtils.createArrowReader( - segment, - arrowOffset, - arrowLength, - root, - allocator, - rowType, - readContext); + segment, arrowOffset, arrowLength, root, allocator, rowType); return new ArrowLogRecordIterator(reader, timestamp) { @Override protected ChangeType getChangeType(int rowId) { @@ -318,13 +307,7 @@ public class DefaultLogRecordBatch implements LogRecordBatch { sizeInBytes() - arrowChangeTypeOffset(magic) - changeTypeVector.sizeInBytes(); ArrowReader reader = ArrowUtils.createArrowReader( - segment, - arrowOffset, - arrowLength, - root, - allocator, - rowType, - readContext); + segment, arrowOffset, arrowLength, root, allocator, rowType); return new ArrowLogRecordIterator(reader, timestamp) { @Override protected ChangeType getChangeType(int rowId) { @@ -371,7 +354,7 @@ public class DefaultLogRecordBatch implements LogRecordBatch { @Override public void close() { - reader.close(); + // reader has no resources to release } } diff --git a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatch.java b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatch.java index c226f25a4..cadc1ef9d 100644 --- a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatch.java +++ b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatch.java @@ -195,13 +195,5 @@ public interface LogRecordBatch { /** Gets the buffer allocator. */ BufferAllocator getBufferAllocator(); - - /** - * Registers a batch VectorSchemaRoot to be closed when this context is closed. This is used - * to track VectorSchemaRoots created for individual batches. - * - * @param batchRoot the batch VectorSchemaRoot to register - */ - void registerBatchRoot(VectorSchemaRoot batchRoot); } } diff --git a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java index 08ef69037..1158dbc35 100644 --- a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java +++ b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java @@ -32,7 +32,6 @@ import org.apache.fluss.utils.Projection; import javax.annotation.Nullable; -import java.util.ArrayList; import java.util.List; import java.util.stream.IntStream; @@ -56,8 +55,6 @@ public class LogRecordReadContext implements LogRecordBatch.ReadContext, AutoClo private final FieldGetter[] selectedFieldGetters; // whether the projection is push downed to the server side and the returned data is pruned. private final boolean projectionPushDowned; - // track all batch VectorSchemaRoots created for this context - private final List<VectorSchemaRoot> batchRoots; /** * Creates a LogRecordReadContext for the given table information and projection information. @@ -167,7 +164,6 @@ public class LogRecordReadContext implements LogRecordBatch.ReadContext, AutoClo this.bufferAllocator = bufferAllocator; this.selectedFieldGetters = selectedFieldGetters; this.projectionPushDowned = projectionPushDowned; - this.batchRoots = new ArrayList<>(); } @Override @@ -219,29 +215,7 @@ public class LogRecordReadContext implements LogRecordBatch.ReadContext, AutoClo return bufferAllocator; } - /** - * Registers a batch VectorSchemaRoot to be closed when this context is closed. This is used to - * track VectorSchemaRoots created for individual batches. - */ - @Override - public void registerBatchRoot(VectorSchemaRoot batchRoot) { - if (batchRoot != null) { - batchRoots.add(batchRoot); - } - } - public void close() { - // Close all batch roots first - for (VectorSchemaRoot root : batchRoots) { - try { - root.close(); - } catch (Exception e) { - // Continue closing other roots even if one fails - } - } - batchRoots.clear(); - - // Then close the shared schema root and allocator if (vectorSchemaRoot != null) { vectorSchemaRoot.close(); } diff --git a/fluss-common/src/main/java/org/apache/fluss/row/arrow/ArrowReader.java b/fluss-common/src/main/java/org/apache/fluss/row/arrow/ArrowReader.java index d50eb8fe0..36a37b44d 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/arrow/ArrowReader.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/arrow/ArrowReader.java @@ -22,7 +22,6 @@ import org.apache.fluss.row.InternalRow; import org.apache.fluss.row.columnar.ColumnVector; import org.apache.fluss.row.columnar.ColumnarRow; import org.apache.fluss.row.columnar.VectorizedColumnBatch; -import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.VectorSchemaRoot; import static org.apache.fluss.utils.Preconditions.checkNotNull; @@ -30,11 +29,6 @@ import static org.apache.fluss.utils.Preconditions.checkNotNull; @Internal public class ArrowReader { - /** - * The arrow root which holds vector resources and should be released when the reader is closed. - */ - private final VectorSchemaRoot root; - /** * An array of vectors which are responsible for the deserialization of each column of the rows. */ @@ -42,10 +36,9 @@ public class ArrowReader { private final int rowCount; - public ArrowReader(VectorSchemaRoot root, ColumnVector[] columnVectors) { - this.root = root; + public ArrowReader(ColumnVector[] columnVectors, int rowCount) { this.columnVectors = checkNotNull(columnVectors); - this.rowCount = root.getRowCount(); + this.rowCount = rowCount; } public int getRowCount() { @@ -56,9 +49,4 @@ public class ArrowReader { public ColumnarRow read(int rowId) { return new ColumnarRow(new VectorizedColumnBatch(columnVectors), rowId); } - - public void close() { - // Do not close the VectorSchemaRoot here. - // The root will be closed when LogRecordReadContext closes. - } } diff --git a/fluss-common/src/main/java/org/apache/fluss/row/arrow/ArrowWriter.java b/fluss-common/src/main/java/org/apache/fluss/row/arrow/ArrowWriter.java index 8c1ba501b..98b7ed6e5 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/arrow/ArrowWriter.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/arrow/ArrowWriter.java @@ -88,8 +88,6 @@ public class ArrowWriter implements AutoCloseable { */ private final int metadataLength; - private final RowType schema; - private final CompressionCodec compressionCodec; private final ArrowCompressionRatioEstimator compressionRatioEstimator; @@ -113,7 +111,6 @@ public class ArrowWriter implements AutoCloseable { ArrowCompressionInfo compressionInfo, ArrowCompressionRatioEstimator compressionRatioEstimator) { this.writerKey = writerKey; - this.schema = schema; this.root = VectorSchemaRoot.create(ArrowUtils.toArrowSchema(schema), allocator); this.provider = checkNotNull(provider); this.compressionCodec = compressionInfo.createCompressionCodec(); @@ -185,7 +182,7 @@ public class ArrowWriter implements AutoCloseable { } // Reset field writers to clear their offset counters (for ArrayWriter) for (ArrowFieldWriter fieldWriter : fieldWriters) { - resetFieldWriter(fieldWriter); + fieldWriter.reset(); } root.setRowCount(0); recordsCount = 0; @@ -291,7 +288,7 @@ public class ArrowWriter implements AutoCloseable { recordsCount = 0; // Reset array writers when recycling for (ArrowFieldWriter fieldWriter : fieldWriters) { - resetFieldWriter(fieldWriter); + fieldWriter.reset(); } provider.recycleWriter(this); } @@ -322,11 +319,6 @@ public class ArrowWriter implements AutoCloseable { } } - /** Resets field writers to clear their state for reuse. */ - private void resetFieldWriter(ArrowFieldWriter fieldWriter) { - fieldWriter.reset(); - } - private int estimatedBytesWritten(int currentBytes) { if (compressionCodec.getCodecType() == CompressionUtil.CodecType.NO_COMPRESSION) { return currentBytes; diff --git a/fluss-common/src/main/java/org/apache/fluss/row/arrow/writers/ArrowArrayWriter.java b/fluss-common/src/main/java/org/apache/fluss/row/arrow/writers/ArrowArrayWriter.java index c5001af56..548c4e956 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/arrow/writers/ArrowArrayWriter.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/arrow/writers/ArrowArrayWriter.java @@ -45,27 +45,6 @@ public class ArrowArrayWriter extends ArrowFieldWriter { } offset += array.size(); listVector.endValue(rowIndex, array.size()); - - // Fix dataVector valueCount after writing each row - // This ensures VectorUnloader has correct valueCount for all vectors - FieldVector dataVector = listVector.getDataVector(); - if (dataVector != null && offset > 0) { - dataVector.setValueCount(offset); - // Recursively fix nested ListVectors - fixNestedListVectorValueCount(dataVector, offset); - } - } - - private void fixNestedListVectorValueCount(FieldVector vector, int parentCount) { - if (vector instanceof ListVector && parentCount > 0) { - ListVector listVector = (ListVector) vector; - int dataVectorValueCount = listVector.getElementEndIndex(parentCount - 1); - FieldVector dataVector = listVector.getDataVector(); - if (dataVector != null && dataVectorValueCount > 0) { - dataVector.setValueCount(dataVectorValueCount); - fixNestedListVectorValueCount(dataVector, dataVectorValueCount); - } - } } /** Resets the offset counter for reuse. */ diff --git a/fluss-common/src/main/java/org/apache/fluss/utils/ArrowUtils.java b/fluss-common/src/main/java/org/apache/fluss/utils/ArrowUtils.java index 8e9ab3e19..06273fd09 100644 --- a/fluss-common/src/main/java/org/apache/fluss/utils/ArrowUtils.java +++ b/fluss-common/src/main/java/org/apache/fluss/utils/ArrowUtils.java @@ -159,110 +159,29 @@ public class ArrowUtils { MemorySegment segment, int arrowOffset, int arrowLength, - VectorSchemaRoot sharedSchemaRoot, + VectorSchemaRoot schemaRoot, BufferAllocator allocator, - RowType rowType, - org.apache.fluss.record.LogRecordBatch.ReadContext readContext) { + RowType rowType) { ByteBuffer arrowBatchBuffer = segment.wrap(arrowOffset, arrowLength); - try (ReadChannel channel = new ReadChannel(new ByteBufferReadableChannel(arrowBatchBuffer)); ArrowRecordBatch batch = deserializeRecordBatch(channel, allocator)) { - - // Create a new VectorSchemaRoot for each batch to avoid data contamination - // when the shared root is reused for subsequent batches - VectorSchemaRoot batchSchemaRoot = - VectorSchemaRoot.create(sharedSchemaRoot.getSchema(), allocator); - VectorLoader vectorLoader = - new VectorLoader(batchSchemaRoot, ArrowCompressionFactory.INSTANCE); + new VectorLoader(schemaRoot, ArrowCompressionFactory.INSTANCE); vectorLoader.load(batch); - - // Fix buffer writerIndex after VectorLoader.load - // VectorLoader.load() sets the capacity but not the writerIndex for buffers. - // This is especially critical for Array types (ListVector), where we need to: - // 1. Fix writerIndex for all buffers to match their capacity - // 2. Fix nested dataVector's valueCount based on the last offset - // This was not needed before Array type support, but is now essential - // for correct Arrow data reading with complex nested types. - fixVectorBuffers(batchSchemaRoot); - List<ColumnVector> columnVectors = new ArrayList<>(); - List<FieldVector> fieldVectors = batchSchemaRoot.getFieldVectors(); + List<FieldVector> fieldVectors = schemaRoot.getFieldVectors(); for (int i = 0; i < fieldVectors.size(); i++) { columnVectors.add( createArrowColumnVector(fieldVectors.get(i), rowType.getTypeAt(i))); } - - // Register the batch root with the read context so it can be closed later - if (readContext != null) { - readContext.registerBatchRoot(batchSchemaRoot); - } - - return new ArrowReader(batchSchemaRoot, columnVectors.toArray(new ColumnVector[0])); + return new ArrowReader( + columnVectors.toArray(new ColumnVector[0]), schemaRoot.getRowCount()); } catch (IOException e) { throw new RuntimeException("Failed to deserialize ArrowRecordBatch.", e); } } - /** - * Fixes writerIndex for all buffers in all vectors after VectorLoader.load(). - * - * <p>VectorLoader.load() sets the capacity but not the writerIndex for buffers, which can cause - * issues when reading data. This method recursively fixes all vectors: - * - * <ul> - * <li>For all vectors: sets writerIndex to match capacity for proper buffer reading - * <li>For ListVector (Array type): additionally fixes the nested dataVector's valueCount - * based on the last offset, and recursively fixes nested structures - * </ul> - * - * <p>This is especially critical for Array types introduced in the system, where nested - * dataVectors need correct valueCount to avoid data corruption or reading errors. - */ - private static void fixVectorBuffers(VectorSchemaRoot schemaRoot) { - for (FieldVector fieldVector : schemaRoot.getFieldVectors()) { - fixVectorBuffersRecursive(fieldVector); - } - } - - private static void fixVectorBuffersRecursive(FieldVector vector) { - // Fix all buffers in this vector - for (ArrowBuf buf : vector.getFieldBuffers()) { - if (buf.capacity() > 0 && buf.writerIndex() < buf.capacity()) { - buf.writerIndex((int) buf.capacity()); - } - } - - // Special handling for ListVector: fix dataVector valueCount and buffers - // This is critical for Array type support. Without this, the nested dataVector - // may have incorrect valueCount, leading to data corruption or reading errors. - if (vector instanceof ListVector) { - ListVector listVector = (ListVector) vector; - int vectorValueCount = listVector.getValueCount(); - - if (vectorValueCount > 0) { - FieldVector dataVector = listVector.getDataVector(); - if (dataVector != null) { - // Calculate the correct valueCount for dataVector from the last offset - // For array [a,b,c] followed by [d,e], offsets are [0,3,5], so dataVector - // needs valueCount=5 (not the parent's valueCount=2) - int dataVectorValueCount = listVector.getElementEndIndex(vectorValueCount - 1); - if (dataVectorValueCount > 0) { - dataVector.setValueCount(dataVectorValueCount); - } - // Recursively fix the dataVector (needed for nested arrays) - fixVectorBuffersRecursive(dataVector); - } - } - } else if (vector.getChildrenFromFields() != null) { - // Recursively fix other child vectors - for (FieldVector child : vector.getChildrenFromFields()) { - fixVectorBuffersRecursive(child); - } - } - } - /** * Serialize metadata of a {@link ArrowRecordBatch} into write channel. This avoids to create an * instance of {@link ArrowRecordBatch}. diff --git a/fluss-common/src/test/java/org/apache/fluss/row/arrow/ArrowReaderWriterTest.java b/fluss-common/src/test/java/org/apache/fluss/row/arrow/ArrowReaderWriterTest.java index 295d6272a..16eb4a775 100644 --- a/fluss-common/src/test/java/org/apache/fluss/row/arrow/ArrowReaderWriterTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/row/arrow/ArrowReaderWriterTest.java @@ -21,7 +21,6 @@ import org.apache.fluss.memory.AbstractPagedOutputView; import org.apache.fluss.memory.ManagedPagedOutputView; import org.apache.fluss.memory.MemorySegment; import org.apache.fluss.memory.TestingMemorySegmentPool; -import org.apache.fluss.record.LogRecordReadContext; import org.apache.fluss.row.Decimal; import org.apache.fluss.row.GenericArray; import org.apache.fluss.row.GenericRow; @@ -178,9 +177,7 @@ class ArrowReaderWriterTest { ArrowWriterPool provider = new ArrowWriterPool(allocator); ArrowWriter writer = provider.getOrCreateWriter( - 1L, 1, Integer.MAX_VALUE, rowType, NO_COMPRESSION); - LogRecordReadContext readContext = - LogRecordReadContext.createArrowReadContext(rowType, 0)) { + 1L, 1, Integer.MAX_VALUE, rowType, NO_COMPRESSION)) { for (InternalRow row : TEST_DATA) { writer.writeRow(row); } @@ -200,8 +197,7 @@ class ArrowReaderWriterTest { firstSegment.copyTo(arrowChangeTypeOffset(CURRENT_LOG_MAGIC_VALUE), segment, 0, size); ArrowReader reader = - ArrowUtils.createArrowReader( - segment, 0, size, root, allocator, rowType, readContext); + ArrowUtils.createArrowReader(segment, 0, size, root, allocator, rowType); int rowCount = reader.getRowCount(); for (int i = 0; i < rowCount; i++) { ColumnarRow row = reader.read(i); diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java index 1517fd0d5..3256b3f52 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java @@ -190,9 +190,7 @@ public class FlinkConversions { // now, build Fluss's table Schema.Builder schemBuilder = Schema.newBuilder(); if (resolvedSchema.getPrimaryKey().isPresent()) { - List<String> primaryKeyColumns = resolvedSchema.getPrimaryKey().get().getColumns(); - FlinkSchemaValidator.validatePrimaryKeyColumns(resolvedSchema, primaryKeyColumns); - schemBuilder.primaryKey(primaryKeyColumns); + schemBuilder.primaryKey(resolvedSchema.getPrimaryKey().get().getColumns()); } // first build schema with physical columns @@ -225,8 +223,6 @@ public class FlinkConversions { ? ((ResolvedCatalogTable) catalogBaseTable).getPartitionKeys() : ((ResolvedCatalogMaterializedTable) catalogBaseTable).getPartitionKeys(); - FlinkSchemaValidator.validatePartitionKeyColumns(resolvedSchema, partitionKeys); - Map<String, String> customProperties = flinkTableConf.toMap(); CatalogPropertiesUtils.serializeComputedColumns( customProperties, resolvedSchema.getColumns()); @@ -257,8 +253,6 @@ public class FlinkConversions { Arrays.stream(flinkTableConf.get(BUCKET_KEY).split(",")) .map(String::trim) .collect(Collectors.toList()); - - FlinkSchemaValidator.validateBucketKeyColumns(resolvedSchema, bucketKey); } else { // use primary keys - partition keys bucketKey = diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkSchemaValidator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkSchemaValidator.java deleted file mode 100644 index 68cbb1b65..000000000 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkSchemaValidator.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.fluss.flink.utils; - -import org.apache.flink.table.catalog.Column; -import org.apache.flink.table.catalog.ResolvedSchema; -import org.apache.flink.table.catalog.exceptions.CatalogException; -import org.apache.flink.table.types.logical.ArrayType; -import org.apache.flink.table.types.logical.LogicalType; - -import java.util.List; - -/** Validator for Flink schema constraints. */ -public class FlinkSchemaValidator { - - private FlinkSchemaValidator() {} - - /** - * Validates that primary key columns do not contain ARRAY type. - * - * @param resolvedSchema the resolved schema - * @param primaryKeyColumns the list of primary key column names - * @throws CatalogException if a primary key column is not found in schema - * @throws UnsupportedOperationException if a primary key column is of ARRAY type - */ - public static void validatePrimaryKeyColumns( - ResolvedSchema resolvedSchema, List<String> primaryKeyColumns) { - for (String pkColumn : primaryKeyColumns) { - Column column = - resolvedSchema - .getColumn(pkColumn) - .orElseThrow( - () -> - new CatalogException( - "Primary key column " - + pkColumn - + " not found in schema.")); - LogicalType logicalType = column.getDataType().getLogicalType(); - if (logicalType instanceof ArrayType) { - throw new UnsupportedOperationException( - String.format( - "Column '%s' of ARRAY type is not supported as primary key.", - pkColumn)); - } - } - } - - /** - * Validates that partition key columns do not contain ARRAY type. - * - * @param resolvedSchema the resolved schema - * @param partitionKeyColumns the list of partition key column names - * @throws CatalogException if a partition key column is not found in schema - * @throws UnsupportedOperationException if a partition key column is of ARRAY type - */ - public static void validatePartitionKeyColumns( - ResolvedSchema resolvedSchema, List<String> partitionKeyColumns) { - for (String partitionKey : partitionKeyColumns) { - Column column = - resolvedSchema - .getColumn(partitionKey) - .orElseThrow( - () -> - new CatalogException( - "Partition key column " - + partitionKey - + " not found in schema.")); - LogicalType logicalType = column.getDataType().getLogicalType(); - if (logicalType instanceof ArrayType) { - throw new UnsupportedOperationException( - String.format( - "Column '%s' of ARRAY type is not supported as partition key.", - partitionKey)); - } - } - } - - /** - * Validates that bucket key columns do not contain ARRAY type. - * - * @param resolvedSchema the resolved schema - * @param bucketKeyColumns the list of bucket key column names - * @throws CatalogException if a bucket key column is not found in schema - * @throws UnsupportedOperationException if a bucket key column is of ARRAY type - */ - public static void validateBucketKeyColumns( - ResolvedSchema resolvedSchema, List<String> bucketKeyColumns) { - for (String bkColumn : bucketKeyColumns) { - Column column = - resolvedSchema - .getColumn(bkColumn) - .orElseThrow( - () -> - new CatalogException( - "Bucket key column " - + bkColumn - + " not found in schema.")); - LogicalType logicalType = column.getDataType().getLogicalType(); - if (logicalType instanceof ArrayType) { - throw new UnsupportedOperationException( - String.format( - "Column '%s' of ARRAY type is not supported as bucket key.", - bkColumn)); - } - } - } -} diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkComplexTypeITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkComplexTypeITCase.java index 13d91bcd9..efa408a7f 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkComplexTypeITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkComplexTypeITCase.java @@ -17,11 +17,18 @@ package org.apache.fluss.flink.sink; +import org.apache.fluss.exception.InvalidTableException; import org.apache.fluss.server.testutils.FlussClusterExtension; import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.Schema; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.api.config.ExecutionConfigOptions; @@ -131,6 +138,47 @@ abstract class FlinkComplexTypeITCase extends AbstractTestBase { assertResultsIgnoreOrder(rowIter, expectedRows, true); } + @Test + void testArrayTypesInPartitionedLogTable() throws Exception { + tEnv.executeSql( + "create table array_log_test (" + + "id int, " + + "dt string, " + + "int_array array<int>, " + + "bigint_array array<bigint>, " + + "float_array array<float>, " + + "double_array array<double>, " + + "string_array array<string>, " + + "boolean_array array<boolean>, " + + "nested_int_array array<array<int>>, " + + "nested_string_array array<array<string>>, " + + "deeply_nested_array array<array<array<int>>>" + + ") PARTITIONED BY (dt) " + + "with ('bucket.num' = '3')"); + + tEnv.executeSql( + "INSERT INTO array_log_test VALUES " + + "(1, '2014', ARRAY[1, 2, CAST(NULL AS INT)], ARRAY[100, CAST(NULL AS BIGINT), 300], " + + "ARRAY[CAST(1.1 AS FLOAT), CAST(NULL AS FLOAT)], ARRAY[2.2, 3.3, CAST(NULL AS DOUBLE)], " + + "ARRAY['a', CAST(NULL AS STRING), 'c'], ARRAY[true, CAST(NULL AS BOOLEAN), false], " + + "ARRAY[ARRAY[1, 2], CAST(NULL AS ARRAY<INT>), ARRAY[3]], " + + "ARRAY[ARRAY['x'], ARRAY[CAST(NULL AS STRING), 'y']], " + + "ARRAY[ARRAY[ARRAY[1, 2]], ARRAY[ARRAY[3, 4, 5]]]), " + + "(2, '2013', CAST(NULL AS ARRAY<INT>), ARRAY[400, 500], " + + "ARRAY[CAST(4.4 AS FLOAT)], ARRAY[5.5], " + + "ARRAY['d', 'e'], ARRAY[true], " + + "ARRAY[ARRAY[6, 7, 8]], ARRAY[ARRAY['z']], " + + "ARRAY[ARRAY[ARRAY[9]]])") + .await(); + + CloseableIterator<Row> rowIter = tEnv.executeSql("select * from array_log_test").collect(); + List<String> expectedRows = + Arrays.asList( + "+I[1, 2014, [1, 2, null], [100, null, 300], [1.1, null], [2.2, 3.3, null], [a, null, c], [true, null, false], [[1, 2], null, [3]], [[x], [null, y]], [[[1, 2]], [[3, 4, 5]]]]", + "+I[2, 2013, null, [400, 500], [4.4], [5.5], [d, e], [true], [[6, 7, 8]], [[z]], [[[9]]]]"); + assertResultsIgnoreOrder(rowIter, expectedRows, true); + } + @Test void testArrayTypesInPrimaryKeyTable() throws Exception { tEnv.executeSql( @@ -180,11 +228,58 @@ abstract class FlinkComplexTypeITCase extends AbstractTestBase { "-U[1, [1, 2], [100, 300], [1.1], [2.2, 3.3], [a, null, c], [true, false], [[1, 2], null, [3]], [[x], [null, y]]]", "+U[1, [100, 200], [1000], [10.1], [11.1], [updated], [false], [[100]], [[updated]]]", "+I[4, [20, 30], [2000, 3000], [20.2], [30.3], [new], [true], [[200], [300]], [[new1], [new2]]]"); + assertResultsIgnoreOrder(rowIter, expectedRows, false); + + // insert into with partial update test + tEnv.executeSql( + "INSERT INTO array_pk_test (id, string_array, bigint_array) VALUES " + + "(2, ARRAY['partially', 'updated'], ARRAY[9999])") + .await(); + + expectedRows = + Arrays.asList( + "-U[2, null, [400, 500], [4.4], [5.5], [d, e], [true], [[6, 7, 8]], [[z]]]", + "+U[2, null, [9999], [4.4], [5.5], [partially, updated], [true], [[6, 7, 8]], [[z]]]"); assertResultsIgnoreOrder(rowIter, expectedRows, true); + + // test lookup join with array type, test partitioned table + Schema srcSchema = + Schema.newBuilder() + .column("a", DataTypes.INT()) + .column("name", DataTypes.STRING()) + .column("c", DataTypes.INT()) + .columnByExpression("proc", "PROCTIME()") + .build(); + RowTypeInfo srcTestTypeInfo = + new RowTypeInfo( + new TypeInformation[] {Types.INT, Types.STRING, Types.INT}, + new String[] {"a", "name", "c"}); + List<Row> testData = + Arrays.asList( + Row.of(1, "name1", 11), + Row.of(2, "name2", 2), + Row.of(3, "name33", 33), + Row.of(10, "name0", 44)); + DataStream<Row> srcDs = env.fromCollection(testData).returns(srcTestTypeInfo); + tEnv.dropTemporaryView("src"); + tEnv.createTemporaryView("src", tEnv.fromDataStream(srcDs, srcSchema)); + CloseableIterator<Row> collected = + tEnv.executeSql( + "SELECT a, name, array_pk_test.* FROM src " + + "LEFT JOIN array_pk_test FOR SYSTEM_TIME AS OF src.proc " + + "ON src.a = array_pk_test.id") + .collect(); + List<String> expected = + Arrays.asList( + "+I[1, name1, 1, [100, 200], [1000], [10.1], [11.1], [updated], [false], [[100]], [[updated]]]", + "+I[2, name2, 2, null, [9999], [4.4], [5.5], [partially, updated], [true], [[6, 7, 8]], [[z]]]", + "+I[3, name33, 3, [10], [600], [7.7], [8.8], [f], [false], [[9]], [[w]]]", + "+I[10, name0, null, null, null, null, null, null, null, null, null]"); + assertResultsIgnoreOrder(collected, expected, true); } @Test - void testArrayTypeAsPartitionKeyThrowsException() { + void testExceptionsForArrayTypeUsage() { assertThatThrownBy( () -> tEnv.executeSql( @@ -192,40 +287,24 @@ abstract class FlinkComplexTypeITCase extends AbstractTestBase { + "id int, " + "data string, " + "tags array<string>, " - + "primary key(id) not enforced" + + "primary key(id, tags) not enforced" + ") partitioned by (tags)")) - .cause() - .isInstanceOf(UnsupportedOperationException.class) - .hasMessageContaining("is not supported"); - } + .hasRootCauseInstanceOf(InvalidTableException.class) + .hasRootCauseMessage( + "Primary key column 'tags' has unsupported data type ARRAY<STRING> NOT NULL. " + + "Currently, primary key column does not support types: [ARRAY, MAP, ROW]."); - @Test - void testArrayTypeAsPrimaryKeyThrowsException() { - assertThatThrownBy( - () -> - tEnv.executeSql( - "create table array_pk_invalid (" - + "id int, " - + "data array<string>, " - + "primary key(data) not enforced" - + ")")) - .cause() - .isInstanceOf(UnsupportedOperationException.class) - .hasMessageContaining("is not supported"); - } - - @Test - void testArrayTypeAsBucketKeyThrowsException() { assertThatThrownBy( () -> tEnv.executeSql( "create table array_bucket_test (" + "id int, " - + "data array<string>, " - + "primary key(id) not enforced" - + ") with ('bucket.key' = 'data', 'bucket.num' = '3')")) - .cause() - .isInstanceOf(UnsupportedOperationException.class) - .hasMessageContaining("is not supported"); + + "data string, " + + "tags array<string> " + + ") with ('bucket.key' = 'tags')")) + .hasRootCauseInstanceOf(InvalidTableException.class) + .hasRootCauseMessage( + "Bucket key column 'tags' has unsupported data type ARRAY<STRING>. " + + "Currently, bucket key column does not support types: [ARRAY, MAP, ROW]."); } } diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkSchemaValidatorTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkSchemaValidatorTest.java deleted file mode 100644 index 50e9f91b6..000000000 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkSchemaValidatorTest.java +++ /dev/null @@ -1,229 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.fluss.flink.utils; - -import org.apache.flink.table.catalog.Column; -import org.apache.flink.table.catalog.ResolvedSchema; -import org.apache.flink.table.catalog.UniqueConstraint; -import org.apache.flink.table.catalog.exceptions.CatalogException; -import org.junit.jupiter.api.Test; - -import java.util.Arrays; -import java.util.Collections; -import java.util.List; - -import static org.apache.flink.table.api.DataTypes.ARRAY; -import static org.apache.flink.table.api.DataTypes.INT; -import static org.apache.flink.table.api.DataTypes.STRING; -import static org.assertj.core.api.Assertions.assertThatThrownBy; - -/** Test for {@link FlinkSchemaValidator}. */ -public class FlinkSchemaValidatorTest { - - @Test - void testValidatePrimaryKeyColumnsWithValidTypes() { - ResolvedSchema schema = - new ResolvedSchema( - Arrays.asList( - Column.physical("id", INT().notNull()), - Column.physical("name", STRING())), - Collections.emptyList(), - UniqueConstraint.primaryKey("PK_id", Collections.singletonList("id"))); - - List<String> primaryKeyColumns = Collections.singletonList("id"); - FlinkSchemaValidator.validatePrimaryKeyColumns(schema, primaryKeyColumns); - } - - @Test - void testValidatePrimaryKeyColumnsWithArrayType() { - ResolvedSchema schema = - new ResolvedSchema( - Arrays.asList( - Column.physical("id", ARRAY(INT()).notNull()), - Column.physical("name", STRING())), - Collections.emptyList(), - UniqueConstraint.primaryKey("PK_id", Collections.singletonList("id"))); - - List<String> primaryKeyColumns = Collections.singletonList("id"); - assertThatThrownBy( - () -> - FlinkSchemaValidator.validatePrimaryKeyColumns( - schema, primaryKeyColumns)) - .isInstanceOf(UnsupportedOperationException.class) - .hasMessageContaining("Column 'id' of ARRAY type is not supported as primary key."); - } - - @Test - void testValidatePrimaryKeyColumnsWithMissingColumn() { - ResolvedSchema schema = - new ResolvedSchema( - Collections.singletonList(Column.physical("name", STRING())), - Collections.emptyList(), - null); - - List<String> primaryKeyColumns = Collections.singletonList("id"); - assertThatThrownBy( - () -> - FlinkSchemaValidator.validatePrimaryKeyColumns( - schema, primaryKeyColumns)) - .isInstanceOf(CatalogException.class) - .hasMessageContaining("Primary key column id not found in schema."); - } - - @Test - void testValidatePartitionKeyColumnsWithValidTypes() { - ResolvedSchema schema = - new ResolvedSchema( - Arrays.asList( - Column.physical("id", INT().notNull()), - Column.physical("name", STRING()), - Column.physical("date", STRING())), - Collections.emptyList(), - null); - - List<String> partitionKeyColumns = Arrays.asList("name", "date"); - FlinkSchemaValidator.validatePartitionKeyColumns(schema, partitionKeyColumns); - } - - @Test - void testValidatePartitionKeyColumnsWithArrayType() { - ResolvedSchema schema = - new ResolvedSchema( - Arrays.asList( - Column.physical("id", INT().notNull()), - Column.physical("tags", ARRAY(STRING())), - Column.physical("name", STRING())), - Collections.emptyList(), - null); - - List<String> partitionKeyColumns = Collections.singletonList("tags"); - assertThatThrownBy( - () -> - FlinkSchemaValidator.validatePartitionKeyColumns( - schema, partitionKeyColumns)) - .isInstanceOf(UnsupportedOperationException.class) - .hasMessageContaining( - "Column 'tags' of ARRAY type is not supported as partition key."); - } - - @Test - void testValidatePartitionKeyColumnsWithMissingColumn() { - ResolvedSchema schema = - new ResolvedSchema( - Collections.singletonList(Column.physical("name", STRING())), - Collections.emptyList(), - null); - - List<String> partitionKeyColumns = Collections.singletonList("date"); - assertThatThrownBy( - () -> - FlinkSchemaValidator.validatePartitionKeyColumns( - schema, partitionKeyColumns)) - .isInstanceOf(CatalogException.class) - .hasMessageContaining("Partition key column date not found in schema."); - } - - @Test - void testValidateBucketKeyColumnsWithValidTypes() { - ResolvedSchema schema = - new ResolvedSchema( - Arrays.asList( - Column.physical("id", INT().notNull()), - Column.physical("name", STRING()), - Column.physical("category", STRING())), - Collections.emptyList(), - null); - - List<String> bucketKeyColumns = Arrays.asList("id", "name"); - FlinkSchemaValidator.validateBucketKeyColumns(schema, bucketKeyColumns); - } - - @Test - void testValidateBucketKeyColumnsWithArrayType() { - ResolvedSchema schema = - new ResolvedSchema( - Arrays.asList( - Column.physical("id", INT().notNull()), - Column.physical("tags", ARRAY(STRING())), - Column.physical("name", STRING())), - Collections.emptyList(), - null); - - List<String> bucketKeyColumns = Collections.singletonList("tags"); - assertThatThrownBy( - () -> - FlinkSchemaValidator.validateBucketKeyColumns( - schema, bucketKeyColumns)) - .isInstanceOf(UnsupportedOperationException.class) - .hasMessageContaining( - "Column 'tags' of ARRAY type is not supported as bucket key."); - } - - @Test - void testValidateBucketKeyColumnsWithMissingColumn() { - ResolvedSchema schema = - new ResolvedSchema( - Collections.singletonList(Column.physical("name", STRING())), - Collections.emptyList(), - null); - - List<String> bucketKeyColumns = Collections.singletonList("bucket_col"); - assertThatThrownBy( - () -> - FlinkSchemaValidator.validateBucketKeyColumns( - schema, bucketKeyColumns)) - .isInstanceOf(CatalogException.class) - .hasMessageContaining("Bucket key column bucket_col not found in schema."); - } - - @Test - void testValidatePrimaryKeyColumnsWithMultipleColumns() { - ResolvedSchema schema = - new ResolvedSchema( - Arrays.asList( - Column.physical("id", INT().notNull()), - Column.physical("name", STRING().notNull()), - Column.physical("tags", ARRAY(STRING()))), - Collections.emptyList(), - UniqueConstraint.primaryKey("PK_id_name", Arrays.asList("id", "name"))); - - List<String> primaryKeyColumns = Arrays.asList("id", "name"); - FlinkSchemaValidator.validatePrimaryKeyColumns(schema, primaryKeyColumns); - } - - @Test - void testValidatePrimaryKeyColumnsWithMultipleColumnsOneArray() { - ResolvedSchema schema = - new ResolvedSchema( - Arrays.asList( - Column.physical("id", INT().notNull()), - Column.physical("name", STRING().notNull()), - Column.physical("tags", ARRAY(STRING()).notNull())), - Collections.emptyList(), - UniqueConstraint.primaryKey("PK_id_tags", Arrays.asList("id", "tags"))); - - List<String> primaryKeyColumns = Arrays.asList("id", "tags"); - assertThatThrownBy( - () -> - FlinkSchemaValidator.validatePrimaryKeyColumns( - schema, primaryKeyColumns)) - .isInstanceOf(UnsupportedOperationException.class) - .hasMessageContaining( - "Column 'tags' of ARRAY type is not supported as primary key."); - } -} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java index 446e8b505..250fc30da 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java @@ -30,6 +30,7 @@ import org.apache.fluss.metadata.DeleteBehavior; import org.apache.fluss.metadata.KvFormat; import org.apache.fluss.metadata.LogFormat; import org.apache.fluss.metadata.MergeEngineType; +import org.apache.fluss.metadata.Schema; import org.apache.fluss.metadata.TableDescriptor; import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.types.DataType; @@ -66,6 +67,9 @@ public class TableDescriptorValidation { TIMESTAMP_COLUMN_NAME, BUCKET_COLUMN_NAME))); + private static final List<DataTypeRoot> KEY_UNSUPPORTED_TYPES = + Arrays.asList(DataTypeRoot.ARRAY, DataTypeRoot.MAP, DataTypeRoot.ROW); + /** Validate table descriptor to create is valid and contain all necessary information. */ public static void validateTableDescriptor(TableDescriptor tableDescriptor, int maxBucketNum) { boolean hasPrimaryKey = tableDescriptor.getSchema().getPrimaryKey().isPresent(); @@ -95,7 +99,7 @@ public class TableDescriptorValidation { // check distribution checkDistribution(tableDescriptor, maxBucketNum); - + checkPrimaryKey(tableDescriptor); // check individual options checkReplicationFactor(tableConf); checkLogFormat(tableConf, hasPrimaryKey); @@ -162,6 +166,41 @@ public class TableDescriptorValidation { "Bucket count %s exceeds the maximum limit %s.", bucketCount, maxBucketNum)); } + List<String> bucketKeys = tableDescriptor.getTableDistribution().get().getBucketKeys(); + if (!bucketKeys.isEmpty()) { + // check bucket key type + RowType schema = tableDescriptor.getSchema().getRowType(); + for (String bkColumn : bucketKeys) { + int bkIndex = schema.getFieldIndex(bkColumn); + DataType bkDataType = schema.getTypeAt(bkIndex); + if (KEY_UNSUPPORTED_TYPES.contains(bkDataType.getTypeRoot())) { + throw new InvalidTableException( + String.format( + "Bucket key column '%s' has unsupported data type %s. " + + "Currently, bucket key column does not support types: %s.", + bkColumn, bkDataType, KEY_UNSUPPORTED_TYPES)); + } + } + } + } + + private static void checkPrimaryKey(TableDescriptor tableDescriptor) { + if (tableDescriptor.hasPrimaryKey()) { + // check primary key type + RowType schema = tableDescriptor.getSchema().getRowType(); + Schema.PrimaryKey primaryKey = tableDescriptor.getSchema().getPrimaryKey().get(); + for (String pkColumn : primaryKey.getColumnNames()) { + int pkIndex = schema.getFieldIndex(pkColumn); + DataType pkDataType = schema.getTypeAt(pkIndex); + if (KEY_UNSUPPORTED_TYPES.contains(pkDataType.getTypeRoot())) { + throw new InvalidTableException( + String.format( + "Primary key column '%s' has unsupported data type %s. " + + "Currently, primary key column does not support types: %s.", + pkColumn, pkDataType, KEY_UNSUPPORTED_TYPES)); + } + } + } } private static void checkReplicationFactor(Configuration tableConf) {
