This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 8f7877f2855  [HUDI-6530] Applying schema during ingestion using a schema provider for s3/gcs metadata job (#9191)
8f7877f2855 is described below

commit 8f7877f28559f49b90225a279d5a7ad50c689c0b
Author: lokesh-lingarajan-0310 <84048984+lokesh-lingarajan-0...@users.noreply.github.com>
AuthorDate: Fri Jul 14 08:39:36 2023 -0700

    [HUDI-6530] Applying schema during ingestion using a schema provider for s3/gcs metadata job (#9191)

    Co-authored-by: Lokesh Lingarajan <lokeshlingarajan@Lokeshs-MacBook-Pro.local>
---
 .../org/apache/hudi/utilities/UtilHelpers.java     |   8 +
 .../hudi/utilities/sources/GcsEventsSource.java    |  11 +-
 .../hudi/utilities/sources/S3EventsSource.java     |  17 +-
 .../utilities/sources/TestGcsEventsSource.java     |  42 ++++-
 .../hudi/utilities/sources/TestS3EventsSource.java |   4 +-
 .../resources/streamer-config/gcs-metadata.avsc    |  60 ++++---
 .../resources/streamer-config/s3-metadata.avsc     | 188 +++++++++++++++++++++
 7 files changed, 299 insertions(+), 31 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index a0d241752c5..35a5c9fcb47 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -60,6 +60,7 @@ import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.schema.SchemaProviderWithPostProcessor;
 import org.apache.hudi.utilities.schema.SparkAvroPostProcessor;
 import org.apache.hudi.utilities.schema.postprocessor.ChainedSchemaPostProcessor;
+import org.apache.hudi.utilities.sources.InputBatch;
 import org.apache.hudi.utilities.sources.Source;
 import org.apache.hudi.utilities.sources.processor.ChainedJsonKafkaSourcePostProcessor;
 import org.apache.hudi.utilities.sources.processor.JsonKafkaSourcePostProcessor;
@@ -193,6 +194,13 @@ public class UtilHelpers {
   }
 
+  public static StructType getSourceSchema(SchemaProvider schemaProvider) {
+    if (schemaProvider != null && schemaProvider.getSourceSchema() != null && schemaProvider.getSourceSchema() != InputBatch.NULL_SCHEMA) {
+      return AvroConversionUtils.convertAvroSchemaToStructType(schemaProvider.getSourceSchema());
+    }
+    return null;
+  }
+
   public static Option<Transformer> createTransformer(Option<List<String>> classNamesOpt, Option<Schema> sourceSchema,
                                                       boolean isErrorTableWriterEnabled) throws IOException {
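A note on the new helper: UtilHelpers.getSourceSchema returns a Spark StructType only when the provider actually carries a source schema, and null otherwise, so callers can fall back to inference. A minimal sketch of driving it directly (the property key and file path below are illustrative assumptions, not part of this patch):

    import org.apache.hudi.common.config.TypedProperties;
    import org.apache.hudi.utilities.UtilHelpers;
    import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
    import org.apache.hudi.utilities.schema.SchemaProvider;
    import org.apache.spark.sql.types.StructType;

    // Assumes jsc is an existing JavaSparkContext and the .avsc file is reachable.
    TypedProperties props = new TypedProperties();
    props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file",
        "file:///path/to/s3-metadata.avsc");
    SchemaProvider provider = new FilebasedSchemaProvider(props, jsc);
    // Null when the provider has no usable schema; the sources below then
    // fall back to Spark's JSON schema inference.
    StructType sourceSchema = UtilHelpers.getSourceSchema(provider);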
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsSource.java
index dfc9b5b2b2e..89ce7eddf54 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsSource.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.utilities.UtilHelpers;
 import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.gcs.MessageBatch;
@@ -35,6 +36,7 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.StructType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -96,6 +98,7 @@ absolute_path_to/hudi-utilities-bundle_2.12-0.13.0-SNAPSHOT.jar \
 public class GcsEventsSource extends RowSource {
 
   private final PubsubMessagesFetcher pubsubMessagesFetcher;
+  private final SchemaProvider schemaProvider;
   private final boolean ackMessages;
 
   private final List<String> messagesToAck = new ArrayList<>();
@@ -121,6 +124,7 @@ public class GcsEventsSource extends RowSource {
 
     this.pubsubMessagesFetcher = pubsubMessagesFetcher;
     this.ackMessages = props.getBoolean(ACK_MESSAGES.key(), ACK_MESSAGES.defaultValue());
+    this.schemaProvider = schemaProvider;
 
     LOG.info("Created GcsEventsSource");
   }
@@ -146,7 +150,12 @@ public class GcsEventsSource extends RowSource {
 
     LOG.info("Returning checkpoint value: " + CHECKPOINT_VALUE_ZERO);
 
-    return Pair.of(Option.of(sparkSession.read().json(eventRecords)), CHECKPOINT_VALUE_ZERO);
+    StructType sourceSchema = UtilHelpers.getSourceSchema(schemaProvider);
+    if (sourceSchema != null) {
+      return Pair.of(Option.of(sparkSession.read().schema(sourceSchema).json(eventRecords)), CHECKPOINT_VALUE_ZERO);
+    } else {
+      return Pair.of(Option.of(sparkSession.read().json(eventRecords)), CHECKPOINT_VALUE_ZERO);
+    }
   }
 
   @Override
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsSource.java
index df84217381b..78a3b58bcee 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsSource.java
@@ -21,6 +21,7 @@ package org.apache.hudi.utilities.sources;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.utilities.UtilHelpers;
 import org.apache.hudi.utilities.config.S3SourceConfig;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.S3EventsMetaSelector;
@@ -32,6 +33,7 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.StructType;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -47,6 +49,7 @@ import java.util.List;
 public class S3EventsSource extends RowSource implements Closeable {
 
   private final S3EventsMetaSelector pathSelector;
+  private final SchemaProvider schemaProvider;
   private final List<Message> processedMessages = new ArrayList<>();
   AmazonSQS sqs;
 
@@ -58,6 +61,7 @@ public class S3EventsSource extends RowSource implements Closeable {
     super(props, sparkContext, sparkSession, schemaProvider);
     this.pathSelector = S3EventsMetaSelector.createSourceSelector(props);
     this.sqs = this.pathSelector.createAmazonSqsClient();
+    this.schemaProvider = schemaProvider;
   }
 
   /**
@@ -76,9 +80,16 @@ public class S3EventsSource extends RowSource implements Closeable {
       return Pair.of(Option.empty(), selectPathsWithLatestSqsMessage.getRight());
     } else {
       Dataset<String> eventRecords = sparkSession.createDataset(selectPathsWithLatestSqsMessage.getLeft(), Encoders.STRING());
-      return Pair.of(
-          Option.of(sparkSession.read().json(eventRecords)),
-          selectPathsWithLatestSqsMessage.getRight());
+      StructType sourceSchema = UtilHelpers.getSourceSchema(schemaProvider);
+      if (sourceSchema != null) {
+        return Pair.of(
+            Option.of(sparkSession.read().schema(sourceSchema).json(eventRecords)),
+            selectPathsWithLatestSqsMessage.getRight());
+      } else {
+        return Pair.of(
+            Option.of(sparkSession.read().json(eventRecords)),
+            selectPathsWithLatestSqsMessage.getRight());
+      }
     }
   }
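With this change both GcsEventsSource and S3EventsSource follow the same pattern: apply the provider's schema when one exists, otherwise let Spark infer it from the JSON records. A self-contained sketch of that read path (method and variable names here are illustrative, not from the patch):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.types.StructType;

    // Parse JSON event strings into rows, preferring an explicit schema. With a
    // schema, Spark skips the extra inference pass over the data, and fields
    // missing from a given event surface as nulls instead of shifting the shape
    // of the resulting Dataset from batch to batch.
    static Dataset<Row> readEvents(SparkSession spark, Dataset<String> eventRecords, StructType sourceSchema) {
      return sourceSchema != null
          ? spark.read().schema(sourceSchema).json(eventRecords)
          : spark.read().json(eventRecords);
    }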
"bucket-1"); - assertBucket(result.get(1), "bucket-2"); + assertEquals(result.get(0).getAs("bucket"), "bucket-1"); + assertEquals(result.get(1).getAs("bucket"), "bucket-2"); verify(pubsubMessagesFetcher).fetchMessages(); } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsSource.java index 02ab061109e..4db47c76784 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsSource.java @@ -22,6 +22,7 @@ import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.util.Option; import org.apache.hudi.utilities.streamer.SourceFormatAdapter; +import org.apache.hudi.utilities.schema.FilebasedSchemaProvider; import org.apache.hudi.utilities.testutils.sources.AbstractCloudObjectsSourceTestBase; import org.apache.avro.generic.GenericRecord; @@ -50,6 +51,7 @@ public class TestS3EventsSource extends AbstractCloudObjectsSourceTestBase { this.dfsRoot = basePath + "/parquetFiles"; this.fileSuffix = ".parquet"; fs.mkdirs(new Path(dfsRoot)); + schemaProvider = new FilebasedSchemaProvider(Helpers.setupSchemaOnDFS("delta-streamer-config", "s3-metadata.avsc"), jsc); } @AfterEach @@ -100,7 +102,7 @@ public class TestS3EventsSource extends AbstractCloudObjectsSourceTestBase { props.setProperty(S3_SOURCE_QUEUE_URL.key(), sqsUrl); props.setProperty(S3_SOURCE_QUEUE_REGION.key(), regionName); props.setProperty(S3_SOURCE_QUEUE_FS.key(), "hdfs"); - S3EventsSource dfsSource = new S3EventsSource(props, jsc, sparkSession, null); + S3EventsSource dfsSource = new S3EventsSource(props, jsc, sparkSession, schemaProvider); dfsSource.sqs = this.sqs; return dfsSource; } diff --git a/hudi-utilities/src/test/resources/streamer-config/gcs-metadata.avsc b/hudi-utilities/src/test/resources/streamer-config/gcs-metadata.avsc index 79baf5eb80d..3e190932dc1 100644 --- a/hudi-utilities/src/test/resources/streamer-config/gcs-metadata.avsc +++ b/hudi-utilities/src/test/resources/streamer-config/gcs-metadata.avsc @@ -22,11 +22,13 @@ "fields": [ { "name": "_row_key", - "type": "string" + "type": ["null", "string"], + "default": null }, { "name": "partition_path", - "type": "string" + "type": ["null", "string"], + "default": null }, { "name": "timestamp", @@ -34,75 +36,93 @@ }, { "name": "bucket", - "type": "string" + "type": ["null", "string"], + "default": null }, { "name": "contentLanguage", - "type": "string" + "type": ["null", "string"], + "default": null }, { "name": "contentType", - "type": "string" + "type": ["null", "string"], + "default": null }, { "name": "crc32c", - "type": "string" + "type": ["null", "string"], + "default": null }, { "name": "etag", - "type": "string" + "type": ["null", "string"], + "default": null }, { "name": "generation", - "type": "string" + "type": ["null", "string"], + "default": null }, { "name": "id", - "type": "string" + "type": ["null", "string"], + "default": null }, { "name": "kind", - "type": "string" + "type": ["null", "string"], + "default": null }, { "name": "md5Hash", - "type": "string" + "type": ["null", "string"], + "default": null }, { "name": "mediaLink", - "type": "string" + "type": ["null", "string"], + "default": null }, { "name": "metageneration", - "type": "string" + "type": ["null", "string"], + "default": null }, { "name": "name", - "type": "string" + "type": ["null", 
"string"], + "default": null }, { "name": "selfLink", - "type": "string" + "type": ["null", "string"], + "default": null }, { "name": "size", - "type": "string" + "type": ["null", "string"], + "default": null }, { "name": "storageClass", - "type": "string" + "type": ["null", "string"], + "default": null }, { "name": "timeCreated", - "type": "string" + "type": ["null", "string"], + "default": null }, { "name": "timeStorageClassUpdated", - "type": "string" + "type": ["null", "string"], + "default": null }, { "name": "updated", - "type": "string" + "type": ["null", "string"], + "default": null } ] } \ No newline at end of file diff --git a/hudi-utilities/src/test/resources/streamer-config/s3-metadata.avsc b/hudi-utilities/src/test/resources/streamer-config/s3-metadata.avsc new file mode 100644 index 00000000000..64b169c1373 --- /dev/null +++ b/hudi-utilities/src/test/resources/streamer-config/s3-metadata.avsc @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +{ + "type": "record", + "name": "hoodie_source", + "namespace": "hoodie.source", + "fields": [ + { + "name": "awsRegion", + "type": ["null", "string"], + "default": null + }, + { + "name": "eventName", + "type": ["null", "string"], + "default": null + }, + { + "name": "eventSource", + "type": ["null", "string"], + "default": null + }, + { + "name": "eventTime", + "type": ["null", "string"], + "default": null + }, + { + "name": "eventVersion", + "type": ["null", "string"], + "default": null + }, + { + "name": "requestParameters", + "type": [ + "null", + { + "type": "record", + "name": "requestParameters", + "namespace": "hoodie.source.hoodie_source", + "fields": [ + { + "name": "sourceIPAddress", + "type": ["null", "string"], + "default": null + } + ] + } + ], + "default": null + }, + { + "name": "s3", + "type": [ + "null", + { + "type": "record", + "name": "s3", + "namespace": "hoodie.source.hoodie_source", + "fields": [ + { + "name": "bucket", + "type": [ + "null", + { + "type": "record", + "name": "bucket", + "namespace": "hoodie.source.hoodie_source.s3", + "fields": [ + { + "name": "arn", + "type": ["null", "string"], + "default": null + }, + { + "name": "name", + "type": ["null", "string"], + "default": null + }, + { + "name": "ownerIdentity", + "type": [ + "null", + { + "type": "record", + "name": "ownerIdentity", + "namespace": "hoodie.source.hoodie_source.s3.bucket", + "fields": [ + { + "name": "principalId", + "type": ["null", "string"], + "default": null + } + ] + } + ], + "default": null + } + ] + } + ], + "default": null + }, + { + "name": "configurationId", + "type": ["null", "string"], + "default": null + }, + { + "name": "object", + "type": [ + "null", + { + "type": "record", + "name": "object", + "namespace": "hoodie.source.hoodie_source.s3", + "fields": 
diff --git a/hudi-utilities/src/test/resources/streamer-config/s3-metadata.avsc b/hudi-utilities/src/test/resources/streamer-config/s3-metadata.avsc
new file mode 100644
index 00000000000..64b169c1373
--- /dev/null
+++ b/hudi-utilities/src/test/resources/streamer-config/s3-metadata.avsc
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+  "type": "record",
+  "name": "hoodie_source",
+  "namespace": "hoodie.source",
+  "fields": [
+    {
+      "name": "awsRegion",
+      "type": ["null", "string"],
+      "default": null
+    },
+    {
+      "name": "eventName",
+      "type": ["null", "string"],
+      "default": null
+    },
+    {
+      "name": "eventSource",
+      "type": ["null", "string"],
+      "default": null
+    },
+    {
+      "name": "eventTime",
+      "type": ["null", "string"],
+      "default": null
+    },
+    {
+      "name": "eventVersion",
+      "type": ["null", "string"],
+      "default": null
+    },
+    {
+      "name": "requestParameters",
+      "type": [
+        "null",
+        {
+          "type": "record",
+          "name": "requestParameters",
+          "namespace": "hoodie.source.hoodie_source",
+          "fields": [
+            {
+              "name": "sourceIPAddress",
+              "type": ["null", "string"],
+              "default": null
+            }
+          ]
+        }
+      ],
+      "default": null
+    },
+    {
+      "name": "s3",
+      "type": [
+        "null",
+        {
+          "type": "record",
+          "name": "s3",
+          "namespace": "hoodie.source.hoodie_source",
+          "fields": [
+            {
+              "name": "bucket",
+              "type": [
+                "null",
+                {
+                  "type": "record",
+                  "name": "bucket",
+                  "namespace": "hoodie.source.hoodie_source.s3",
+                  "fields": [
+                    {
+                      "name": "arn",
+                      "type": ["null", "string"],
+                      "default": null
+                    },
+                    {
+                      "name": "name",
+                      "type": ["null", "string"],
+                      "default": null
+                    },
+                    {
+                      "name": "ownerIdentity",
+                      "type": [
+                        "null",
+                        {
+                          "type": "record",
+                          "name": "ownerIdentity",
+                          "namespace": "hoodie.source.hoodie_source.s3.bucket",
+                          "fields": [
+                            {
+                              "name": "principalId",
+                              "type": ["null", "string"],
+                              "default": null
+                            }
+                          ]
+                        }
+                      ],
+                      "default": null
+                    }
+                  ]
+                }
+              ],
+              "default": null
+            },
+            {
+              "name": "configurationId",
+              "type": ["null", "string"],
+              "default": null
+            },
+            {
+              "name": "object",
+              "type": [
+                "null",
+                {
+                  "type": "record",
+                  "name": "object",
+                  "namespace": "hoodie.source.hoodie_source.s3",
+                  "fields": [
+                    {
+                      "name": "eTag",
+                      "type": ["null", "string"],
+                      "default": null
+                    },
+                    {
+                      "name": "key",
+                      "type": ["null", "string"],
+                      "default": null
+                    },
+                    {
+                      "name": "sequencer",
+                      "type": ["null", "string"],
+                      "default": null
+                    },
+                    {
+                      "name": "size",
+                      "type": ["null", "long"],
+                      "default": null
+                    }
+                  ]
+                }
+              ],
+              "default": null
+            },
+            {
+              "name": "s3SchemaVersion",
+              "type": ["null", "string"],
+              "default": null
+            }
+          ]
+        }
+      ],
+      "default": null
+    },
+    {
+      "name": "userIdentity",
+      "type": [
+        "null",
+        {
+          "type": "record",
+          "name": "userIdentity",
+          "namespace": "hoodie.source.hoodie_source",
+          "fields": [
+            {
+              "name": "principalId",
+              "type": ["null", "string"],
+              "default": null
+            }
+          ]
+        }
+      ],
+      "default": null
+    }
+  ]
+}
\ No newline at end of file
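In an actual S3/GCS metadata ingestion job, the schema provider that feeds UtilHelpers.getSourceSchema is wired in through the streamer's usual options. A hypothetical fragment of such an invocation (the bucket and path are placeholders):

    --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
    --hoodie-conf hoodie.deltastreamer.schemaprovider.source.schema.file=s3://my-bucket/schemas/s3-metadata.avsc

Without these options the sources receive no schema provider and keep the previous behavior, inferring the schema from each batch of events.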