This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 3ada03d744fd feat(flink): Support writing out-of-line BLOB columns
(#18958)
3ada03d744fd is described below
commit 3ada03d744fd3f0f8d2c5b88c2589ebcdac0b3ce
Author: Krishen <[email protected]>
AuthorDate: Tue Jun 16 19:29:07 2026 -0700
feat(flink): Support writing out-of-line BLOB columns (#18958)
* feat(flink): Support writing out-of-line BLOB columns
---------
Co-authored-by: Krishen Bhan <[email protected]>
Co-authored-by: Cursor <[email protected]>
---
.../apache/hudi/util/HoodieSchemaConverter.java | 44 ++--
.../apache/hudi/util/RowDataToAvroConverters.java | 8 +
.../hudi/util/TestHoodieSchemaConverter.java | 17 ++
.../org/apache/hudi/table/ITTestBlobWrite.java | 226 +++++++++++++++++++++
.../hudi/utils/TestRowDataToAvroConverters.java | 120 +++++++++++
5 files changed, 394 insertions(+), 21 deletions(-)
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java
index c413101dde46..c859675488ad 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java
@@ -272,6 +272,16 @@ public class HoodieSchemaConverter {
/**
* Detects if a Flink RowType represents a BLOB structure by validating it
matches the schema defined in {@link HoodieSchema.Blob}.
+ *
+ * <p>Detection intentionally keys off the stable structural signals (field
names and base type
+ * roots) and does <b>not</b> assert nested-field nullability. Flink SQL
{@code CREATE TABLE} does
+ * not reliably preserve {@code NOT NULL} constraints on nested {@code ROW}
fields, so requiring an
+ * exact nullability match would silently demote a user's BLOB column to a
generic record when the
+ * column is declared through DDL. The canonical nullability is restored by
{@link HoodieSchema#createBlob()}.
+ *
+ * <p>TODO: This heuristic is a workaround for the lack of a native
Flink/Parquet BLOB logical
+ * type. See <a
href="https://github.com/apache/hudi/issues/18711">apache/hudi#18711</a> for
+ * the tracked work to remove this structural inference.
*/
private static boolean isBlobStructure(RowType rowType) {
// Validate: 3 fields with exact names
@@ -286,24 +296,20 @@ public class HoodieSchemaConverter {
return false;
}
- // Validate 'type' field: non-null STRING
- LogicalType typeField = rowType.getTypeAt(0);
- if (!isFamily(typeField, LogicalTypeFamily.CHARACTER_STRING) ||
typeField.isNullable()) {
+ // Validate 'type' field: STRING
+ if (!isFamily(rowType.getTypeAt(0), LogicalTypeFamily.CHARACTER_STRING)) {
return false;
}
- // Validate 'data' field: nullable BYTES/VARBINARY
+ // Validate 'data' field: BYTES/VARBINARY
LogicalType dataField = rowType.getTypeAt(1);
if (dataField.getTypeRoot() != LogicalTypeRoot.BINARY &&
dataField.getTypeRoot() != LogicalTypeRoot.VARBINARY) {
return false;
}
- if (!dataField.isNullable()) {
- return false;
- }
- // Validate 'reference' field: nullable ROW
+ // Validate 'reference' field: ROW
LogicalType referenceField = rowType.getTypeAt(2);
- if (!referenceField.isNullable() || referenceField.getTypeRoot() !=
LogicalTypeRoot.ROW) {
+ if (referenceField.getTypeRoot() != LogicalTypeRoot.ROW) {
return false;
}
@@ -322,24 +328,20 @@ public class HoodieSchemaConverter {
}
// Validate reference sub-field types
- // external_path: non-null STRING
- if (!isFamily(referenceRow.getTypeAt(0),
LogicalTypeFamily.CHARACTER_STRING)
- || referenceRow.getTypeAt(0).isNullable()) {
+ // external_path: STRING
+ if (!isFamily(referenceRow.getTypeAt(0),
LogicalTypeFamily.CHARACTER_STRING)) {
return false;
}
- // offset: nullable BIGINT
- if (referenceRow.getTypeAt(1).getTypeRoot() != LogicalTypeRoot.BIGINT
- || !referenceRow.getTypeAt(1).isNullable()) {
+ // offset: BIGINT
+ if (referenceRow.getTypeAt(1).getTypeRoot() != LogicalTypeRoot.BIGINT) {
return false;
}
- // length: nullable BIGINT
- if (referenceRow.getTypeAt(2).getTypeRoot() != LogicalTypeRoot.BIGINT
- || !referenceRow.getTypeAt(2).isNullable()) {
+ // length: BIGINT
+ if (referenceRow.getTypeAt(2).getTypeRoot() != LogicalTypeRoot.BIGINT) {
return false;
}
- // managed: non-null BOOLEAN
- if (referenceRow.getTypeAt(3).getTypeRoot() != LogicalTypeRoot.BOOLEAN
- || referenceRow.getTypeAt(3).isNullable()) {
+ // managed: BOOLEAN
+ if (referenceRow.getTypeAt(3).getTypeRoot() != LogicalTypeRoot.BOOLEAN) {
return false;
}
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java
index 6bf94b527074..050ad483de10 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java
@@ -148,6 +148,14 @@ public class RowDataToAvroConverters {
@Override
public Object convert(HoodieSchema schema, Object object) {
+ // The BLOB `type` discriminator is a STRING in Flink but an
ENUM in Avro.
+ // Detect that at call time from the HoodieSchema so the
converter stays
+ // reusable across any row shape — not hard-wired by Flink row
structure alone.
+ HoodieSchema nonNullSchema = schema.getNonNullType();
+ if (nonNullSchema.getType() == HoodieSchemaType.ENUM) {
+ return new GenericData.EnumSymbol(
+ nonNullSchema.toAvroSchema(), object.toString());
+ }
return new Utf8(((BinaryStringData) object).toBytes());
}
};
diff --git
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/util/TestHoodieSchemaConverter.java
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/util/TestHoodieSchemaConverter.java
index 76b2216b4a6a..25f05eda627c 100644
---
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/util/TestHoodieSchemaConverter.java
+++
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/util/TestHoodieSchemaConverter.java
@@ -771,6 +771,23 @@ public class TestHoodieSchemaConverter {
HoodieSchema convertedSchema =
HoodieSchemaConverter.convertToSchema(blobLikeRowType);
assertEquals(HoodieSchemaType.BLOB, convertedSchema.getType());
+ // Positive case: same structure but every nested field nullable. Flink
SQL CREATE TABLE does not
+ // preserve NOT NULL on nested ROW fields, so detection must not depend on
nested nullability.
+ DataType allNullableBlobRow = DataTypes.ROW(
+ DataTypes.FIELD(HoodieSchema.Blob.TYPE, DataTypes.STRING().nullable()),
+ DataTypes.FIELD(HoodieSchema.Blob.INLINE_DATA_FIELD,
DataTypes.BYTES().nullable()),
+ DataTypes.FIELD(HoodieSchema.Blob.EXTERNAL_REFERENCE, DataTypes.ROW(
+ DataTypes.FIELD(HoodieSchema.Blob.EXTERNAL_REFERENCE_PATH,
DataTypes.STRING().nullable()),
+ DataTypes.FIELD(HoodieSchema.Blob.EXTERNAL_REFERENCE_OFFSET,
DataTypes.BIGINT().nullable()),
+ DataTypes.FIELD(HoodieSchema.Blob.EXTERNAL_REFERENCE_LENGTH,
DataTypes.BIGINT().nullable()),
+ DataTypes.FIELD(HoodieSchema.Blob.EXTERNAL_REFERENCE_IS_MANAGED,
DataTypes.BOOLEAN().nullable())
+ ).nullable())
+ ).notNull();
+
+ RowType allNullableBlobRowType = (RowType)
allNullableBlobRow.getLogicalType();
+ HoodieSchema allNullableConverted =
HoodieSchemaConverter.convertToSchema(allNullableBlobRowType);
+ assertEquals(HoodieSchemaType.BLOB, allNullableConverted.getType());
+
// Negative case 1: Different field names
DataType differentNames = DataTypes.ROW(
DataTypes.FIELD("wrong_name", DataTypes.STRING().notNull()),
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestBlobWrite.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestBlobWrite.java
new file mode 100644
index 000000000000..a66c9f034765
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestBlobWrite.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table;
+
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.util.HoodieSchemaConverter;
+import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.utils.FlinkMiniCluster;
+
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CollectionUtil;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import java.io.File;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * IT case for writing out-of-line (OOL) BLOB columns through the Flink writer
and reading them back.
+ *
+ * <p>Verifies that the Hudi {@link HoodieSchema.Blob} structure round-trips
through the Flink
+ * write/read pipeline (both COW and MOR), that updates land through the MOR
Avro log path, and that
+ * the stored table schema keeps the BLOB logical type instead of degrading
the column to a generic
+ * Flink/Avro record.
+ */
+@ExtendWith(FlinkMiniCluster.class)
+public class ITTestBlobWrite {
+
+ @TempDir
+ File tempFile;
+
+ private static final String BLOB_COLUMN = "blob_col";
+
+ private static final String BLOB_COLUMN_DDL =
+ " " + BLOB_COLUMN + " ROW<\n"
+ + " `type` STRING NOT NULL,\n"
+ + " `data` BYTES,\n"
+ + " `reference` ROW<\n"
+ + " external_path STRING NOT NULL,\n"
+ + " `offset` BIGINT,\n"
+ + " `length` BIGINT,\n"
+ + " managed BOOLEAN NOT NULL\n"
+ + " >\n"
+ + " >,\n";
+
+ /**
+ * Regression for {@code HoodieSchemaConverter#isBlobStructure}: Flink SQL
{@code CREATE TABLE}
+ * may declare nested {@code ROW} fields as {@code NOT NULL}, but
+ * {@link ResolvedSchema#toPhysicalRowDataType()} does not always preserve
those constraints in
+ * the {@link RowType}. Schema inference must still recognize the BLOB shape
(same path as
+ * {@code HoodieTableFactory#inferAvroSchema}), otherwise {@code blob_col}
would degrade to a
+ * generic RECORD in the committed Hoodie schema.
+ */
+ @Test
+ public void testFlinkSqlDdlPhysicalRowTypeStillMapsToHoodieBlob() {
+ TableEnvironment tableEnv = batchEnv();
+ final String probeTable = "flink_blob_ddl_probe";
+ String createProbeDdl =
+ "CREATE TABLE "
+ + probeTable
+ + " (\n"
+ + " id BIGINT,\n"
+ + " name STRING,\n"
+ + BLOB_COLUMN_DDL
+ + " ts BIGINT\n"
+ + ") WITH ('connector'='blackhole')";
+ tableEnv.executeSql(createProbeDdl);
+
+ ResolvedSchema resolved = tableEnv.from(probeTable).getResolvedSchema();
+ RowType physical = (RowType)
resolved.toPhysicalRowDataType().getLogicalType();
+ // DDL uses NOT NULL on nested BLOB ROW fields where the canonical Hoodie
BLOB shape is
+ // stricter. Flink's physical RowType from
ResolvedSchema#toPhysicalRowDataType() may or may
+ // not preserve those flags across versions (see Flink table config /
release notes linked in
+ // the PR). We do not assert which fields widen — only that Hudi still
recognizes the column as
+ // a BLOB (regression for HoodieSchemaConverter#isBlobStructure).
+
+ HoodieSchema recordSchema =
+ HoodieSchemaConverter.convertToSchema(
+ physical, HoodieSchemaUtils.getRecordQualifiedName(probeTable));
+ HoodieSchemaField blobField =
+ recordSchema
+ .getField(BLOB_COLUMN)
+ .orElseThrow(() -> new AssertionError("blob_col missing from
converted HoodieSchema"));
+ assertTrue(
+ blobField.schema().isBlobField(),
+ "Physical RowType from Flink SQL DDL must still map to Hoodie BLOB,
got: "
+ + blobField.schema());
+
+ tableEnv.executeSql("DROP TABLE " + probeTable);
+ }
+
+ private void createTable(TableEnvironment tableEnv, String tablePath,
HoodieTableType tableType) {
+ String createTableDdl = String.format(
+ "CREATE TABLE blob_table (\n"
+ + " id BIGINT,\n"
+ + " name STRING,\n"
+ + BLOB_COLUMN_DDL
+ + " ts BIGINT,\n"
+ + " PRIMARY KEY (id) NOT ENFORCED\n"
+ + ") WITH (\n"
+ + " 'connector' = 'hudi',\n"
+ + " 'path' = '%s',\n"
+ + " 'table.type' = '%s',\n"
+ + " 'ordering.fields' = 'ts'\n"
+ + ");",
+ tablePath, tableType.name());
+ tableEnv.executeSql(createTableDdl);
+ }
+
+ @ParameterizedTest
+ @EnumSource(value = HoodieTableType.class)
+ public void testWriteAndReadOutOfLineBlob(HoodieTableType tableType) throws
Exception {
+ TableEnvironment tableEnv = batchEnv();
+ String tablePath = new File(tempFile, "blob_table").getAbsolutePath();
+ createTable(tableEnv, tablePath, tableType);
+
+ // First batch: insert two OOL blob references.
+ execInsert(tableEnv,
+ "INSERT INTO blob_table VALUES\n"
+ + "(1, 'doc-1', ROW('OUT_OF_LINE', CAST(NULL AS BYTES), "
+ + "ROW('file1.bin', CAST(0 AS BIGINT), CAST(100 AS BIGINT),
false)), 1000),\n"
+ + "(2, 'doc-2', ROW('OUT_OF_LINE', CAST(NULL AS BYTES), "
+ + "ROW('file1.bin', CAST(100 AS BIGINT), CAST(200 AS BIGINT),
false)), 2000)");
+
+ List<Row> rows = readOrdered(tableEnv);
+ assertEquals(2, rows.size());
+ assertOutOfLineRow(rows.get(0), 1L, "doc-1", "file1.bin", 0L, 100L, 1000L);
+ assertOutOfLineRow(rows.get(1), 2L, "doc-2", "file1.bin", 100L, 200L,
2000L);
+
+ // Second batch: upsert the same keys with new references. For MOR this
exercises the Avro
+ // log write path (RowData -> Avro), including the BLOB enum `type` field.
+ execInsert(tableEnv,
+ "INSERT INTO blob_table VALUES\n"
+ + "(1, 'doc-1', ROW('OUT_OF_LINE', CAST(NULL AS BYTES), "
+ + "ROW('file2.bin', CAST(500 AS BIGINT), CAST(300 AS BIGINT),
false)), 3000),\n"
+ + "(2, 'doc-2', ROW('OUT_OF_LINE', CAST(NULL AS BYTES), "
+ + "ROW('file2.bin', CAST(800 AS BIGINT), CAST(400 AS BIGINT),
false)), 4000)");
+
+ List<Row> updated = readOrdered(tableEnv);
+ assertEquals(2, updated.size());
+ assertOutOfLineRow(updated.get(0), 1L, "doc-1", "file2.bin", 500L, 300L,
3000L);
+ assertOutOfLineRow(updated.get(1), 2L, "doc-2", "file2.bin", 800L, 400L,
4000L);
+
+ // The stored table schema must keep the BLOB logical type, not a generic
record.
+ assertBlobTypePreserved(tablePath);
+ }
+
+ private static List<Row> readOrdered(TableEnvironment tableEnv) {
+ return CollectionUtil.iteratorToList(
+ tableEnv.executeSql("select id, name, blob_col, ts from blob_table
order by id").collect());
+ }
+
+ private static void assertOutOfLineRow(Row row, long id, String name, String
path,
+ long offset, long length, long ts) {
+ assertEquals(id, row.getField(0));
+ assertEquals(name, row.getField(1));
+ Row blob = (Row) row.getField(2);
+ assertNotNull(blob, "blob struct must be populated");
+ assertEquals("OUT_OF_LINE", blob.getField(0));
+ assertNull(blob.getField(1), "inline data must be null for OUT_OF_LINE
blob");
+ Row reference = (Row) blob.getField(2);
+ assertNotNull(reference, "reference must be populated for OUT_OF_LINE
blob");
+ assertEquals(path, reference.getField(0));
+ assertEquals(offset, reference.getField(1));
+ assertEquals(length, reference.getField(2));
+ assertEquals(false, reference.getField(3));
+ assertEquals(ts, row.getField(3));
+ }
+
+ private static void assertBlobTypePreserved(String tablePath) throws
Exception {
+ HoodieTableMetaClient metaClient =
+ StreamerUtil.createMetaClient(tablePath, new
org.apache.hadoop.conf.Configuration());
+ HoodieSchema tableSchema = new
TableSchemaResolver(metaClient).getTableSchema();
+ HoodieSchemaField blobField = tableSchema.getField(BLOB_COLUMN)
+ .orElseThrow(() -> new AssertionError("blob_col field missing from
table schema"));
+ assertTrue(blobField.schema().isBlobField(),
+ "blob_col must keep the BLOB logical type, found: " +
blobField.schema());
+ }
+
+ private static TableEnvironment batchEnv() {
+ TableEnvironment tableEnv =
org.apache.hudi.utils.TestTableEnvs.getBatchTableEnv();
+ tableEnv.getConfig().getConfiguration()
+ .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
1);
+ return tableEnv;
+ }
+
+ private static void execInsert(TableEnvironment tableEnv, String insertSql)
throws Exception {
+ TableResult result = tableEnv.executeSql(insertSql);
+ result.getJobClient().get().getJobExecutionResult().get(120,
TimeUnit.SECONDS);
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestRowDataToAvroConverters.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestRowDataToAvroConverters.java
index 185c5830616c..d7c68b7e318d 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestRowDataToAvroConverters.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestRowDataToAvroConverters.java
@@ -18,15 +18,22 @@
package org.apache.hudi.utils;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
import org.apache.hudi.util.HoodieSchemaConverter;
import org.apache.hudi.util.RowDataToAvroConverters;
+import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.util.Utf8;
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.formats.json.JsonToRowDataConverters;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.junit.jupiter.api.Assertions;
@@ -36,6 +43,7 @@ import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
+import java.util.Arrays;
import static org.apache.flink.table.api.DataTypes.FIELD;
import static org.apache.flink.table.api.DataTypes.ROW;
@@ -81,4 +89,116 @@ class TestRowDataToAvroConverters {
Assertions.assertEquals("2021-03-30 08:44:29",
formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli((Long)
avroRecord.get(0)), ZoneId.of("UTC+1"))));
Assertions.assertEquals("2021-03-30 15:44:29",
formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli((Long)
avroRecord.get(0)), ZoneId.of("Asia/Shanghai"))));
}
+
+ @Test
+ void testRowDataToAvroBlobTypeFieldWritesEnumSymbol() {
+ // Flink models the BLOB `type` discriminator as STRING, but its Avro
encoding is an ENUM
+ // (blob_storage_type). The converter must emit a GenericData.EnumSymbol,
not a plain Utf8,
+ // otherwise Avro log-block writes (MOR) fail with "value OUT_OF_LINE (a
Utf8) is not a
+ // blob_storage_type".
+ DataType blobRow = DataTypes.ROW(
+ DataTypes.FIELD(HoodieSchema.Blob.TYPE, DataTypes.STRING().notNull()),
+ DataTypes.FIELD(HoodieSchema.Blob.INLINE_DATA_FIELD,
DataTypes.BYTES().nullable()),
+ DataTypes.FIELD(HoodieSchema.Blob.EXTERNAL_REFERENCE, DataTypes.ROW(
+ DataTypes.FIELD(HoodieSchema.Blob.EXTERNAL_REFERENCE_PATH,
DataTypes.STRING().notNull()),
+ DataTypes.FIELD(HoodieSchema.Blob.EXTERNAL_REFERENCE_OFFSET,
DataTypes.BIGINT().nullable()),
+ DataTypes.FIELD(HoodieSchema.Blob.EXTERNAL_REFERENCE_LENGTH,
DataTypes.BIGINT().nullable()),
+ DataTypes.FIELD(HoodieSchema.Blob.EXTERNAL_REFERENCE_IS_MANAGED,
DataTypes.BOOLEAN().notNull())
+ ).nullable()));
+ RowType rowType = (RowType) DataTypes.ROW(DataTypes.FIELD("blob_col",
blobRow)).getLogicalType();
+
+ GenericRowData reference = new GenericRowData(4);
+ reference.setField(0, StringData.fromString("file1.bin"));
+ reference.setField(1, 0L);
+ reference.setField(2, 100L);
+ reference.setField(3, false);
+
+ GenericRowData blob = new GenericRowData(3);
+ blob.setField(0, StringData.fromString(HoodieSchema.Blob.OUT_OF_LINE));
+ blob.setField(1, null);
+ blob.setField(2, reference);
+
+ GenericRowData top = new GenericRowData(1);
+ top.setField(0, blob);
+
+ RowDataToAvroConverters.RowDataToAvroConverter converter =
+ RowDataToAvroConverters.createConverter(rowType);
+ GenericRecord avroRecord =
+ (GenericRecord)
converter.convert(HoodieSchemaConverter.convertToSchema(rowType), top);
+
+ GenericRecord blobRecord = (GenericRecord) avroRecord.get(0);
+ Object typeValue = blobRecord.get(HoodieSchema.Blob.TYPE);
+ Assertions.assertInstanceOf(GenericData.EnumSymbol.class, typeValue,
+ "BLOB `type` must be written as an Avro EnumSymbol, found: "
+ + (typeValue == null ? "null" : typeValue.getClass().getName()));
+ Assertions.assertEquals(HoodieSchema.Blob.OUT_OF_LINE,
typeValue.toString());
+ }
+
+ /**
+ * A ROW whose field names match the BLOB structure but whose {@link
HoodieSchema} carries a
+ * plain {@code STRING} (not {@code ENUM}) for the {@code type} field must
write a plain
+ * {@link Utf8}, not a {@link GenericData.EnumSymbol}.
+ */
+ @Test
+ void testBlobShapedRowWithPlainStringSchemaWritesUtf8() {
+ DataType blobShapedRow = DataTypes.ROW(
+ DataTypes.FIELD(HoodieSchema.Blob.TYPE, DataTypes.STRING().notNull()),
+ DataTypes.FIELD(HoodieSchema.Blob.INLINE_DATA_FIELD,
DataTypes.BYTES().nullable()),
+ DataTypes.FIELD(HoodieSchema.Blob.EXTERNAL_REFERENCE, DataTypes.ROW(
+ DataTypes.FIELD(HoodieSchema.Blob.EXTERNAL_REFERENCE_PATH,
DataTypes.STRING().notNull()),
+ DataTypes.FIELD(HoodieSchema.Blob.EXTERNAL_REFERENCE_OFFSET,
DataTypes.BIGINT().nullable()),
+ DataTypes.FIELD(HoodieSchema.Blob.EXTERNAL_REFERENCE_LENGTH,
DataTypes.BIGINT().nullable()),
+ DataTypes.FIELD(HoodieSchema.Blob.EXTERNAL_REFERENCE_IS_MANAGED,
DataTypes.BOOLEAN().notNull())
+ ).nullable()));
+ RowType outerRowType = (RowType) DataTypes.ROW(
+ DataTypes.FIELD("blob_col", blobShapedRow)).getLogicalType();
+
+ // Plain RECORD schema: field[0] is STRING (not ENUM) — mimics a non-BLOB
record whose
+ // shape happens to match the BLOB structure.
+ HoodieSchema refSchema = HoodieSchema.createRecord("reference", null,
null, Arrays.asList(
+ HoodieSchemaField.of(HoodieSchema.Blob.EXTERNAL_REFERENCE_PATH,
+ HoodieSchema.create(HoodieSchemaType.STRING)),
+ HoodieSchemaField.of(HoodieSchema.Blob.EXTERNAL_REFERENCE_OFFSET,
+ HoodieSchema.createNullable(HoodieSchemaType.LONG)),
+ HoodieSchemaField.of(HoodieSchema.Blob.EXTERNAL_REFERENCE_LENGTH,
+ HoodieSchema.createNullable(HoodieSchemaType.LONG)),
+ HoodieSchemaField.of(HoodieSchema.Blob.EXTERNAL_REFERENCE_IS_MANAGED,
+ HoodieSchema.create(HoodieSchemaType.BOOLEAN))
+ ));
+ HoodieSchema plainBlobShapedSchema = HoodieSchema.createRecord("blob_col",
null, null, Arrays.asList(
+ HoodieSchemaField.of(HoodieSchema.Blob.TYPE,
HoodieSchema.create(HoodieSchemaType.STRING)),
+ HoodieSchemaField.of(HoodieSchema.Blob.INLINE_DATA_FIELD,
+ HoodieSchema.createNullable(HoodieSchemaType.BYTES)),
+ HoodieSchemaField.of(HoodieSchema.Blob.EXTERNAL_REFERENCE,
+ HoodieSchema.createNullable(refSchema))
+ ));
+ HoodieSchema outerSchema = HoodieSchema.createRecord("outer", null, null,
Arrays.asList(
+ HoodieSchemaField.of("blob_col", plainBlobShapedSchema)
+ ));
+
+ GenericRowData reference = new GenericRowData(4);
+ reference.setField(0, StringData.fromString("file1.bin"));
+ reference.setField(1, 0L);
+ reference.setField(2, 100L);
+ reference.setField(3, false);
+
+ GenericRowData blobRow = new GenericRowData(3);
+ blobRow.setField(0, StringData.fromString("OUT_OF_LINE"));
+ blobRow.setField(1, null);
+ blobRow.setField(2, reference);
+
+ GenericRowData top = new GenericRowData(1);
+ top.setField(0, blobRow);
+
+ RowDataToAvroConverters.RowDataToAvroConverter converter =
+ RowDataToAvroConverters.createConverter(outerRowType);
+ GenericRecord avroRecord = (GenericRecord) converter.convert(outerSchema,
top);
+
+ GenericRecord blobRecord = (GenericRecord) avroRecord.get(0);
+ Object typeValue = blobRecord.get(HoodieSchema.Blob.TYPE);
+ Assertions.assertInstanceOf(Utf8.class, typeValue,
+ "STRING field must write as Utf8 (not EnumSymbol) when HoodieSchema is
not ENUM; found: "
+ + (typeValue == null ? "null" : typeValue.getClass().getName()));
+ Assertions.assertEquals("OUT_OF_LINE", typeValue.toString());
+ }
}
\ No newline at end of file