This is an automated email from the ASF dual-hosted git repository. mengtao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new 4c218231bd [HUDI-5244] Fix bugs in schema evolution client with lost operation field and not found schema (#7248) 4c218231bd is described below commit 4c218231bdb2a24310145b721cf87ba7b0f1534a Author: Alexander Trushev <trushev.a...@gmail.com> AuthorDate: Mon Nov 21 10:54:01 2022 +0700 [HUDI-5244] Fix bugs in schema evolution client with lost operation field and not found schema (#7248) * [HUDI-5244] Fix bugs in schema evolution client with lost operation field and not found schema --- .../apache/hudi/client/BaseHoodieWriteClient.java | 21 +++-- .../action/commit/TestSchemaEvolutionClient.java | 96 ++++++++++++++++++++++ .../java/org/apache/hudi/avro/HoodieAvroUtils.java | 4 + 3 files changed, 114 insertions(+), 7 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java index 4a3f6bd311..133dfce9e9 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java @@ -280,7 +280,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K, FileBasedInternalSchemaStorageManager schemasManager = new FileBasedInternalSchemaStorageManager(table.getMetaClient()); if (!historySchemaStr.isEmpty() || Boolean.parseBoolean(config.getString(HoodieCommonConfig.RECONCILE_SCHEMA.key()))) { InternalSchema internalSchema; - Schema avroSchema = HoodieAvroUtils.createHoodieWriteSchema(new Schema.Parser().parse(config.getSchema())); + Schema avroSchema = HoodieAvroUtils.createHoodieWriteSchema(config.getSchema(), config.allowOperationMetadataField()); if (historySchemaStr.isEmpty()) { internalSchema = AvroInternalSchemaConverter.convert(avroSchema); internalSchema.setSchemaId(Long.parseLong(instantTime)); @@ -1762,16 +1762,13 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K, private Pair<InternalSchema, HoodieTableMetaClient> getInternalSchemaAndMetaClient() { HoodieTableMetaClient metaClient = createMetaClient(true); TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient); - Option<InternalSchema> internalSchemaOption = schemaUtil.getTableInternalSchemaFromCommitMetadata(); - if (!internalSchemaOption.isPresent()) { - throw new HoodieException(String.format("cannot find schema for current table: %s", config.getBasePath())); - } - return Pair.of(internalSchemaOption.get(), metaClient); + return Pair.of(getInternalSchema(schemaUtil), metaClient); } private void commitTableChange(InternalSchema newSchema, HoodieTableMetaClient metaClient) { TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient); - String historySchemaStr = schemaUtil.getTableHistorySchemaStrFromCommitMetadata().orElse(""); + String historySchemaStr = schemaUtil.getTableHistorySchemaStrFromCommitMetadata().orElseGet( + () -> SerDeHelper.inheritSchemas(getInternalSchema(schemaUtil), "")); Schema schema = AvroInternalSchemaConverter.convert(newSchema, config.getTableName()); String commitActionType = CommitUtils.getCommitActionType(WriteOperationType.ALTER_SCHEMA, metaClient.getTableType()); String instantTime = HoodieActiveTimeline.createNewInstantTime(); @@ -1793,4 +1790,14 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K, schemasManager.persistHistorySchemaStr(instantTime, SerDeHelper.inheritSchemas(newSchema, historySchemaStr)); commitStats(instantTime, Collections.emptyList(), Option.of(extraMeta), commitActionType); } + + private InternalSchema getInternalSchema(TableSchemaResolver schemaUtil) { + return schemaUtil.getTableInternalSchemaFromCommitMetadata().orElseGet(() -> { + try { + return AvroInternalSchemaConverter.convert(schemaUtil.getTableAvroSchema()); + } catch (Exception e) { + throw new HoodieException(String.format("cannot find schema for current table: %s", config.getBasePath())); + } + }); + } } diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestSchemaEvolutionClient.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestSchemaEvolutionClient.java new file mode 100644 index 0000000000..bda4a3267d --- /dev/null +++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestSchemaEvolutionClient.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.action.commit; + +import org.apache.hudi.client.HoodieJavaWriteClient; +import org.apache.hudi.common.engine.EngineType; +import org.apache.hudi.common.model.HoodieAvroRecord; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.table.TableSchemaResolver; +import org.apache.hudi.common.testutils.RawTripTestPayload; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.internal.schema.Types; +import org.apache.hudi.testutils.HoodieJavaClientTestBase; + +import org.apache.avro.Schema; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.Collections; + +import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource; +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * Tests for schema evolution client api. + */ +public class TestSchemaEvolutionClient extends HoodieJavaClientTestBase { + + private static final Schema SCHEMA = getSchemaFromResource(TestSchemaEvolutionClient.class, "/exampleSchema.avsc"); + + @BeforeEach + public void setUpClient() throws IOException { + HoodieJavaWriteClient<RawTripTestPayload> writeClient = getWriteClient(); + this.writeClient = writeClient; + prepareTable(writeClient); + } + + @AfterEach + public void closeClient() { + if (writeClient != null) { + writeClient.close(); + } + } + + @Test + public void testUpdateColumnType() { + writeClient.updateColumnType("number", Types.LongType.get()); + assertEquals(Types.LongType.get(), getFieldByName("number").type()); + } + + private HoodieJavaWriteClient<RawTripTestPayload> getWriteClient() { + HoodieWriteConfig config = HoodieWriteConfig.newBuilder() + .withEngineType(EngineType.JAVA) + .withPath(basePath) + .withSchema(SCHEMA.toString()) + .build(); + return new HoodieJavaWriteClient<>(context, config); + } + + private void prepareTable(HoodieJavaWriteClient<RawTripTestPayload> writeClient) throws IOException { + String commitTime = "1"; + writeClient.startCommitWithTime(commitTime); + //language=JSON + String jsonRow = "{\"_row_key\": \"1\", \"time\": \"2000-01-01T00:00:00.000Z\", \"number\": 1}"; + RawTripTestPayload payload = new RawTripTestPayload(jsonRow); + HoodieAvroRecord<RawTripTestPayload> record = new HoodieAvroRecord<>( + new HoodieKey(payload.getRowKey(), payload.getPartitionPath()), payload); + writeClient.insert(Collections.singletonList(record), commitTime); + } + + private Types.Field getFieldByName(String fieldName) { + return new TableSchemaResolver(metaClient) + .getTableInternalSchemaFromCommitMetadata() + .get() + .findField(fieldName); + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java index 2f226b2d46..ef2de67347 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java @@ -184,6 +184,10 @@ public class HoodieAvroUtils { return createHoodieWriteSchema(new Schema.Parser().parse(originalSchema)); } + public static Schema createHoodieWriteSchema(String originalSchema, boolean withOperationField) { + return addMetadataFields(new Schema.Parser().parse(originalSchema), withOperationField); + } + /** * Adds the Hoodie metadata fields to the given schema. *