nsivabalan commented on code in PR #8829:
URL: https://github.com/apache/hudi/pull/8829#discussion_r1212410928


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SqlQueryEqualityPreCommitValidator.java:
##########
@@ -57,9 +57,11 @@ protected void validateUsingQuery(String query, String 
prevTableSnapshot, String
     String queryWithPrevSnapshot = 
query.replaceAll(HoodiePreCommitValidatorConfig.VALIDATOR_TABLE_VARIABLE, 
prevTableSnapshot);
     String queryWithNewSnapshot = 
query.replaceAll(HoodiePreCommitValidatorConfig.VALIDATOR_TABLE_VARIABLE, 
newTableSnapshot);
     LOG.info("Running query on previous state: " + queryWithPrevSnapshot);
-    Dataset<Row> prevRows = sqlContext.sql(queryWithPrevSnapshot);
+    Dataset<Row> prevRows = sqlContext.sql(queryWithPrevSnapshot).cache();
+    LOG.info("Total rows in prevRows " + prevRows.count());
     LOG.info("Running query on new state: " + queryWithNewSnapshot);
-    Dataset<Row> newRows  = sqlContext.sql(queryWithNewSnapshot);
+    Dataset<Row> newRows  = sqlContext.sql(queryWithNewSnapshot).cache();
+    LOG.info("Total rows in newRows " + newRows.count());

Review Comment:
   should we say "total rows in after state" or something. "newRows" seems not 
a good name



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SqlQueryInequalityPreCommitValidator.java:
##########
@@ -56,9 +56,11 @@ protected void validateUsingQuery(String query, String 
prevTableSnapshot, String
     String queryWithPrevSnapshot = 
query.replaceAll(HoodiePreCommitValidatorConfig.VALIDATOR_TABLE_VARIABLE, 
prevTableSnapshot);
     String queryWithNewSnapshot = 
query.replaceAll(HoodiePreCommitValidatorConfig.VALIDATOR_TABLE_VARIABLE, 
newTableSnapshot);
     LOG.info("Running query on previous state: " + queryWithPrevSnapshot);
-    Dataset<Row> prevRows = sqlContext.sql(queryWithPrevSnapshot);
+    Dataset<Row> prevRows = sqlContext.sql(queryWithPrevSnapshot).cache();
+    LOG.info("Total rows in prevRows " + prevRows.count());
     LOG.info("Running query on new state: " + queryWithNewSnapshot);
-    Dataset<Row> newRows  = sqlContext.sql(queryWithNewSnapshot);
+    Dataset<Row> newRows  = sqlContext.sql(queryWithNewSnapshot).cache();
+    LOG.info("Total rows in newRows " + newRows.count());

Review Comment:
   same here. 
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java:
##########
@@ -90,12 +90,16 @@ public R getResult() {
     }
   }
 
-  static <T> Function<HoodieRecord<T>, 
HoodieInsertValueGenResult<HoodieRecord>> getTransformer(Schema schema,
+  /**
+   * Transformer function to help transform a HoodieRecord. This transformer 
is used by BufferedIterator to offload some
+   * expensive operations of transformation to the reader thread.
+   */
+  public <T> Function<HoodieRecord<T>, 
HoodieInsertValueGenResult<HoodieRecord>> getTransformer(Schema schema,
                                                                                
                 HoodieWriteConfig writeConfig) {
     return getTransformerInternal(schema, writeConfig);
   }
 
-  private static <T> Function<HoodieRecord<T>, 
HoodieInsertValueGenResult<HoodieRecord>> getTransformerInternal(Schema schema,
+  public static <T> Function<HoodieRecord<T>, 
HoodieInsertValueGenResult<HoodieRecord>> getTransformerInternal(Schema schema,

Review Comment:
   does this need to be static then ? 



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SparkPreCommitValidator.java:
##########
@@ -73,7 +79,23 @@ public void validate(String instantTime, 
HoodieWriteMetadata<O> writeResult, Dat
     try {
       validateRecordsBeforeAndAfter(before, after, 
getPartitionsModified(writeResult));
     } finally {
-      LOG.info(getClass() + " validator took " + timer.endTimer() + " ms");
+      long duration = timer.endTimer();
+      LOG.info(getClass() + " validator took " + duration + " ms" + ", metrics 
on? " + getWriteConfig().isMetricsOn());
+      publishRunStats(instantTime, duration);
+    }
+  }
+
+  /**
+   * Publish pre-commit validator run stats for a given commit action.
+   */
+  private void publishRunStats(String instantTime, long duration) {
+    // Record validator duration metrics.
+    if (getWriteConfig().isMetricsOn()) {
+      HoodieTableMetaClient metaClient = getHoodieTable().getMetaClient();
+      Option<HoodieInstant> currentInstant = metaClient.getActiveTimeline()
+          .findInstantsAfterOrEquals(instantTime, 1)
+          .firstInstant();
+      metrics.reportMetrics(currentInstant.get().getAction(), 
getClass().getSimpleName(), duration);

Review Comment:
   not too strong on the suggestion. 
   but we can fetch the write operation type from HoodieCommitMetadata which is 
within HoodieWriteMetadata. 
   we can avoid loading the timeline if we go that route. 
   



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java:
##########
@@ -124,8 +125,15 @@ private HoodieData<HoodieRecord<T>> 
clusteringHandleUpdate(HoodieData<HoodieReco
     UpdateStrategy<T, HoodieData<HoodieRecord<T>>> updateStrategy = 
(UpdateStrategy<T, HoodieData<HoodieRecord<T>>>) ReflectionUtils
         .loadClass(config.getClusteringUpdatesStrategyClass(), new Class<?>[] 
{HoodieEngineContext.class, HoodieTable.class, Set.class},
             this.context, table, fileGroupsInPendingClustering);
+    // For SparkAllowUpdateStrategy with rollback pending clustering as false, 
need not handle
+    // the file group intersection between current ingestion and pending 
clustering file groups.
+    // This will be handled at the conflict resolution strategy.

Review Comment:
   did we add a test for this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to