codope commented on code in PR #6176:
URL: https://github.com/apache/hudi/pull/6176#discussion_r928818365


##########
hudi-common/src/test/java/org/apache/hudi/common/testutils/S3EventsSchemaUtils.java:
##########
@@ -0,0 +1,70 @@
+package org.apache.hudi.common.testutils;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+
+// Utility for the schema of S3 events listed here (https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-content-structure.html)

Review Comment:
   This should be a multi-line comment.



##########
hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java:
##########
@@ -274,6 +277,11 @@ public RawTripTestPayload generatePayloadForShortTripSchema(HoodieKey key, Strin
     return new RawTripTestPayload(rec.toString(), key.getRecordKey(), key.getPartitionPath(), SHORT_TRIP_SCHEMA);
   }
 
+  public RawTripTestPayload generatePayloadForS3EventsSchema(HoodieKey key, String commitTime) throws IOException {

Review Comment:
   `RawTripTestPayload` assumes some form of trips schema. Its constructor
takes a schema but never uses it, and its APIs assume a few things about the
schema. Should we keep all of this out of `HoodieTestDataGenerator`?



##########
hudi-common/src/test/java/org/apache/hudi/common/testutils/S3EventsSchemaUtils.java:
##########
@@ -0,0 +1,70 @@
+package org.apache.hudi.common.testutils;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+
+// Utility for the schema of S3 events listed here (https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-content-structure.html)
+public class S3EventsSchemaUtils {
+  public static final String DEFAULT_STRING_VALUE = "default_string";
+
+  public static String generateSchemaString() {
+    return generateS3EventSchema().toString();
+  }
+
+  public static Schema generateObjInfoSchema() {
+    Schema objInfo = SchemaBuilder.record("objInfo")
+        .fields()
+        .requiredString("key")
+        .requiredLong("size")
+        .endRecord();
+    return objInfo;
+  }
+
+  public static GenericRecord generateObjInfoRecord(String key, Long size) {
+    GenericRecord rec = new GenericData.Record(generateObjInfoSchema());
+    rec.put("key", key);
+    rec.put("size", size);
+    return rec;
+  }
+
+  public static Schema generateS3MetadataSchema() {
+    Schema s3Metadata = SchemaBuilder.record("s3Metadata")
+        .fields()
+        .requiredString("configurationId")
+        .name("object")
+        .type(generateObjInfoSchema())
+        .noDefault()
+        .endRecord();
+    return s3Metadata;
+  }
+
+  public static GenericRecord generateS3MetadataRecord(GenericRecord objRecord) {
+    GenericRecord rec = new GenericData.Record(generateS3MetadataSchema());
+    rec.put("configurationId", DEFAULT_STRING_VALUE);
+    rec.put("object", objRecord);
+    return rec;
+  }
+
+  public static Schema generateS3EventSchema() {
+    Schema s3Event = SchemaBuilder.record("s3Event")
+        .fields()
+        .requiredString("eventSource")
+        .requiredString("eventName")
+        .name("s3")

Review Comment:
   Let's extract all these strings to constants.
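
   As a rough sketch of what this could look like: the field names used in
the schema builders above ("eventSource", "eventName", "s3", "configurationId",
"object", "key", "size") are collected in one place so the schema definition,
the `rec.put(...)` calls, and any column paths all refer to the same constants.
The class name `S3EventFieldNames` and the helper `columnPath` are hypothetical
and not part of the PR; only the string values come from the diff.

```java
// Hypothetical holder class; the name and layout are illustrative only.
final class S3EventFieldNames {
  // Top-level fields of the s3Event record.
  static final String EVENT_SOURCE = "eventSource";
  static final String EVENT_NAME = "eventName";
  static final String S3 = "s3";

  // Fields of the nested s3Metadata record.
  static final String CONFIGURATION_ID = "configurationId";
  static final String OBJECT = "object";

  // Fields of the nested objInfo record.
  static final String KEY = "key";
  static final String SIZE = "size";

  private S3EventFieldNames() {
    // Constants only, no instances.
  }

  /** Joins nested field names into a dotted column path. */
  static String columnPath(String... parts) {
    return String.join(".", parts);
  }
}
```

With this in place, `.requiredString("key")` would become
`.requiredString(S3EventFieldNames.KEY)`, and
`columnPath(S3, OBJECT, KEY)` yields the dotted path "s3.object.key".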



##########
hudi-common/src/test/java/org/apache/hudi/common/testutils/GenericTestPayload.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.testutils;
+
+import org.apache.hudi.avro.MercifulJsonConverter;
+import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.common.util.Option;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Map;
+import java.util.zip.Deflater;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.InflaterInputStream;
+
+/**
+ * Generic class for specific payload implementations to inherit from.
+ */
+public abstract class GenericTestPayload {

Review Comment:
   Maybe rename to `AbstractJsonTestPayload`? It's essentially for JSON data,
right?



##########
hudi-common/src/test/java/org/apache/hudi/common/testutils/GenericTestPayload.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.testutils;
+
+import org.apache.hudi.avro.MercifulJsonConverter;
+import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.common.util.Option;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Map;
+import java.util.zip.Deflater;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.InflaterInputStream;
+
+/**
+ * Generic class for specific payload implementations to inherit from.
+ */
+public abstract class GenericTestPayload {
+
+  protected static final transient ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  protected String partitionPath;
+  protected String rowKey;
+  protected byte[] jsonDataCompressed;
+  protected int dataSize;
+  protected boolean isDeleted;
+  protected Comparable orderingVal;
+
+  public GenericTestPayload(Option<String> jsonData, String rowKey, String partitionPath, String schemaStr,
+                            Boolean isDeleted, Comparable orderingVal) throws IOException {
+    if (jsonData.isPresent()) {
+      this.jsonDataCompressed = compressData(jsonData.get());
+      this.dataSize = jsonData.get().length();
+    }
+    this.rowKey = rowKey;
+    this.partitionPath = partitionPath;
+    this.isDeleted = isDeleted;
+    this.orderingVal = orderingVal;
+  }
+
+  public GenericTestPayload(String jsonData, String rowKey, String partitionPath, String schemaStr) throws IOException {
+    this(Option.of(jsonData), rowKey, partitionPath, schemaStr, false, 0L);
+  }
+
+  public GenericTestPayload(String jsonData) throws IOException {
+    this.jsonDataCompressed = compressData(jsonData);
+    this.dataSize = jsonData.length();
+    Map<String, Object> jsonRecordMap = OBJECT_MAPPER.readValue(jsonData, Map.class);
+    this.rowKey = jsonRecordMap.get("_row_key").toString();
+    this.partitionPath = jsonRecordMap.get("time").toString().split("T")[0].replace("-", "/");
+    this.isDeleted = false;
+  }
+
+  public String getPartitionPath() {
+    return partitionPath;
+  }
+
+  public IndexedRecord getRecordToInsert(Schema schema) throws IOException {
+    MercifulJsonConverter jsonConverter = new MercifulJsonConverter();
+    return jsonConverter.convert(getJsonData(), schema);
+  }
+
+  public String getRowKey() {
+    return rowKey;
+  }
+
+  public String getJsonData() throws IOException {
+    return unCompressData(jsonDataCompressed);
+  }
+
+  private byte[] compressData(String jsonData) throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DeflaterOutputStream dos = new DeflaterOutputStream(baos, new Deflater(Deflater.BEST_COMPRESSION), true);
+    try {
+      dos.write(jsonData.getBytes());
+    } finally {
+      dos.flush();
+      dos.close();

Review Comment:
   Will this close the ByteArrayOutputStream too?
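
   For reference, a small standalone sketch of the behavior in question.
`DeflaterOutputStream.close()` finishes the compressed data and then closes
the wrapped stream, and `ByteArrayOutputStream.close()` has no effect, so the
bytes stay readable afterwards. The class `CompressionCloseSketch` and the
tracking subclass are made up for this illustration; the sketch also uses
try-with-resources instead of the explicit flush/close in the diff, and ends
the caller-supplied `Deflater` itself, since `close()` only ends a deflater
the stream created on its own.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;

// Hypothetical sketch class, not part of the PR.
final class CompressionCloseSketch {

  /** Records whether close() was propagated to the underlying stream. */
  static final class TrackingByteArrayOutputStream extends ByteArrayOutputStream {
    boolean closed = false;

    @Override
    public void close() throws IOException {
      closed = true;
      super.close(); // no-op for ByteArrayOutputStream
    }
  }

  static byte[] compress(String data, TrackingByteArrayOutputStream baos) {
    Deflater deflater = new Deflater(Deflater.BEST_COMPRESSION);
    // try-with-resources closes dos, which finishes the deflate stream
    // and then closes baos as well.
    try (DeflaterOutputStream dos = new DeflaterOutputStream(baos, deflater, true)) {
      dos.write(data.getBytes(StandardCharsets.UTF_8));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } finally {
      // A caller-supplied Deflater is not ended by dos.close().
      deflater.end();
    }
    // Still valid after close(): the buffer is kept in memory.
    return baos.toByteArray();
  }

  static String decompress(byte[] compressed) {
    try (InflaterInputStream iis = new InflaterInputStream(new ByteArrayInputStream(compressed));
         ByteArrayOutputStream out = new ByteArrayOutputStream()) {
      byte[] buf = new byte[1024];
      int n;
      while ((n = iis.read(buf)) != -1) {
        out.write(buf, 0, n);
      }
      return new String(out.toByteArray(), StandardCharsets.UTF_8);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

So the answer is yes, the `ByteArrayOutputStream` is closed too, but that
close is harmless here.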



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java:
##########
@@ -189,33 +194,36 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkpt
         .filter(filter)
         .select("s3.bucket.name", "s3.object.key")
         .distinct()
-        .mapPartitions((MapPartitionsFunction<Row, String>)  fileListIterator -> {
+        .rdd()
+        // JavaRDD simplifies coding with collect and suitable mapPartitions signature. check if this can be avoided.
+        .toJavaRDD()
+        .mapPartitions(fileListIterator -> {
           List<String> cloudFilesPerPartition = new ArrayList<>();
-          final Configuration configuration = serializableConfiguration.newCopy();
           fileListIterator.forEachRemaining(row -> {
+            // TODO: configuration is updated in the getFs call. check if new copy is needed w.r.t to getFs.

Review Comment:
   Is this still required?



##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.avro.Schema;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.testutils.S3EventsSchemaUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieArchivalConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.S3_EVENTS_SCHEMA;
+import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarness {
+  private HoodieTestDataGenerator dataGen;
+  private HoodieTableMetaClient metaClient;
+
+  @BeforeEach
+  public void setUp() throws IOException {
+    dataGen = new HoodieTestDataGenerator();
+    metaClient = getHoodieMetaClient(hadoopConf(), basePath());
+  }
+
+  @Test
+  public void testHoodieIncrSource() throws IOException {

Review Comment:
   Maybe rename to testS3EventsHoodieIncrSource?



##########
hudi-common/src/test/java/org/apache/hudi/common/testutils/S3EventsSchemaUtils.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.testutils;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+
+// Utility for the schema of S3 events listed here (https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-content-structure.html)

Review Comment:
   This should be a multi-line comment.



##########
hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java:
##########
@@ -216,13 +218,15 @@ public int getEstimatedFileSizeInBytes(int numOfRecords) {
     return numOfRecords * BYTES_PER_RECORD + BLOOM_FILTER_BYTES;
   }
 
-  public RawTripTestPayload generateRandomValueAsPerSchema(String schemaStr, HoodieKey key, String commitTime, boolean isFlattened) throws IOException {
+  public HoodieRecordPayload generateRandomValueAsPerSchema(String schemaStr, HoodieKey key, String commitTime, boolean isFlattened) throws IOException {

Review Comment:
   Why this rename?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java:
##########
@@ -161,11 +159,18 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkpt
           .filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
               queryTypeAndInstantEndpts.getRight().getRight()));
     }
+    return Pair.of(Option.of(source), queryTypeAndInstantEndpts.getRight().getRight());
+  }
 
-    if (source.isEmpty()) {
-      return Pair.of(Option.empty(), queryTypeAndInstantEndpts.getRight().getRight());
+  @Override
+  public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkptStr, long sourceLimit) {
+    Pair<Option<Dataset<Row>>, String> sourceMetadata = fetchMetadata(lastCkptStr, sourceLimit);
+    if (!sourceMetadata.getKey().isPresent()) {

Review Comment:
   Use `sourceMetadata.getLeft()` here instead of `getKey()`?



##########
hudi-common/src/test/java/org/apache/hudi/common/testutils/S3EventsSchemaUtils.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.testutils;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+
+// Utility for the schema of S3 events listed here (https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-content-structure.html)
+public class S3EventsSchemaUtils {
+  public static final String DEFAULT_STRING_VALUE = "default_string";
+  public static final GenericRecord DEFAULT_S3_BUCKET_RECORD;
+  static {
+    GenericRecord rec = new GenericData.Record(generateBucketInfoSchema());
+    rec.put("name", "default_s3_bucket");
+    DEFAULT_S3_BUCKET_RECORD = rec;
+  }
+
+  public static String generateSchemaString() {
+    return generateS3EventSchema().toString();
+  }
+
+  public static Schema generateObjInfoSchema() {
+    Schema objInfo = SchemaBuilder.record("objInfo")
+        .fields()
+        .requiredString("key")
+        .requiredLong("size")
+        .endRecord();
+    return objInfo;
+  }
+
+  public static Schema generateBucketInfoSchema() {
+    Schema bucketInfo = SchemaBuilder.record("bucketInfo")
+        .fields()
+        .requiredString("name")
+        .endRecord();
+    return bucketInfo;
+  }
+
+  public static GenericRecord generateObjInfoRecord(String key, Long size) {
+    GenericRecord rec = new GenericData.Record(generateObjInfoSchema());
+    rec.put("key", key);
+    rec.put("size", size);
+    return rec;
+  }
+
+  public static Schema generateS3MetadataSchema() {
+    Schema s3Metadata = SchemaBuilder.record("s3Metadata")
+        .fields()
+        .requiredString("configurationId")
+        .name("object")
+        .type(generateObjInfoSchema())
+        .noDefault()
+        .name("bucket")
+        .type(generateBucketInfoSchema())
+        .noDefault()
+        .endRecord();
+    return s3Metadata;
+  }
+
+  public static GenericRecord generateS3MetadataRecord(GenericRecord objRecord) {
+    GenericRecord rec = new GenericData.Record(generateS3MetadataSchema());
+    rec.put("configurationId", DEFAULT_STRING_VALUE);
+    rec.put("object", objRecord);
+    rec.put("bucket", DEFAULT_S3_BUCKET_RECORD);
+    return rec;
+  }
+
+  public static Schema generateS3EventSchema() {
+    Schema s3Event = SchemaBuilder.record("s3Event")
+        .fields()
+        .requiredString("eventSource")
+        .requiredString("eventName")
+        .requiredString("_row_key")
+        .name("s3")

Review Comment:
   Preferably extract all these as static string constants.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
