This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new eea01d1d76f [HUDI-8841] Fix schema validating exception during flink
async clustering (#12598)
eea01d1d76f is described below
commit eea01d1d76f475bbda9f9b1ffb12025d1a020ee8
Author: Shuo Cheng <[email protected]>
AuthorDate: Thu Jan 16 10:06:21 2025 +0800
[HUDI-8841] Fix schema validating exception during flink async clustering
(#12598)
---
.../java/org/apache/hudi/avro/AvroSchemaUtils.java | 24 +++++
.../hudi/sink/clustering/ClusteringOperator.java | 6 +-
.../sink/cluster/ITTestHoodieFlinkClustering.java | 100 ++++++++++++++++-----
3 files changed, 107 insertions(+), 23 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
index 18fdedca50f..e4a5fe8ecf1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
@@ -28,6 +28,9 @@ import org.apache.hudi.exception.SchemaCompatibilityException;
import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.action.TableChanges;
+import org.apache.hudi.internal.schema.utils.SchemaChangeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,7 +45,9 @@ import java.util.Set;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
+import static org.apache.hudi.common.util.CollectionUtils.reduce;
import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static
org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter.convert;
/**
* Utils for Avro Schema.
@@ -591,4 +596,23 @@ public class AvroSchemaUtils {
public static String createSchemaErrorString(String errorMessage, Schema
writerSchema, Schema tableSchema) {
return String.format("%s\nwriterSchema: %s\ntableSchema: %s",
errorMessage, writerSchema, tableSchema);
}
+
+ /**
+   * Create a new schema by forcibly changing all the fields to be nullable.
+ *
+ * @param schema original schema
+   * @return a new schema with all the fields updated to be nullable.
+ */
+ public static Schema asNullable(Schema schema) {
+ List<String> filterCols = schema.getFields().stream()
+ .filter(f ->
!f.schema().isNullable()).map(Schema.Field::name).collect(Collectors.toList());
+ if (filterCols.isEmpty()) {
+ return schema;
+ }
+ InternalSchema internalSchema = convert(schema);
+ TableChanges.ColumnUpdateChange schemaChange =
TableChanges.ColumnUpdateChange.get(internalSchema);
+ schemaChange = reduce(filterCols, schemaChange,
+ (change, field) -> change.updateColumnNullability(field, true));
+ return convert(SchemaChangeUtils.applyTableChanges2Schema(internalSchema,
schemaChange), schema.getFullName());
+ }
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
index 89f203c6eda..5ba2a5c3922 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
@@ -20,6 +20,7 @@ package org.apache.hudi.sink.clustering;
import org.apache.hudi.adapter.MaskingOutputAdapter;
import org.apache.hudi.adapter.Utils;
+import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.client.FlinkTaskContextSupplier;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
@@ -167,7 +168,10 @@ public class ClusteringOperator extends
TableStreamOperator<ClusteringCommitEven
this.table = writeClient.getHoodieTable();
this.schema = AvroSchemaConverter.convertToSchema(rowType);
- this.readerSchema = this.schema;
+    // Since there exist discrepancies between Flink and Spark in handling the
nullability of the primary key field,
+    // and some files may have been written by Spark, force the schema to be
nullable to make sure the clustering
+    // scan succeeds without a schema validation exception.
+ this.readerSchema = AvroSchemaUtils.asNullable(schema);
this.requiredPos = getRequiredPositions();
this.avroToRowDataConverter =
AvroToRowDataConverters.createRowConverter(rowType);
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
index f1e72a9ad01..5d0c8dc61e2 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
@@ -73,6 +73,8 @@ import org.apache.flink.types.Row;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.io.File;
import java.util.HashMap;
@@ -533,6 +535,82 @@ public class ITTestHoodieFlinkClustering {
// wait for the asynchronous commit to finish
TimeUnit.SECONDS.sleep(3);
+ runCluster(rowType);
+
+ // test output
+ final Map<String, String> expected = new HashMap<>();
+ expected.put("par1", "[id1,par1,id1,Danny,23,1100001,par1,
id2,par1,id2,Stephen,33,2100001,par1]");
+ expected.put("par2", "[id3,par2,id3,Julian,53,3100001,par2,
id4,par2,id4,Fabian,31,4100001,par2]");
+ expected.put("par3", "[id5,par3,id5,Sophia,18,5100001,par3,
id6,par3,id6,Emma,20,6100001,par3]");
+ expected.put("par4", "[id7,par4,id7,Bob,44,7100001,par4,
id8,par4,id8,Han,56,8100001,par4]");
+ TestData.checkWrittenData(tempFile, expected, 4);
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testInsertWithDifferentRecordKeyNullabilityAndClustering(boolean
withPk) throws Exception {
+ EnvironmentSettings settings =
EnvironmentSettings.newInstance().inBatchMode().build();
+ TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
+ tableEnv.getConfig().getConfiguration()
+
.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
+
+    // if the table is created without a primary key, the record
key field is nullable;
+    // otherwise, the record key field is not nullable.
+ String pkConstraint = withPk ? ", primary key (uuid) not enforced\n" : "";
+ String tblWithoutPkDDL = "create table t1(\n"
+ + " `uuid` VARCHAR(20)\n"
+ + ", `name` VARCHAR(10)\n"
+ + ", `age` INT\n"
+ + ", `ts` TIMESTAMP(3)\n"
+ + ", `partition` VARCHAR(10)\n"
+ + pkConstraint
+ + ")\n"
+ + "PARTITIONED BY (`partition`)\n"
+ + "with (\n"
+ + " 'connector' = 'hudi',\n"
+ + " 'hoodie.datasource.write.recordkey.field' = 'uuid',\n"
+ + " 'path' = '" + tempFile.getAbsolutePath() + "'\n"
+ + ")";
+ tableEnv.executeSql(tblWithoutPkDDL);
+ tableEnv.executeSql(TestSQL.INSERT_T1).await();
+
+ final RowType rowType = (RowType) DataTypes.ROW(
+ DataTypes.FIELD("uuid", DataTypes.VARCHAR(20).notNull()), //
primary key set as not null
+ DataTypes.FIELD("name", DataTypes.VARCHAR(10)),
+ DataTypes.FIELD("age", DataTypes.INT()),
+ DataTypes.FIELD("ts", DataTypes.TIMESTAMP(3)),
+ DataTypes.FIELD("partition", DataTypes.VARCHAR(10)))
+ .notNull().getLogicalType();
+
+ // run cluster with row type
+ runCluster(rowType);
+
+ final Map<String, String> expected = new HashMap<>();
+ expected.put("par1", "[id1,par1,id1,Danny,23,1000,par1,
id2,par1,id2,Stephen,33,2000,par1]");
+ expected.put("par2", "[id3,par2,id3,Julian,53,3000,par2,
id4,par2,id4,Fabian,31,4000,par2]");
+ expected.put("par3", "[id5,par3,id5,Sophia,18,5000,par3,
id6,par3,id6,Emma,20,6000,par3]");
+ expected.put("par4", "[id7,par4,id7,Bob,44,7000,par4,
id8,par4,id8,Han,56,8000,par4]");
+ TestData.checkWrittenData(tempFile, expected, 4);
+ }
+
+ @Test
+ public void testOfflineClusterFailoverAfterCommit() throws Exception {
+ StreamTableEnvironment tableEnv = prepareEnvAndTable();
+
+ FlinkClusteringConfig cfg = new FlinkClusteringConfig();
+ cfg.path = tempFile.getAbsolutePath();
+ cfg.targetPartitions = 4;
+ Configuration conf = FlinkClusteringConfig.toFlinkConfig(cfg);
+ assertDoesNotThrow(() -> runOfflineCluster(tableEnv, conf));
+
+ Table result = tableEnv.sqlQuery("select count(*) from t1");
+ assertEquals(16L, tableEnv.toDataStream(result,
Row.class).executeAndCollect(1).get(0).getField(0));
+ }
+
+ /**
+   * Schedules the clustering plan and then runs the clustering job.
+ */
+ private void runCluster(RowType rowType) throws Exception {
// make configuration and setAvroSchema.
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
FlinkClusteringConfig cfg = new FlinkClusteringConfig();
@@ -604,31 +682,9 @@ public class ITTestHoodieFlinkClustering {
.setParallelism(1);
env.execute("flink_hudi_clustering");
-
- // test output
- final Map<String, String> expected = new HashMap<>();
- expected.put("par1", "[id1,par1,id1,Danny,23,1100001,par1,
id2,par1,id2,Stephen,33,2100001,par1]");
- expected.put("par2", "[id3,par2,id3,Julian,53,3100001,par2,
id4,par2,id4,Fabian,31,4100001,par2]");
- expected.put("par3", "[id5,par3,id5,Sophia,18,5100001,par3,
id6,par3,id6,Emma,20,6100001,par3]");
- expected.put("par4", "[id7,par4,id7,Bob,44,7100001,par4,
id8,par4,id8,Han,56,8100001,par4]");
- TestData.checkWrittenData(tempFile, expected, 4);
}
}
- @Test
- public void testOfflineClusterFailoverAfterCommit() throws Exception {
- StreamTableEnvironment tableEnv = prepareEnvAndTable();
-
- FlinkClusteringConfig cfg = new FlinkClusteringConfig();
- cfg.path = tempFile.getAbsolutePath();
- cfg.targetPartitions = 4;
- Configuration conf = FlinkClusteringConfig.toFlinkConfig(cfg);
- assertDoesNotThrow(() -> runOfflineCluster(tableEnv, conf));
-
- Table result = tableEnv.sqlQuery("select count(*) from t1");
- assertEquals(16L, tableEnv.toDataStream(result,
Row.class).executeAndCollect(1).get(0).getField(0));
- }
-
private StreamTableEnvironment prepareEnvAndTable() {
// Create hoodie table and insert into data.
Configuration conf = new org.apache.flink.configuration.Configuration();