yihua commented on code in PR #9473:
URL: https://github.com/apache/hudi/pull/9473#discussion_r1305972332


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java:
##########
@@ -130,7 +130,7 @@ public static QueryInfo generateQueryInfo(JavaSparkContext jssc, String srcBaseP
       }
     });
 
-    String previousInstantTime = beginInstantTime;
+    String previousInstantTime = DEFAULT_BEGIN_TIMESTAMP;

Review Comment:
   If I understand your statement correctly, should this be the commit time before the last commit time referenced in the checkpoint (`beginInstantTime`)? Using `DEFAULT_BEGIN_TIMESTAMP`/`000` here fetches all commits before the last commit time.
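   To make the suggestion concrete, something along these lines might work (a rough sketch only; it assumes `metaClient` and `beginInstantTime` are in scope at this point, and that `DEFAULT_BEGIN_TIMESTAMP` is only wanted as a fallback when no earlier commit exists):
   ```java
   // Look up the completed commit immediately before the checkpointed instant,
   // instead of falling back to DEFAULT_BEGIN_TIMESTAMP unconditionally.
   String previousInstantTime = metaClient.getActiveTimeline()
       .getCommitsTimeline()
       .filterCompletedInstants()
       .findInstantsBefore(beginInstantTime)   // completed instants strictly before the checkpoint
       .lastInstant()                          // latest of those, i.e. the commit right before it
       .map(HoodieInstant::getTimestamp)
       .orElse(DEFAULT_BEGIN_TIMESTAMP);       // fall back only when there is no earlier commit
   ```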



##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestIncrSourceHelper.java:
##########
@@ -247,4 +273,98 @@ void testLastObjectInCommit() {
     assertEquals("commit3#path/to/file8.json", result.getKey().toString());
     assertTrue(!result.getRight().isPresent());
   }
+
+  private HoodieRecord generateS3EventMetadata(String commitTime, String bucketName, String objectKey, Long objectSize) {
+    String partitionPath = bucketName;
+    Schema schema = S3_METADATA_SCHEMA;
+    GenericRecord rec = new GenericData.Record(schema);
+    Schema.Field s3Field = schema.getField("s3");
+    Schema s3Schema = s3Field.schema().getTypes().get(1); // Assuming the record schema is the second type
+    // Create a generic record for the "s3" field
+    GenericRecord s3Record = new GenericData.Record(s3Schema);
+
+    Schema.Field s3BucketField = s3Schema.getField("bucket");
+    Schema s3Bucket = s3BucketField.schema().getTypes().get(1); // Assuming the record schema is the second type
+    GenericRecord s3BucketRec = new GenericData.Record(s3Bucket);
+    s3BucketRec.put("name", bucketName);
+
+
+    Schema.Field s3ObjectField = s3Schema.getField("object");
+    Schema s3Object = s3ObjectField.schema().getTypes().get(1); // Assuming the record schema is the second type
+    GenericRecord s3ObjectRec = new GenericData.Record(s3Object);
+    s3ObjectRec.put("key", objectKey);
+    s3ObjectRec.put("size", objectSize);
+
+    s3Record.put("bucket", s3BucketRec);
+    s3Record.put("object", s3ObjectRec);
+    rec.put("s3", s3Record);
+    rec.put("_hoodie_commit_time", commitTime);
+
+    HoodieAvroPayload payload = new HoodieAvroPayload(Option.of(rec));
+    return new HoodieAvroRecord(new HoodieKey(objectKey, partitionPath), payload);
+  }
+
+  private HoodieWriteConfig.Builder getConfigBuilder(String basePath, HoodieTableMetaClient metaClient) {
+    return HoodieWriteConfig.newBuilder()
+        .withPath(basePath)
+        .withSchema(S3_METADATA_SCHEMA.toString())
+        .withParallelism(2, 2)
+        .withBulkInsertParallelism(2)
+        .withFinalizeWriteParallelism(2).withDeleteParallelism(2)
+        .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION)
+        .forTable(metaClient.getTableConfig().getTableName());
+  }
+
+  private HoodieWriteConfig getWriteConfig() {
+    return getConfigBuilder(basePath(), metaClient)
+        .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build())
+        .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+            .withMaxNumDeltaCommitsBeforeCompaction(1).build())
+        .build();
+  }
+
+  private Pair<String, List<HoodieRecord>> writeS3MetadataRecords(String commitTime) throws IOException {
+    HoodieWriteConfig writeConfig = getWriteConfig();
+    SparkRDDWriteClient writeClient = getHoodieWriteClient(writeConfig);
+
+    writeClient.startCommitWithTime(commitTime);
+    List<HoodieRecord> s3MetadataRecords = Arrays.asList(
+        generateS3EventMetadata(commitTime, "bucket-1", "data-file-1.json", 1L)
+    );
+    JavaRDD<WriteStatus> result = writeClient.upsert(jsc().parallelize(s3MetadataRecords, 1), commitTime);
+
+    List<WriteStatus> statuses = result.collect();
+    assertNoWriteErrors(statuses);
+
+    return Pair.of(commitTime, s3MetadataRecords);
+  }
+
+  @Test
+  void testQueryInfoGeneration() throws IOException {
+    String commitTimeForReads = "1";

Review Comment:
   Could you add some docs on the details of what this test is about and the flow tested?
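   For instance, a rough Javadoc sketch based on what the test asserts (wording just a suggestion):
   ```java
   /**
    * Tests {@link IncrSourceHelper#generateQueryInfo} against a table holding S3 event metadata
    * with two commits, "1" and "2":
    * - starting from commit "1": the previous instant is INIT_INSTANT_TS and the query spans
    *   commits "1" to "2";
    * - starting from commit "2": the previous instant is commit "1" and the query starts and
    *   ends at commit "2".
    */
   ```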



##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestIncrSourceHelper.java:
##########
@@ -247,4 +273,98 @@ void testLastObjectInCommit() {
     assertEquals("commit3#path/to/file8.json", result.getKey().toString());
     assertTrue(!result.getRight().isPresent());
   }
+
+  private HoodieRecord generateS3EventMetadata(String commitTime, String bucketName, String objectKey, Long objectSize) {
+    String partitionPath = bucketName;
+    Schema schema = S3_METADATA_SCHEMA;
+    GenericRecord rec = new GenericData.Record(schema);
+    Schema.Field s3Field = schema.getField("s3");
+    Schema s3Schema = s3Field.schema().getTypes().get(1); // Assuming the record schema is the second type
+    // Create a generic record for the "s3" field
+    GenericRecord s3Record = new GenericData.Record(s3Schema);
+
+    Schema.Field s3BucketField = s3Schema.getField("bucket");
+    Schema s3Bucket = s3BucketField.schema().getTypes().get(1); // Assuming the record schema is the second type
+    GenericRecord s3BucketRec = new GenericData.Record(s3Bucket);
+    s3BucketRec.put("name", bucketName);
+
+
+    Schema.Field s3ObjectField = s3Schema.getField("object");
+    Schema s3Object = s3ObjectField.schema().getTypes().get(1); // Assuming the record schema is the second type
+    GenericRecord s3ObjectRec = new GenericData.Record(s3Object);
+    s3ObjectRec.put("key", objectKey);
+    s3ObjectRec.put("size", objectSize);
+
+    s3Record.put("bucket", s3BucketRec);
+    s3Record.put("object", s3ObjectRec);
+    rec.put("s3", s3Record);
+    rec.put("_hoodie_commit_time", commitTime);
+
+    HoodieAvroPayload payload = new HoodieAvroPayload(Option.of(rec));
+    return new HoodieAvroRecord(new HoodieKey(objectKey, partitionPath), payload);
+  }
+
+  private HoodieWriteConfig.Builder getConfigBuilder(String basePath, HoodieTableMetaClient metaClient) {
+    return HoodieWriteConfig.newBuilder()
+        .withPath(basePath)
+        .withSchema(S3_METADATA_SCHEMA.toString())
+        .withParallelism(2, 2)
+        .withBulkInsertParallelism(2)
+        .withFinalizeWriteParallelism(2).withDeleteParallelism(2)
+        .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION)
+        .forTable(metaClient.getTableConfig().getTableName());
+  }
+
+  private HoodieWriteConfig getWriteConfig() {
+    return getConfigBuilder(basePath(), metaClient)
+        .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build())
+        .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+            .withMaxNumDeltaCommitsBeforeCompaction(1).build())
+        .build();
+  }
+
+  private Pair<String, List<HoodieRecord>> writeS3MetadataRecords(String commitTime) throws IOException {
+    HoodieWriteConfig writeConfig = getWriteConfig();
+    SparkRDDWriteClient writeClient = getHoodieWriteClient(writeConfig);
+
+    writeClient.startCommitWithTime(commitTime);
+    List<HoodieRecord> s3MetadataRecords = Arrays.asList(
+        generateS3EventMetadata(commitTime, "bucket-1", "data-file-1.json", 1L)
+    );
+    JavaRDD<WriteStatus> result = writeClient.upsert(jsc().parallelize(s3MetadataRecords, 1), commitTime);
+
+    List<WriteStatus> statuses = result.collect();
+    assertNoWriteErrors(statuses);
+
+    return Pair.of(commitTime, s3MetadataRecords);
+  }
+
+  @Test
+  void testQueryInfoGeneration() throws IOException {
+    String commitTimeForReads = "1";
+    String commitTimeForWrites = "2";
+
+    Pair<String, List<HoodieRecord>> inserts = writeS3MetadataRecords(commitTimeForReads);
+    inserts = writeS3MetadataRecords(commitTimeForWrites);
+
+    String startInstant = commitTimeForReads;
+    String orderColumn = "_hoodie_commit_time";
+    String keyColumn = "s3.object.key";
+    String limitColumn = "s3.object.size";
+    QueryInfo queryInfo = IncrSourceHelper.generateQueryInfo(jsc, basePath(), 5, Option.of(startInstant), null,
+        TimelineUtils.HollowCommitHandling.BLOCK, orderColumn, keyColumn, limitColumn, true, Option.empty());
+    assertEquals(HoodieTimeline.INIT_INSTANT_TS, queryInfo.getPreviousInstant());
+    assertEquals(commitTimeForReads, queryInfo.getStartInstant());
+    assertEquals(commitTimeForWrites, queryInfo.getEndInstant());
+
+
+    startInstant = commitTimeForWrites;
+    queryInfo = IncrSourceHelper.generateQueryInfo(jsc, basePath(), 5, Option.of(startInstant), null,
+        TimelineUtils.HollowCommitHandling.BLOCK, orderColumn, keyColumn, limitColumn, true, Option.empty());
+    assertEquals(commitTimeForReads, queryInfo.getPreviousInstant());
+    assertEquals(commitTimeForWrites, queryInfo.getStartInstant());
+    assertEquals(commitTimeForWrites, queryInfo.getEndInstant());
+

Review Comment:
   nit: remove empty line


