This is an automated email from the ASF dual-hosted git repository.

mengtao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c218231bd [HUDI-5244] Fix bugs in schema evolution client with lost operation field and not found schema (#7248)
4c218231bd is described below

commit 4c218231bdb2a24310145b721cf87ba7b0f1534a
Author: Alexander Trushev <trushev.a...@gmail.com>
AuthorDate: Mon Nov 21 10:54:01 2022 +0700

    [HUDI-5244] Fix bugs in schema evolution client with lost operation field and not found schema (#7248)
    
    * [HUDI-5244] Fix bugs in schema evolution client with lost operation field and not found schema
---
 .../apache/hudi/client/BaseHoodieWriteClient.java  | 21 +++--
 .../action/commit/TestSchemaEvolutionClient.java   | 96 ++++++++++++++++++++++
 .../java/org/apache/hudi/avro/HoodieAvroUtils.java |  4 +
 3 files changed, 114 insertions(+), 7 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 4a3f6bd311..133dfce9e9 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -280,7 +280,7 @@ public abstract class BaseHoodieWriteClient<T extends 
HoodieRecordPayload, I, K,
     FileBasedInternalSchemaStorageManager schemasManager = new 
FileBasedInternalSchemaStorageManager(table.getMetaClient());
     if (!historySchemaStr.isEmpty() || 
Boolean.parseBoolean(config.getString(HoodieCommonConfig.RECONCILE_SCHEMA.key())))
 {
       InternalSchema internalSchema;
-      Schema avroSchema = HoodieAvroUtils.createHoodieWriteSchema(new 
Schema.Parser().parse(config.getSchema()));
+      Schema avroSchema = 
HoodieAvroUtils.createHoodieWriteSchema(config.getSchema(), 
config.allowOperationMetadataField());
       if (historySchemaStr.isEmpty()) {
         internalSchema = AvroInternalSchemaConverter.convert(avroSchema);
         internalSchema.setSchemaId(Long.parseLong(instantTime));
@@ -1762,16 +1762,13 @@ public abstract class BaseHoodieWriteClient<T extends 
HoodieRecordPayload, I, K,
   private Pair<InternalSchema, HoodieTableMetaClient> 
getInternalSchemaAndMetaClient() {
     HoodieTableMetaClient metaClient = createMetaClient(true);
     TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
-    Option<InternalSchema> internalSchemaOption = 
schemaUtil.getTableInternalSchemaFromCommitMetadata();
-    if (!internalSchemaOption.isPresent()) {
-      throw new HoodieException(String.format("cannot find schema for current 
table: %s", config.getBasePath()));
-    }
-    return Pair.of(internalSchemaOption.get(), metaClient);
+    return Pair.of(getInternalSchema(schemaUtil), metaClient); // falls back to the table's Avro schema when commit metadata has no internal schema
   }
 
   private void commitTableChange(InternalSchema newSchema, 
HoodieTableMetaClient metaClient) {
     TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
-    String historySchemaStr = 
schemaUtil.getTableHistorySchemaStrFromCommitMetadata().orElse("");
+    String historySchemaStr = 
schemaUtil.getTableHistorySchemaStrFromCommitMetadata().orElseGet(
+        () -> SerDeHelper.inheritSchemas(getInternalSchema(schemaUtil), ""));
     Schema schema = AvroInternalSchemaConverter.convert(newSchema, 
config.getTableName());
     String commitActionType = 
CommitUtils.getCommitActionType(WriteOperationType.ALTER_SCHEMA, 
metaClient.getTableType());
     String instantTime = HoodieActiveTimeline.createNewInstantTime();
@@ -1793,4 +1790,26 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
     schemasManager.persistHistorySchemaStr(instantTime, SerDeHelper.inheritSchemas(newSchema, historySchemaStr));
     commitStats(instantTime, Collections.emptyList(), Option.of(extraMeta), commitActionType);
   }
+
+  /**
+   * Resolves the table's current {@link InternalSchema}.
+   *
+   * <p>Uses the internal schema recorded in commit metadata when present; otherwise
+   * falls back to converting the table's Avro schema.
+   *
+   * @param schemaUtil schema resolver for the table
+   * @return the resolved internal schema
+   * @throws HoodieException if the fallback from the table's Avro schema fails; the
+   *     original failure is attached as the cause
+   */
+  private InternalSchema getInternalSchema(TableSchemaResolver schemaUtil) {
+    return schemaUtil.getTableInternalSchemaFromCommitMetadata().orElseGet(() -> {
+      try {
+        return AvroInternalSchemaConverter.convert(schemaUtil.getTableAvroSchema());
+      } catch (Exception e) {
+        // Keep the underlying failure as the cause instead of discarding it.
+        throw new HoodieException(String.format("cannot find schema for current table: %s", config.getBasePath()), e);
+      }
+    });
+  }
 }
diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestSchemaEvolutionClient.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestSchemaEvolutionClient.java
new file mode 100644
index 0000000000..bda4a3267d
--- /dev/null
+++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestSchemaEvolutionClient.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.HoodieJavaWriteClient;
+import org.apache.hudi.common.engine.EngineType;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.testutils.RawTripTestPayload;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.internal.schema.Types;
+import org.apache.hudi.testutils.HoodieJavaClientTestBase;
+
+import org.apache.avro.Schema;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+
+import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Tests for schema evolution client api.
+ */
+public class TestSchemaEvolutionClient extends HoodieJavaClientTestBase {
+
+  private static final Schema SCHEMA = getSchemaFromResource(TestSchemaEvolutionClient.class, "/exampleSchema.avsc");
+
+  @BeforeEach
+  public void setUpClient() throws IOException {
+    HoodieJavaWriteClient<RawTripTestPayload> client = getWriteClient();
+    // Publish the client through the inherited field so the test and closeClient() can use it.
+    this.writeClient = client;
+    prepareTable(client);
+  }
+
+  @AfterEach
+  public void closeClient() {
+    if (writeClient != null) {
+      writeClient.close();
+    }
+  }
+
+  @Test
+  public void testUpdateColumnType() {
+    writeClient.updateColumnType("number", Types.LongType.get());
+    assertEquals(Types.LongType.get(), getFieldByName("number").type());
+  }
+
+  /**
+   * Creates a Java write client for {@code basePath} that writes records with {@link #SCHEMA}.
+   */
+  private HoodieJavaWriteClient<RawTripTestPayload> getWriteClient() {
+    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
+        .withEngineType(EngineType.JAVA)
+        .withPath(basePath)
+        .withSchema(SCHEMA.toString())
+        .build();
+    return new HoodieJavaWriteClient<>(context, config);
+  }
+
+  /**
+   * Writes a single record at instant "1" so the table has data and a schema to evolve.
+   */
+  private void prepareTable(HoodieJavaWriteClient<RawTripTestPayload> client) throws IOException {
+    String commitTime = "1";
+    client.startCommitWithTime(commitTime);
+    //language=JSON
+    String jsonRow = "{\"_row_key\": \"1\", \"time\": \"2000-01-01T00:00:00.000Z\", \"number\": 1}";
+    RawTripTestPayload payload = new RawTripTestPayload(jsonRow);
+    HoodieAvroRecord<RawTripTestPayload> record = new HoodieAvroRecord<>(
+        new HoodieKey(payload.getRowKey(), payload.getPartitionPath()), payload);
+    client.insert(Collections.singletonList(record), commitTime);
+  }
+
+  /**
+   * Looks up a field in the internal schema that {@link TableSchemaResolver} reads from commit metadata.
+   */
+  private Types.Field getFieldByName(String fieldName) {
+    // Reload so the lookup sees instants committed by the write client after metaClient was created.
+    metaClient.reloadActiveTimeline();
+    return new TableSchemaResolver(metaClient)
+        .getTableInternalSchemaFromCommitMetadata()
+        .get()
+        .findField(fieldName);
+  }
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index 2f226b2d46..ef2de67347 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -184,6 +184,17 @@ public class HoodieAvroUtils {
     return createHoodieWriteSchema(new Schema.Parser().parse(originalSchema));
   }
 
+  /**
+   * Parses {@code originalSchema} and adds the Hoodie metadata fields to it.
+   *
+   * @param originalSchema Avro schema as a JSON string
+   * @param withOperationField whether the operation metadata field is added as well
+   * @return the parsed schema with the Hoodie metadata fields added
+   */
+  public static Schema createHoodieWriteSchema(String originalSchema, boolean withOperationField) {
+    return addMetadataFields(new Schema.Parser().parse(originalSchema), withOperationField);
+  }
+
   /**
    * Adds the Hoodie metadata fields to the given schema.
    *

Reply via email to