This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 8f7877f2855 [HUDI-6530] Applying schema during ingestion using a 
schema provider for s3/gcs metadata job (#9191)
8f7877f2855 is described below

commit 8f7877f28559f49b90225a279d5a7ad50c689c0b
Author: lokesh-lingarajan-0310 
<84048984+lokesh-lingarajan-0...@users.noreply.github.com>
AuthorDate: Fri Jul 14 08:39:36 2023 -0700

    [HUDI-6530] Applying schema during ingestion using a schema provider for 
s3/gcs metadata job (#9191)
    
    Co-authored-by: Lokesh Lingarajan 
<lokeshlingarajan@Lokeshs-MacBook-Pro.local>
---
 .../org/apache/hudi/utilities/UtilHelpers.java     |   8 +
 .../hudi/utilities/sources/GcsEventsSource.java    |  11 +-
 .../hudi/utilities/sources/S3EventsSource.java     |  17 +-
 .../utilities/sources/TestGcsEventsSource.java     |  42 ++++-
 .../hudi/utilities/sources/TestS3EventsSource.java |   4 +-
 .../resources/streamer-config/gcs-metadata.avsc    |  60 ++++---
 .../resources/streamer-config/s3-metadata.avsc     | 188 +++++++++++++++++++++
 7 files changed, 299 insertions(+), 31 deletions(-)

diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index a0d241752c5..35a5c9fcb47 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -60,6 +60,7 @@ import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.schema.SchemaProviderWithPostProcessor;
 import org.apache.hudi.utilities.schema.SparkAvroPostProcessor;
 import 
org.apache.hudi.utilities.schema.postprocessor.ChainedSchemaPostProcessor;
+import org.apache.hudi.utilities.sources.InputBatch;
 import org.apache.hudi.utilities.sources.Source;
 import 
org.apache.hudi.utilities.sources.processor.ChainedJsonKafkaSourcePostProcessor;
 import 
org.apache.hudi.utilities.sources.processor.JsonKafkaSourcePostProcessor;
@@ -193,6 +194,13 @@ public class UtilHelpers {
 
   }
 
+  public static StructType getSourceSchema(SchemaProvider schemaProvider) {
+    if (schemaProvider != null && schemaProvider.getSourceSchema() != null && 
schemaProvider.getSourceSchema() != InputBatch.NULL_SCHEMA) {
+      return 
AvroConversionUtils.convertAvroSchemaToStructType(schemaProvider.getSourceSchema());
+    }
+    return null;
+  }
+
   public static Option<Transformer> createTransformer(Option<List<String>> 
classNamesOpt, Option<Schema> sourceSchema,
                                                       boolean 
isErrorTableWriterEnabled) throws IOException {
 
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsSource.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsSource.java
index dfc9b5b2b2e..89ce7eddf54 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsSource.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsSource.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.utilities.UtilHelpers;
 import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.gcs.MessageBatch;
@@ -35,6 +36,7 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.StructType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -96,6 +98,7 @@ 
absolute_path_to/hudi-utilities-bundle_2.12-0.13.0-SNAPSHOT.jar \
 public class GcsEventsSource extends RowSource {
 
   private final PubsubMessagesFetcher pubsubMessagesFetcher;
+  private final SchemaProvider schemaProvider;
   private final boolean ackMessages;
 
   private final List<String> messagesToAck = new ArrayList<>();
@@ -121,6 +124,7 @@ public class GcsEventsSource extends RowSource {
 
     this.pubsubMessagesFetcher = pubsubMessagesFetcher;
     this.ackMessages = props.getBoolean(ACK_MESSAGES.key(), 
ACK_MESSAGES.defaultValue());
+    this.schemaProvider = schemaProvider;
 
     LOG.info("Created GcsEventsSource");
   }
@@ -146,7 +150,12 @@ public class GcsEventsSource extends RowSource {
 
     LOG.info("Returning checkpoint value: " + CHECKPOINT_VALUE_ZERO);
 
-    return Pair.of(Option.of(sparkSession.read().json(eventRecords)), 
CHECKPOINT_VALUE_ZERO);
+    StructType sourceSchema = UtilHelpers.getSourceSchema(schemaProvider);
+    if (sourceSchema != null) {
+      return 
Pair.of(Option.of(sparkSession.read().schema(sourceSchema).json(eventRecords)), 
CHECKPOINT_VALUE_ZERO);
+    } else {
+      return Pair.of(Option.of(sparkSession.read().json(eventRecords)), 
CHECKPOINT_VALUE_ZERO);
+    }
   }
 
   @Override
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsSource.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsSource.java
index df84217381b..78a3b58bcee 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsSource.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsSource.java
@@ -21,6 +21,7 @@ package org.apache.hudi.utilities.sources;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.utilities.UtilHelpers;
 import org.apache.hudi.utilities.config.S3SourceConfig;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.S3EventsMetaSelector;
@@ -32,6 +33,7 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.StructType;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -47,6 +49,7 @@ import java.util.List;
 public class S3EventsSource extends RowSource implements Closeable {
 
   private final S3EventsMetaSelector pathSelector;
+  private final SchemaProvider schemaProvider;
   private final List<Message> processedMessages = new ArrayList<>();
   AmazonSQS sqs;
 
@@ -58,6 +61,7 @@ public class S3EventsSource extends RowSource implements 
Closeable {
     super(props, sparkContext, sparkSession, schemaProvider);
     this.pathSelector = S3EventsMetaSelector.createSourceSelector(props);
     this.sqs = this.pathSelector.createAmazonSqsClient();
+    this.schemaProvider = schemaProvider;
   }
 
   /**
@@ -76,9 +80,16 @@ public class S3EventsSource extends RowSource implements 
Closeable {
       return Pair.of(Option.empty(), 
selectPathsWithLatestSqsMessage.getRight());
     } else {
       Dataset<String> eventRecords = 
sparkSession.createDataset(selectPathsWithLatestSqsMessage.getLeft(), 
Encoders.STRING());
-      return Pair.of(
-          Option.of(sparkSession.read().json(eventRecords)),
-          selectPathsWithLatestSqsMessage.getRight());
+      StructType sourceSchema = UtilHelpers.getSourceSchema(schemaProvider);
+      if (sourceSchema != null) {
+        return Pair.of(
+            
Option.of(sparkSession.read().schema(sourceSchema).json(eventRecords)),
+            selectPathsWithLatestSqsMessage.getRight());
+      } else {
+        return Pair.of(
+            Option.of(sparkSession.read().json(eventRecords)),
+            selectPathsWithLatestSqsMessage.getRight());
+      }
     }
   }
 
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsSource.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsSource.java
index 92cb3dc4431..653cb823233 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsSource.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsSource.java
@@ -63,7 +63,7 @@ public class TestGcsEventsSource extends UtilitiesTestBase {
 
   @BeforeEach
   public void beforeEach() throws Exception {
-    schemaProvider = new FilebasedSchemaProvider(Helpers.setupSchemaOnDFS(), 
jsc);
+    schemaProvider = new 
FilebasedSchemaProvider(Helpers.setupSchemaOnDFS("delta-streamer-config", 
"gcs-metadata.avsc"), jsc);
     MockitoAnnotations.initMocks(this);
 
     props = new TypedProperties();
@@ -86,12 +86,42 @@ public class TestGcsEventsSource extends UtilitiesTestBase {
 
   @Test
   public void shouldReturnDataOnValidMessages() {
-    ReceivedMessage msg1 = fileCreateMessage("objectId-1", 
"{'data':{'bucket':'bucket-1'}}");
-    ReceivedMessage msg2 = fileCreateMessage("objectId-2", 
"{'data':{'bucket':'bucket-2'}}");
+    ReceivedMessage msg1 = fileCreateMessage("objectId-1", "{\n"
+        + "  \"kind\": \"storage#object\",\n"
+        + "  \"id\": \"bucket-name/object-name/1234567890123456\",\n"
+        + "  \"selfLink\": 
\"https://www.googleapis.com/storage/v1/b/bucket-name/o/object-name\",\n"
+        + "  \"name\": \"object-name-1\",\n"
+        + "  \"bucket\": \"bucket-1\",\n"
+        + "  \"generation\": \"1234567890123456\",\n"
+        + "  \"metageneration\": \"1\",\n"
+        + "  \"contentType\": \"application/octet-stream\",\n"
+        + "  \"timeCreated\": \"2023-07-09T10:15:30.000Z\",\n"
+        + "  \"updated\": \"2023-07-09T10:15:30.000Z\",\n"
+        + "  \"size\": \"1024\",\n"
+        + "  \"md5Hash\": \"e4e68fb326b0d21a1bc7a12bb6b1e642\",\n"
+        + "  \"crc32c\": \"AAAAAAAAAAA=\",\n"
+        + "  \"etag\": \"CO2j+pDxx-ACEAE=\"\n"
+        + "}");
+    ReceivedMessage msg2 = fileCreateMessage("objectId-2", "{\n"
+        + "  \"kind\": \"storage#object\",\n"
+        + "  \"id\": \"bucket-name/object-name/1234567890123456\",\n"
+        + "  \"selfLink\": 
\"https://www.googleapis.com/storage/v1/b/bucket-name/o/object-name\",\n"
+        + "  \"name\": \"object-name-2\",\n"
+        + "  \"bucket\": \"bucket-2\",\n"
+        + "  \"generation\": \"1234567890123456\",\n"
+        + "  \"metageneration\": \"1\",\n"
+        + "  \"contentType\": \"application/octet-stream\",\n"
+        + "  \"timeCreated\": \"2023-07-09T10:15:30.000Z\",\n"
+        + "  \"updated\": \"2023-07-09T10:15:30.000Z\",\n"
+        + "  \"size\": \"1024\",\n"
+        + "  \"md5Hash\": \"e4e68fb326b0d21a1bc7a12bb6b1e642\",\n"
+        + "  \"crc32c\": \"AAAAAAAAAAA=\",\n"
+        + "  \"etag\": \"CO2j+pDxx-ACEAE=\"\n"
+        + "}");
 
     when(pubsubMessagesFetcher.fetchMessages()).thenReturn(Arrays.asList(msg1, 
msg2));
 
-    GcsEventsSource source = new GcsEventsSource(props, jsc, sparkSession, 
null,
+    GcsEventsSource source = new GcsEventsSource(props, jsc, sparkSession, 
schemaProvider,
             pubsubMessagesFetcher);
     Pair<Option<Dataset<Row>>, String> dataAndCheckpoint = 
source.fetchNextBatch(Option.of("0"), 100);
     source.onCommit(dataAndCheckpoint.getRight());
@@ -101,8 +131,8 @@ public class TestGcsEventsSource extends UtilitiesTestBase {
     Dataset<Row> resultDs = dataAndCheckpoint.getLeft().get();
     List<Row> result = resultDs.collectAsList();
 
-    assertBucket(result.get(0), "bucket-1");
-    assertBucket(result.get(1), "bucket-2");
+    assertEquals(result.get(0).getAs("bucket"), "bucket-1");
+    assertEquals(result.get(1).getAs("bucket"), "bucket-2");
 
     verify(pubsubMessagesFetcher).fetchMessages();
   }
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsSource.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsSource.java
index 02ab061109e..4db47c76784 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsSource.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsSource.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.utilities.streamer.SourceFormatAdapter;
+import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
 import 
org.apache.hudi.utilities.testutils.sources.AbstractCloudObjectsSourceTestBase;
 
 import org.apache.avro.generic.GenericRecord;
@@ -50,6 +51,7 @@ public class TestS3EventsSource extends 
AbstractCloudObjectsSourceTestBase {
     this.dfsRoot = basePath + "/parquetFiles";
     this.fileSuffix = ".parquet";
     fs.mkdirs(new Path(dfsRoot));
+    schemaProvider = new 
FilebasedSchemaProvider(Helpers.setupSchemaOnDFS("delta-streamer-config", 
"s3-metadata.avsc"), jsc);
   }
 
   @AfterEach
@@ -100,7 +102,7 @@ public class TestS3EventsSource extends 
AbstractCloudObjectsSourceTestBase {
     props.setProperty(S3_SOURCE_QUEUE_URL.key(), sqsUrl);
     props.setProperty(S3_SOURCE_QUEUE_REGION.key(), regionName);
     props.setProperty(S3_SOURCE_QUEUE_FS.key(), "hdfs");
-    S3EventsSource dfsSource = new S3EventsSource(props, jsc, sparkSession, 
null);
+    S3EventsSource dfsSource = new S3EventsSource(props, jsc, sparkSession, 
schemaProvider);
     dfsSource.sqs = this.sqs;
     return dfsSource;
   }
diff --git 
a/hudi-utilities/src/test/resources/streamer-config/gcs-metadata.avsc 
b/hudi-utilities/src/test/resources/streamer-config/gcs-metadata.avsc
index 79baf5eb80d..3e190932dc1 100644
--- a/hudi-utilities/src/test/resources/streamer-config/gcs-metadata.avsc
+++ b/hudi-utilities/src/test/resources/streamer-config/gcs-metadata.avsc
@@ -22,11 +22,13 @@
     "fields": [
         {
             "name": "_row_key",
-            "type": "string"
+            "type": ["null", "string"],
+            "default": null
         },
         {
             "name": "partition_path",
-            "type": "string"
+            "type": ["null", "string"],
+            "default": null
         },
         {
             "name": "timestamp",
@@ -34,75 +36,93 @@
         },
         {
             "name": "bucket",
-            "type": "string"
+            "type": ["null", "string"],
+            "default": null
         },
         {
             "name": "contentLanguage",
-            "type": "string"
+            "type": ["null", "string"],
+            "default": null
         },
         {
             "name": "contentType",
-            "type": "string"
+            "type": ["null", "string"],
+            "default": null
         },
         {
             "name": "crc32c",
-            "type": "string"
+            "type": ["null", "string"],
+            "default": null
         },
         {
             "name": "etag",
-            "type": "string"
+            "type": ["null", "string"],
+            "default": null
         },
         {
             "name": "generation",
-            "type": "string"
+            "type": ["null", "string"],
+            "default": null
         },
         {
             "name": "id",
-            "type": "string"
+            "type": ["null", "string"],
+            "default": null
         },
         {
             "name": "kind",
-            "type": "string"
+            "type": ["null", "string"],
+            "default": null
         },
         {
             "name": "md5Hash",
-            "type": "string"
+            "type": ["null", "string"],
+            "default": null
         },
         {
             "name": "mediaLink",
-            "type": "string"
+            "type": ["null", "string"],
+            "default": null
         },
         {
             "name": "metageneration",
-            "type": "string"
+            "type": ["null", "string"],
+            "default": null
         },
         {
             "name": "name",
-            "type": "string"
+            "type": ["null", "string"],
+            "default": null
         },
         {
             "name": "selfLink",
-            "type": "string"
+            "type": ["null", "string"],
+            "default": null
         },
         {
             "name": "size",
-            "type": "string"
+            "type": ["null", "string"],
+            "default": null
         },
         {
             "name": "storageClass",
-            "type": "string"
+            "type": ["null", "string"],
+            "default": null
         },
         {
             "name": "timeCreated",
-            "type": "string"
+            "type": ["null", "string"],
+            "default": null
         },
         {
             "name": "timeStorageClassUpdated",
-            "type": "string"
+            "type": ["null", "string"],
+            "default": null
         },
         {
             "name": "updated",
-            "type": "string"
+            "type": ["null", "string"],
+            "default": null
         }
     ]
 }
\ No newline at end of file
diff --git a/hudi-utilities/src/test/resources/streamer-config/s3-metadata.avsc 
b/hudi-utilities/src/test/resources/streamer-config/s3-metadata.avsc
new file mode 100644
index 00000000000..64b169c1373
--- /dev/null
+++ b/hudi-utilities/src/test/resources/streamer-config/s3-metadata.avsc
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+  "type": "record",
+  "name": "hoodie_source",
+  "namespace": "hoodie.source",
+  "fields": [
+    {
+      "name": "awsRegion",
+      "type": ["null", "string"],
+      "default": null
+    },
+    {
+      "name": "eventName",
+      "type": ["null", "string"],
+      "default": null
+    },
+    {
+      "name": "eventSource",
+      "type": ["null", "string"],
+      "default": null
+    },
+    {
+      "name": "eventTime",
+      "type": ["null", "string"],
+      "default": null
+    },
+    {
+      "name": "eventVersion",
+      "type": ["null", "string"],
+      "default": null
+    },
+    {
+      "name": "requestParameters",
+      "type": [
+        "null",
+        {
+          "type": "record",
+          "name": "requestParameters",
+          "namespace": "hoodie.source.hoodie_source",
+          "fields": [
+            {
+              "name": "sourceIPAddress",
+              "type": ["null", "string"],
+              "default": null
+            }
+          ]
+        }
+      ],
+      "default": null
+    },
+    {
+      "name": "s3",
+      "type": [
+        "null",
+        {
+          "type": "record",
+          "name": "s3",
+          "namespace": "hoodie.source.hoodie_source",
+          "fields": [
+            {
+              "name": "bucket",
+              "type": [
+                "null",
+                {
+                  "type": "record",
+                  "name": "bucket",
+                  "namespace": "hoodie.source.hoodie_source.s3",
+                  "fields": [
+                    {
+                      "name": "arn",
+                      "type": ["null", "string"],
+                      "default": null
+                    },
+                    {
+                      "name": "name",
+                      "type": ["null", "string"],
+                      "default": null
+                    },
+                    {
+                      "name": "ownerIdentity",
+                      "type": [
+                        "null",
+                        {
+                          "type": "record",
+                          "name": "ownerIdentity",
+                          "namespace": "hoodie.source.hoodie_source.s3.bucket",
+                          "fields": [
+                            {
+                              "name": "principalId",
+                              "type": ["null", "string"],
+                              "default": null
+                            }
+                          ]
+                        }
+                      ],
+                      "default": null
+                    }
+                  ]
+                }
+              ],
+              "default": null
+            },
+            {
+              "name": "configurationId",
+              "type": ["null", "string"],
+              "default": null
+            },
+            {
+              "name": "object",
+              "type": [
+                "null",
+                {
+                  "type": "record",
+                  "name": "object",
+                  "namespace": "hoodie.source.hoodie_source.s3",
+                  "fields": [
+                    {
+                      "name": "eTag",
+                      "type": ["null", "string"],
+                      "default": null
+                    },
+                    {
+                      "name": "key",
+                      "type": ["null", "string"],
+                      "default": null
+                    },
+                    {
+                      "name": "sequencer",
+                      "type": ["null", "string"],
+                      "default": null
+                    },
+                    {
+                      "name": "size",
+                      "type": ["null", "long"],
+                      "default": null
+                    }
+                  ]
+                }
+              ],
+              "default": null
+            },
+            {
+              "name": "s3SchemaVersion",
+              "type": ["null", "string"],
+              "default": null
+            }
+          ]
+        }
+      ],
+      "default": null
+    },
+    {
+      "name": "userIdentity",
+      "type": [
+        "null",
+        {
+          "type": "record",
+          "name": "userIdentity",
+          "namespace": "hoodie.source.hoodie_source",
+          "fields": [
+            {
+              "name": "principalId",
+              "type": ["null", "string"],
+              "default": null
+            }
+          ]
+        }
+      ],
+      "default": null
+    }
+  ]
+}
\ No newline at end of file

Reply via email to