This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.2
in repository https://gitbox.apache.org/repos/asf/paimon.git
commit 958e85bac23e5778ce038d54ac1012a4a1c8aa35 Author: jsingh-yelp <[email protected]> AuthorDate: Thu Jun 12 21:55:35 2025 -0400 [flink] Add support for AvroType enum for debezium format (#5742) --- .../cdc/format/debezium/DebeziumSchemaUtils.java | 1 + .../format/debezium/DebeziumSchemaUtilsTest.java | 153 +++++++++++++++++++++ 2 files changed, 154 insertions(+) diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java index 114344bade..80f99165e6 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java @@ -475,6 +475,7 @@ public class DebeziumSchemaUtils { return DataTypes.INT(); case LONG: return DataTypes.BIGINT(); + case ENUM: case STRING: return DataTypes.STRING(); case RECORD: diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtilsTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtilsTest.java new file mode 100644 index 0000000000..7381bb4890 --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtilsTest.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action.cdc.format.debezium; + +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DataTypes; + +import org.apache.avro.LogicalTypes; +import org.apache.avro.Schema; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.List; + +/** Test class for DebeziumSchemaUtils. */ +public class DebeziumSchemaUtilsTest { + @Test + public void testFromDebeziumAvroTypeWithGenericUnion() { + // Create a union schema with multiple non-null types + Schema stringSchema = Schema.create(Schema.Type.STRING); + Schema intSchema = Schema.create(Schema.Type.INT); + Schema unionSchema = Schema.createUnion(Arrays.asList(stringSchema, intSchema)); + + // Test that an exception is thrown for generic unions + UnsupportedOperationException exception = + Assertions.assertThrows( + UnsupportedOperationException.class, + () -> DebeziumSchemaUtils.avroToPaimonDataType(unionSchema)); + + Assertions.assertEquals("Generic unions are not supported", exception.getMessage()); + } + + @Test + public void testFromDebeziumAvroTypeWithLogicalTypes() { + // Test date logical type + Schema dateSchema = Schema.create(Schema.Type.INT); + LogicalTypes.date().addToSchema(dateSchema); + DataType dateType = DebeziumSchemaUtils.avroToPaimonDataType(dateSchema); + Assertions.assertEquals(DataTypes.DATE(), dateType); + + // Test decimal logical type + Schema decimalSchema = Schema.create(Schema.Type.BYTES); + LogicalTypes.decimal(10, 
2).addToSchema(decimalSchema); + DataType decimalType = DebeziumSchemaUtils.avroToPaimonDataType(decimalSchema); + Assertions.assertEquals(DataTypes.DECIMAL(10, 2), decimalType); + + // Test timestamp-millis logical type + Schema timestampMillisSchema = Schema.create(Schema.Type.LONG); + LogicalTypes.timestampMillis().addToSchema(timestampMillisSchema); + DataType timestampMillisType = + DebeziumSchemaUtils.avroToPaimonDataType(timestampMillisSchema); + Assertions.assertEquals(DataTypes.TIMESTAMP_MILLIS(), timestampMillisType); + + // Test timestamp-micros logical type + Schema timestampMicrosSchema = Schema.create(Schema.Type.LONG); + LogicalTypes.timestampMicros().addToSchema(timestampMicrosSchema); + DataType timestampMicrosType = + DebeziumSchemaUtils.avroToPaimonDataType(timestampMicrosSchema); + Assertions.assertEquals(DataTypes.TIMESTAMP(), timestampMicrosType); + } + + @Test + public void testFromDebeziumAvroTypeWithPrimitiveTypes() { + // Test boolean type + Schema booleanSchema = Schema.create(Schema.Type.BOOLEAN); + DataType booleanType = DebeziumSchemaUtils.avroToPaimonDataType(booleanSchema); + Assertions.assertEquals(DataTypes.BOOLEAN(), booleanType); + + // Test int type + Schema intSchema = Schema.create(Schema.Type.INT); + DataType intType = DebeziumSchemaUtils.avroToPaimonDataType(intSchema); + Assertions.assertEquals(DataTypes.INT(), intType); + + // Test long type + Schema longSchema = Schema.create(Schema.Type.LONG); + DataType longType = DebeziumSchemaUtils.avroToPaimonDataType(longSchema); + Assertions.assertEquals(DataTypes.BIGINT(), longType); + + // Test float type + Schema floatSchema = Schema.create(Schema.Type.FLOAT); + DataType floatType = DebeziumSchemaUtils.avroToPaimonDataType(floatSchema); + Assertions.assertEquals(DataTypes.FLOAT(), floatType); + + // Test double type + Schema doubleSchema = Schema.create(Schema.Type.DOUBLE); + DataType doubleType = DebeziumSchemaUtils.avroToPaimonDataType(doubleSchema); + 
Assertions.assertEquals(DataTypes.DOUBLE(), doubleType); + + // Test enum type + Schema enumSchema = + Schema.createEnum("TestEnum", null, null, Arrays.asList("VALUE1", "VALUE2")); + DataType enumType = DebeziumSchemaUtils.avroToPaimonDataType(enumSchema); + Assertions.assertEquals(DataTypes.STRING(), enumType); + + // Test string type + Schema stringSchema = Schema.create(Schema.Type.STRING); + DataType stringType = DebeziumSchemaUtils.avroToPaimonDataType(stringSchema); + Assertions.assertEquals(DataTypes.STRING(), stringType); + + // Test bytes type + Schema bytesSchema = Schema.create(Schema.Type.BYTES); + DataType bytesType = DebeziumSchemaUtils.avroToPaimonDataType(bytesSchema); + Assertions.assertEquals(DataTypes.BYTES(), bytesType); + } + + @Test + public void testFromDebeziumAvroTypeWithComplexTypes() { + // Test array type + Schema stringSchema = Schema.create(Schema.Type.STRING); + Schema arraySchema = Schema.createArray(stringSchema); + DataType arrayType = DebeziumSchemaUtils.avroToPaimonDataType(arraySchema); + Assertions.assertEquals(DataTypes.ARRAY(DataTypes.STRING()), arrayType); + + // Test map type + Schema mapSchema = Schema.createMap(stringSchema); + DataType mapType = DebeziumSchemaUtils.avroToPaimonDataType(mapSchema); + Assertions.assertEquals(DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()), mapType); + + // Test record type + Schema recordSchema = Schema.createRecord("TestRecord", null, null, false); + List<Schema.Field> fields = + Arrays.asList( + new Schema.Field("field1", stringSchema, null, null), + new Schema.Field("field2", Schema.create(Schema.Type.INT), null, null)); + recordSchema.setFields(fields); + DataType recordType = DebeziumSchemaUtils.avroToPaimonDataType(recordSchema); + + DataField[] expectedFields = + new DataField[] { + DataTypes.FIELD(0, "field1", DataTypes.STRING(), null), + DataTypes.FIELD(1, "field2", DataTypes.INT(), null) + }; + Assertions.assertEquals(DataTypes.ROW(expectedFields), recordType); + } +}
