wecharyu commented on code in PR #9899:
URL: https://github.com/apache/hudi/pull/9899#discussion_r1371235058


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java:
##########
@@ -605,66 +605,70 @@ private Pair<SchemaProvider, Pair<String, 
JavaRDD<HoodieRecord>>> fetchFromSourc
     }
 
     hoodieSparkContext.setJobStatus(this.getClass().getSimpleName(), "Checking 
if input is empty");
-    if ((!avroRDDOptional.isPresent()) || (avroRDDOptional.get().isEmpty())) {
-      LOG.info("No new data, perform empty commit.");
-      return Pair.of(schemaProvider, Pair.of(checkpointStr, 
hoodieSparkContext.emptyRDD()));
-    }
+    try {
+      if ((!avroRDDOptional.isPresent()) || (avroRDDOptional.get().isEmpty())) 
{
+        LOG.info("No new data, perform empty commit.");
+        return Pair.of(schemaProvider, Pair.of(checkpointStr, 
hoodieSparkContext.emptyRDD()));
+      }
 
-    boolean shouldCombine = cfg.filterDupes || 
cfg.operation.equals(WriteOperationType.UPSERT);
-    Set<String> partitionColumns = getPartitionColumns(props);
-    JavaRDD<GenericRecord> avroRDD = avroRDDOptional.get();
-
-    JavaRDD<HoodieRecord> records;
-    SerializableSchema avroSchema = new 
SerializableSchema(schemaProvider.getTargetSchema());
-    SerializableSchema processedAvroSchema = new 
SerializableSchema(isDropPartitionColumns() ? 
HoodieAvroUtils.removeMetadataFields(avroSchema.get()) : avroSchema.get());
-    if (recordType == HoodieRecordType.AVRO) {
-      records = avroRDD.mapPartitions(
-          (FlatMapFunction<Iterator<GenericRecord>, HoodieRecord>) 
genericRecordIterator -> {
-            if (autoGenerateRecordKeys) {
-              
props.setProperty(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG, 
String.valueOf(TaskContext.getPartitionId()));
-              
props.setProperty(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG, instantTime);
-            }
-            BuiltinKeyGenerator builtinKeyGenerator = (BuiltinKeyGenerator) 
HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
-            List<HoodieRecord> avroRecords = new ArrayList<>();
-            while (genericRecordIterator.hasNext()) {
-              GenericRecord genRec = genericRecordIterator.next();
-              HoodieKey hoodieKey = new 
HoodieKey(builtinKeyGenerator.getRecordKey(genRec), 
builtinKeyGenerator.getPartitionPath(genRec));
-              GenericRecord gr = isDropPartitionColumns() ? 
HoodieAvroUtils.removeFields(genRec, partitionColumns) : genRec;
-              HoodieRecordPayload payload = shouldCombine ? 
DataSourceUtils.createPayload(cfg.payloadClassName, gr,
-                  (Comparable) HoodieAvroUtils.getNestedFieldVal(gr, 
cfg.sourceOrderingField, false, props.getBoolean(
-                      
KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
-                      
Boolean.parseBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()))))
-                  : DataSourceUtils.createPayload(cfg.payloadClassName, gr);
-              avroRecords.add(new HoodieAvroRecord<>(hoodieKey, payload));
-            }
-            return avroRecords.iterator();
+      boolean shouldCombine = cfg.filterDupes || 
cfg.operation.equals(WriteOperationType.UPSERT);
+      Set<String> partitionColumns = getPartitionColumns(props);
+      JavaRDD<GenericRecord> avroRDD = avroRDDOptional.get();
+
+      JavaRDD<HoodieRecord> records;
+      SerializableSchema avroSchema = new 
SerializableSchema(schemaProvider.getTargetSchema());
+      SerializableSchema processedAvroSchema = new 
SerializableSchema(isDropPartitionColumns() ? 
HoodieAvroUtils.removeMetadataFields(avroSchema.get()) : avroSchema.get());
+      if (recordType == HoodieRecordType.AVRO) {
+        records = avroRDD.mapPartitions(
+            (FlatMapFunction<Iterator<GenericRecord>, HoodieRecord>) 
genericRecordIterator -> {
+              if (autoGenerateRecordKeys) {
+                
props.setProperty(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG, 
String.valueOf(TaskContext.getPartitionId()));
+                
props.setProperty(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG, instantTime);
+              }
+              BuiltinKeyGenerator builtinKeyGenerator = (BuiltinKeyGenerator) 
HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
+              List<HoodieRecord> avroRecords = new ArrayList<>();
+              while (genericRecordIterator.hasNext()) {
+                GenericRecord genRec = genericRecordIterator.next();
+                HoodieKey hoodieKey = new 
HoodieKey(builtinKeyGenerator.getRecordKey(genRec), 
builtinKeyGenerator.getPartitionPath(genRec));
+                GenericRecord gr = isDropPartitionColumns() ? 
HoodieAvroUtils.removeFields(genRec, partitionColumns) : genRec;
+                HoodieRecordPayload payload = shouldCombine ? 
DataSourceUtils.createPayload(cfg.payloadClassName, gr,
+                    (Comparable) HoodieAvroUtils.getNestedFieldVal(gr, 
cfg.sourceOrderingField, false, props.getBoolean(
+                        
KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
+                        
Boolean.parseBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()))))
+                    : DataSourceUtils.createPayload(cfg.payloadClassName, gr);
+                avroRecords.add(new HoodieAvroRecord<>(hoodieKey, payload));
+              }
+              return avroRecords.iterator();
+            });
+      } else if (recordType == HoodieRecordType.SPARK) {
+        // TODO we should remove it if we can read InternalRow from source.
+        records = avroRDD.mapPartitions(itr -> {
+          if (autoGenerateRecordKeys) {
+            props.setProperty(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG, 
String.valueOf(TaskContext.getPartitionId()));
+            props.setProperty(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG, 
instantTime);
+          }
+          BuiltinKeyGenerator builtinKeyGenerator = (BuiltinKeyGenerator) 
HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
+          StructType baseStructType = 
AvroConversionUtils.convertAvroSchemaToStructType(processedAvroSchema.get());
+          StructType targetStructType = isDropPartitionColumns() ? 
AvroConversionUtils
+              
.convertAvroSchemaToStructType(HoodieAvroUtils.removeFields(processedAvroSchema.get(),
 partitionColumns)) : baseStructType;
+          HoodieAvroDeserializer deserializer = 
SparkAdapterSupport$.MODULE$.sparkAdapter().createAvroDeserializer(processedAvroSchema.get(),
 baseStructType);
+
+          return new CloseableMappingIterator<>(ClosableIterator.wrap(itr), 
rec -> {
+            InternalRow row = (InternalRow) 
deserializer.deserialize(rec).get();
+            String recordKey = builtinKeyGenerator.getRecordKey(row, 
baseStructType).toString();
+            String partitionPath = builtinKeyGenerator.getPartitionPath(row, 
baseStructType).toString();
+            return new HoodieSparkRecord(new HoodieKey(recordKey, 
partitionPath),
+                
HoodieInternalRowUtils.getCachedUnsafeProjection(baseStructType, 
targetStructType).apply(row), targetStructType, false);
           });
-    } else if (recordType == HoodieRecordType.SPARK) {
-      // TODO we should remove it if we can read InternalRow from source.
-      records = avroRDD.mapPartitions(itr -> {
-        if (autoGenerateRecordKeys) {
-          props.setProperty(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG, 
String.valueOf(TaskContext.getPartitionId()));
-          props.setProperty(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG, 
instantTime);
-        }
-        BuiltinKeyGenerator builtinKeyGenerator = (BuiltinKeyGenerator) 
HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
-        StructType baseStructType = 
AvroConversionUtils.convertAvroSchemaToStructType(processedAvroSchema.get());
-        StructType targetStructType = isDropPartitionColumns() ? 
AvroConversionUtils
-            
.convertAvroSchemaToStructType(HoodieAvroUtils.removeFields(processedAvroSchema.get(),
 partitionColumns)) : baseStructType;
-        HoodieAvroDeserializer deserializer = 
SparkAdapterSupport$.MODULE$.sparkAdapter().createAvroDeserializer(processedAvroSchema.get(),
 baseStructType);
-
-        return new CloseableMappingIterator<>(ClosableIterator.wrap(itr), rec 
-> {
-          InternalRow row = (InternalRow) deserializer.deserialize(rec).get();
-          String recordKey = builtinKeyGenerator.getRecordKey(row, 
baseStructType).toString();
-          String partitionPath = builtinKeyGenerator.getPartitionPath(row, 
baseStructType).toString();
-          return new HoodieSparkRecord(new HoodieKey(recordKey, partitionPath),
-              HoodieInternalRowUtils.getCachedUnsafeProjection(baseStructType, 
targetStructType).apply(row), targetStructType, false);
         });
-      });
-    } else {
-      throw new UnsupportedOperationException(recordType.name());
-    }
+      } else {
+        throw new UnsupportedOperationException(recordType.name());
+      }
 
-    return Pair.of(schemaProvider, Pair.of(checkpointStr, records));
+      return Pair.of(schemaProvider, Pair.of(checkpointStr, records));
+    } finally {
+      hoodieSparkContext.clearJobStatus();

Review Comment:
   This one also seems to be lazily executed, so the returned RDD is not materialized before the job status is cleared in the `finally` block.
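   For illustration only, a minimal standalone sketch (hypothetical `LazyJobStatusSketch` class, using Spark's `setJobGroup`/`clearJobGroup` as a rough stand-in for `setJobStatus`/`clearJobStatus`) of why clearing in `finally` around a lazy transformation happens before the job it was meant to label:

```java
// Minimal sketch, not Hudi code: setJobGroup/clearJobGroup stand in for
// HoodieSparkEngineContext#setJobStatus/clearJobStatus.
import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;

public class LazyJobStatusSketch {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("lazy-status-sketch").setMaster("local[1]");
    try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
      JavaRDD<Integer> input = jsc.parallelize(Arrays.asList(1, 2, 3));

      JavaRDD<Integer> mapped;
      try {
        jsc.setJobGroup("sketch", "Mapping records");
        // mapPartitions is lazy: it only records the lineage, no Spark job is submitted here.
        mapped = input.mapPartitions((FlatMapFunction<Iterator<Integer>, Integer>) it -> it);
      } finally {
        // Runs before any job has been submitted, so the description above is discarded unused.
        jsc.clearJobGroup();
      }

      // The job is only triggered here, after the group was already cleared,
      // so it shows up in the Spark UI without the intended description.
      System.out.println("count = " + mapped.count());
    }
  }
}
```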



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java:
##########
@@ -215,6 +219,7 @@ protected List<Pair<String, BloomIndexFileInfo>> 
loadColumnRangesFromMetaIndex(
     String keyField = 
hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
 
     List<Pair<String, HoodieBaseFile>> baseFilesForAllPartitions = 
HoodieIndexUtils.getLatestBaseFilesForAllPartitions(partitions, context, 
hoodieTable);
+    context.clearJobStatus();

Review Comment:
   The code after this point will not create a new job, so it seems OK to clear the job status here. WDYT?
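   For contrast, a minimal standalone sketch (hypothetical `EagerJobStatusSketch` class, again using `setJobGroup`/`clearJobGroup` as a stand-in) of the eager case, where the labeled job has already run by the time the status is cleared:

```java
// Minimal sketch, not Hudi code: when the work is eager (an action runs inside the
// labeled span), the description does label the submitted job, and clearing afterwards
// only affects jobs submitted later.
import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class EagerJobStatusSketch {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("eager-status-sketch").setMaster("local[1]");
    try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
      jsc.setJobGroup("sketch", "Load latest base files for all partitions");
      // collect() is an action: a job runs now, while the description is still set.
      List<Integer> loaded = jsc.parallelize(Arrays.asList(1, 2, 3)).collect();
      // Clearing here is safe: the labeled job has already finished, and the code
      // below submits no further jobs under this description.
      jsc.clearJobGroup();

      System.out.println("loaded " + loaded.size() + " entries");
    }
  }
}
```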



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java:
##########
@@ -758,7 +762,6 @@ protected void reconcileAgainstMarkers(HoodieEngineContext 
context,
         }
 
         // Now delete partially written files
-        context.setJobStatus(this.getClass().getSimpleName(), "Delete all 
partially written files: " + config.getTableName());

Review Comment:
   This status will be overwritten in `HoodieTable#deleteInvalidFilesByPartitions`, so just delete it.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java:
##########
@@ -61,6 +61,7 @@ public HoodieWriteMetadata<O> write(String instantTime,
         // perform index loop up to get existing location of records
         context.setJobStatus(this.getClass().getSimpleName(), "Tagging: " + 
table.getConfig().getTableName());
         taggedRecords = tag(dedupedRecords, context, table);
+        context.clearJobStatus();

Review Comment:
   Let me check all the lazy executions. For this one, the "Tagging xxx" status will also cover the work from `deduplicateRecords`, but clearing it here will not affect other jobs, so we retain this line.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java:
##########
@@ -315,10 +315,14 @@ private HoodieData<BootstrapWriteStatus> 
runMetadataBootstrap(List<Pair<String,
         .collect(Collectors.toList());
 
     context.setJobStatus(this.getClass().getSimpleName(), "Run metadata-only 
bootstrap operation: " + config.getTableName());
-    return context.parallelize(
-            bootstrapPaths, Math.min(bootstrapPaths.size(), 
config.getBootstrapParallelism()))
+    try {
+      return context.parallelize(
+          bootstrapPaths, Math.min(bootstrapPaths.size(), 
config.getBootstrapParallelism()))
         .map(partitionFsPair -> getMetadataHandler(config, table, 
partitionFsPair.getRight().getRight()).runMetadataBootstrap(partitionFsPair.getLeft(),
-            partitionFsPair.getRight().getLeft(), keyGenerator));
+              partitionFsPair.getRight().getLeft(), keyGenerator));
+    } finally {
+      context.clearJobStatus();

Review Comment:
   Will remove the clearJobStatus for lazy executions in CommitActionExecutor, because the job status is already cleared at the end here:
   
https://github.com/apache/hudi/blob/051eb0e930e983dd4118abec01e10d9b01f91ca0/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala#L499-L505



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java:
##########
@@ -111,44 +111,48 @@ public BaseSparkCommitActionExecutor(HoodieEngineContext 
context,
 
   private HoodieData<HoodieRecord<T>> 
clusteringHandleUpdate(HoodieData<HoodieRecord<T>> inputRecords) {
     context.setJobStatus(this.getClass().getSimpleName(), "Handling updates 
which are under clustering: " + config.getTableName());
-    Set<HoodieFileGroupId> fileGroupsInPendingClustering =
-        
table.getFileSystemView().getFileGroupsInPendingClustering().map(Pair::getKey).collect(Collectors.toSet());
-    // Skip processing if there is no inflight clustering
-    if (fileGroupsInPendingClustering.isEmpty()) {
-      return inputRecords;
-    }
+    try {
+      Set<HoodieFileGroupId> fileGroupsInPendingClustering =
+          
table.getFileSystemView().getFileGroupsInPendingClustering().map(Pair::getKey).collect(Collectors.toSet());
+      // Skip processing if there is no inflight clustering
+      if (fileGroupsInPendingClustering.isEmpty()) {
+        return inputRecords;
+      }
 
-    UpdateStrategy<T, HoodieData<HoodieRecord<T>>> updateStrategy = 
(UpdateStrategy<T, HoodieData<HoodieRecord<T>>>) ReflectionUtils
-        .loadClass(config.getClusteringUpdatesStrategyClass(), new Class<?>[] 
{HoodieEngineContext.class, HoodieTable.class, Set.class},
-            this.context, table, fileGroupsInPendingClustering);
-    // For SparkAllowUpdateStrategy with rollback pending clustering as false, 
need not handle
-    // the file group intersection between current ingestion and pending 
clustering file groups.
-    // This will be handled at the conflict resolution strategy.
-    if (updateStrategy instanceof SparkAllowUpdateStrategy && 
!config.isRollbackPendingClustering()) {
-      return inputRecords;
-    }
-    Pair<HoodieData<HoodieRecord<T>>, Set<HoodieFileGroupId>> 
recordsAndPendingClusteringFileGroups =
-        updateStrategy.handleUpdate(inputRecords);
+      UpdateStrategy<T, HoodieData<HoodieRecord<T>>> updateStrategy = 
(UpdateStrategy<T, HoodieData<HoodieRecord<T>>>) ReflectionUtils
+          .loadClass(config.getClusteringUpdatesStrategyClass(), new 
Class<?>[] {HoodieEngineContext.class, HoodieTable.class, Set.class},
+              this.context, table, fileGroupsInPendingClustering);
+      // For SparkAllowUpdateStrategy with rollback pending clustering as 
false, need not handle
+      // the file group intersection between current ingestion and pending 
clustering file groups.
+      // This will be handled at the conflict resolution strategy.
+      if (updateStrategy instanceof SparkAllowUpdateStrategy && 
!config.isRollbackPendingClustering()) {
+        return inputRecords;
+      }
+      Pair<HoodieData<HoodieRecord<T>>, Set<HoodieFileGroupId>> 
recordsAndPendingClusteringFileGroups =
+          updateStrategy.handleUpdate(inputRecords);
 
-    Set<HoodieFileGroupId> fileGroupsWithUpdatesAndPendingClustering = 
recordsAndPendingClusteringFileGroups.getRight();
-    if (fileGroupsWithUpdatesAndPendingClustering.isEmpty()) {
+      Set<HoodieFileGroupId> fileGroupsWithUpdatesAndPendingClustering = 
recordsAndPendingClusteringFileGroups.getRight();
+      if (fileGroupsWithUpdatesAndPendingClustering.isEmpty()) {
+        return recordsAndPendingClusteringFileGroups.getLeft();
+      }
+      // there are file groups pending clustering and receiving updates, so 
rollback the pending clustering instants
+      // there could be race condition, for example, if the clustering 
completes after instants are fetched but before rollback completed
+      if (config.isRollbackPendingClustering()) {
+        Set<HoodieInstant> pendingClusteringInstantsToRollback = 
getAllFileGroupsInPendingClusteringPlans(table.getMetaClient()).entrySet().stream()
+            .filter(e -> 
fileGroupsWithUpdatesAndPendingClustering.contains(e.getKey()))
+            .map(Map.Entry::getValue)
+            .collect(Collectors.toSet());
+        pendingClusteringInstantsToRollback.forEach(instant -> {
+          String commitTime = table.getMetaClient().createNewInstantTime();
+          table.scheduleRollback(context, commitTime, instant, false, 
config.shouldRollbackUsingMarkers(), false);
+          table.rollback(context, commitTime, instant, true, true);
+        });
+        table.getMetaClient().reloadActiveTimeline();
+      }
       return recordsAndPendingClusteringFileGroups.getLeft();
+    } finally {
+      context.clearJobStatus();

Review Comment:
   ditto



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1731,37 +1731,41 @@ public static HoodieData<HoodieRecord> 
readRecordKeysFromBaseFiles(HoodieEngineC
     }
 
     engineContext.setJobStatus(activeModule, "Record Index: reading record 
keys from " + partitionBaseFilePairs.size() + " base files");
-    final int parallelism = Math.min(partitionBaseFilePairs.size(), 
recordIndexMaxParallelism);
-    return engineContext.parallelize(partitionBaseFilePairs, 
parallelism).flatMap(partitionAndBaseFile -> {
-      final String partition = partitionAndBaseFile.getKey();
-      final HoodieBaseFile baseFile = partitionAndBaseFile.getValue();
-      final String filename = baseFile.getFileName();
-      Path dataFilePath = new Path(basePath, partition + Path.SEPARATOR + 
filename);
-
-      final String fileId = baseFile.getFileId();
-      final String instantTime = baseFile.getCommitTime();
-      HoodieFileReader reader = 
HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO).getFileReader(configuration.get(),
 dataFilePath);
-      ClosableIterator<String> recordKeyIterator = 
reader.getRecordKeyIterator();
-
-      return new ClosableIterator<HoodieRecord>() {
-        @Override
-        public void close() {
-          recordKeyIterator.close();
-        }
+    try {
+      final int parallelism = Math.min(partitionBaseFilePairs.size(), 
recordIndexMaxParallelism);
+      return engineContext.parallelize(partitionBaseFilePairs, 
parallelism).flatMap(partitionAndBaseFile -> {
+        final String partition = partitionAndBaseFile.getKey();
+        final HoodieBaseFile baseFile = partitionAndBaseFile.getValue();
+        final String filename = baseFile.getFileName();
+        Path dataFilePath = new Path(basePath, partition + Path.SEPARATOR + 
filename);
+
+        final String fileId = baseFile.getFileId();
+        final String instantTime = baseFile.getCommitTime();
+        HoodieFileReader reader = 
HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO).getFileReader(configuration.get(),
 dataFilePath);
+        ClosableIterator<String> recordKeyIterator = 
reader.getRecordKeyIterator();
 
-        @Override
-        public boolean hasNext() {
-          return recordKeyIterator.hasNext();
-        }
+        return new ClosableIterator<HoodieRecord>() {
+          @Override
+          public void close() {
+            recordKeyIterator.close();
+          }
 
-        @Override
-        public HoodieRecord next() {
-          return forDelete
-              ? 
HoodieMetadataPayload.createRecordIndexDelete(recordKeyIterator.next())
-              : 
HoodieMetadataPayload.createRecordIndexUpdate(recordKeyIterator.next(), 
partition, fileId, instantTime, 0);
-        }
-      };
-    });
+          @Override
+          public boolean hasNext() {
+            return recordKeyIterator.hasNext();
+          }
+
+          @Override
+          public HoodieRecord next() {
+            return forDelete
+                ? 
HoodieMetadataPayload.createRecordIndexDelete(recordKeyIterator.next())
+                : 
HoodieMetadataPayload.createRecordIndexUpdate(recordKeyIterator.next(), 
partition, fileId, instantTime, 0);
+          }
+        };
+      });
+    } finally {
+      engineContext.clearJobStatus();

Review Comment:
   Remove this clearJobStatus.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java:
##########
@@ -165,20 +169,24 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>> 
execute(HoodieData<HoodieRec
     // Handle records update with clustering
     HoodieData<HoodieRecord<T>> inputRecordsWithClusteringUpdate = 
clusteringHandleUpdate(inputRecords);
 
-    context.setJobStatus(this.getClass().getSimpleName(), "Building workload 
profile:" + config.getTableName());
-    WorkloadProfile workloadProfile =
-        new WorkloadProfile(buildProfile(inputRecordsWithClusteringUpdate), 
operationType, table.getIndex().canIndexLogFiles());
-    LOG.debug("Input workload profile :" + workloadProfile);
-
-    // partition using the insert partitioner
-    final Partitioner partitioner = getPartitioner(workloadProfile);
-    saveWorkloadProfileMetadataToInflight(workloadProfile, instantTime);
-
-    context.setJobStatus(this.getClass().getSimpleName(), "Doing partition and 
writing data: " + config.getTableName());
-    HoodieData<WriteStatus> writeStatuses = 
mapPartitionsAsRDD(inputRecordsWithClusteringUpdate, partitioner);
-    HoodieWriteMetadata<HoodieData<WriteStatus>> result = new 
HoodieWriteMetadata<>();
-    updateIndexAndCommitIfNeeded(writeStatuses, result);
-    return result;
+    try {
+      context.setJobStatus(this.getClass().getSimpleName(), "Building workload 
profile:" + config.getTableName());
+      WorkloadProfile workloadProfile =
+          new WorkloadProfile(buildProfile(inputRecordsWithClusteringUpdate), 
operationType, table.getIndex().canIndexLogFiles());
+      LOG.debug("Input workload profile :" + workloadProfile);
+
+      // partition using the insert partitioner
+      final Partitioner partitioner = getPartitioner(workloadProfile);
+      saveWorkloadProfileMetadataToInflight(workloadProfile, instantTime);
+
+      context.setJobStatus(this.getClass().getSimpleName(), "Doing partition 
and writing data: " + config.getTableName());
+      HoodieData<WriteStatus> writeStatuses = 
mapPartitionsAsRDD(inputRecordsWithClusteringUpdate, partitioner);
+      HoodieWriteMetadata<HoodieData<WriteStatus>> result = new 
HoodieWriteMetadata<>();
+      updateIndexAndCommitIfNeeded(writeStatuses, result);
+      return result;
+    } finally {
+      context.clearJobStatus();

Review Comment:
   ditto



##########
hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/HDFSParquetImporterUtils.java:
##########
@@ -189,32 +189,36 @@ public JavaRDD<HoodieRecord<HoodieRecordPayload>> 
buildHoodieRecordsForImport(Ja
 
     HoodieEngineContext context = new HoodieSparkEngineContext(jsc);
     context.setJobStatus(this.getClass().getSimpleName(), "Build records for 
import: " + this.tableName);
-    return jsc.newAPIHadoopFile(this.srcPath, ParquetInputFormat.class, 
Void.class, GenericRecord.class,
-            job.getConfiguration())
-        // To reduce large number of tasks.
-        .coalesce(16 * this.parallelism).map(entry -> {
-          GenericRecord genericRecord = ((Tuple2<Void, GenericRecord>) 
entry)._2();
-          Object partitionField = genericRecord.get(this.partitionKey);
-          if (partitionField == null) {
-            throw new HoodieIOException("partition key is missing. :" + 
this.partitionKey);
-          }
-          Object rowField = genericRecord.get(this.rowKey);
-          if (rowField == null) {
-            throw new HoodieIOException("row field is missing. :" + 
this.rowKey);
-          }
-          String partitionPath = partitionField.toString();
-          LOG.debug("Row Key : " + rowField + ", Partition Path is (" + 
partitionPath + ")");
-          if (partitionField instanceof Number) {
-            try {
-              long ts = (long) (Double.parseDouble(partitionField.toString()) 
* 1000L);
-              partitionPath = 
PARTITION_FORMATTER.format(Instant.ofEpochMilli(ts));
-            } catch (NumberFormatException nfe) {
-              LOG.warn("Unable to parse date from partition field. Assuming 
partition as (" + partitionField + ")");
+    try {
+      return jsc.newAPIHadoopFile(this.srcPath, ParquetInputFormat.class, 
Void.class, GenericRecord.class,
+          job.getConfiguration())
+          // To reduce large number of tasks.
+          .coalesce(16 * this.parallelism).map(entry -> {
+            GenericRecord genericRecord = ((Tuple2<Void, GenericRecord>) 
entry)._2();
+            Object partitionField = genericRecord.get(this.partitionKey);
+            if (partitionField == null) {
+              throw new HoodieIOException("partition key is missing. :" + 
this.partitionKey);
+            }
+            Object rowField = genericRecord.get(this.rowKey);
+            if (rowField == null) {
+              throw new HoodieIOException("row field is missing. :" + 
this.rowKey);
             }
-          }
-          return new HoodieAvroRecord<>(new HoodieKey(rowField.toString(), 
partitionPath),
-              new HoodieJsonPayload(genericRecord.toString()));
-        });
+            String partitionPath = partitionField.toString();
+            LOG.debug("Row Key : " + rowField + ", Partition Path is (" + 
partitionPath + ")");
+            if (partitionField instanceof Number) {
+              try {
+                long ts = (long) 
(Double.parseDouble(partitionField.toString()) * 1000L);
+                partitionPath = 
PARTITION_FORMATTER.format(Instant.ofEpochMilli(ts));
+              } catch (NumberFormatException nfe) {
+                LOG.warn("Unable to parse date from partition field. Assuming 
partition as (" + partitionField + ")");
+              }
+            }
+            return new HoodieAvroRecord<>(new HoodieKey(rowField.toString(), 
partitionPath),
+                new HoodieJsonPayload(genericRecord.toString()));
+          });
+    } finally {
+      context.clearJobStatus();

Review Comment:
   ditto



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java:
##########
@@ -203,41 +207,45 @@ private void exportAsHudi(JavaSparkContext jsc, 
FileSystem sourceFs,
     final HoodieEngineContext context = new HoodieSparkEngineContext(jsc);
     final SerializableConfiguration serConf = context.getHadoopConf();
     context.setJobStatus(this.getClass().getSimpleName(), "Exporting as HUDI 
dataset");
-    List<Pair<String, String>> partitionAndFileList = 
context.flatMap(partitions, partition -> {
-      // Only take latest version files <= latestCommit.
-      List<Pair<String, String>> filePaths = fsView
-          .getLatestBaseFilesBeforeOrOn(partition, latestCommitTimestamp)
-          .map(f -> Pair.of(partition, f.getPath()))
-          .collect(Collectors.toList());
-      // also need to copy over partition metadata
-      FileSystem fs = FSUtils.getFs(cfg.sourceBasePath, serConf.newCopy());
-      Path partitionMetaFile = 
HoodiePartitionMetadata.getPartitionMetafilePath(fs,
-          FSUtils.getPartitionPath(cfg.sourceBasePath, partition)).get();
-      if (fs.exists(partitionMetaFile)) {
-        filePaths.add(Pair.of(partition, partitionMetaFile.toString()));
-      }
-      return filePaths.stream();
-    }, parallelism);
-
-    context.foreach(partitionAndFileList, partitionAndFile -> {
-      String partition = partitionAndFile.getLeft();
-      Path sourceFilePath = new Path(partitionAndFile.getRight());
-      Path toPartitionPath = FSUtils.getPartitionPath(cfg.targetOutputPath, 
partition);
-      FileSystem executorSourceFs = FSUtils.getFs(cfg.sourceBasePath, 
serConf.newCopy());
-      FileSystem executorOutputFs = FSUtils.getFs(cfg.targetOutputPath, 
serConf.newCopy());
-
-      if (!executorOutputFs.exists(toPartitionPath)) {
-        executorOutputFs.mkdirs(toPartitionPath);
-      }
-      FileUtil.copy(
-          executorSourceFs,
-          sourceFilePath,
-          executorOutputFs,
-          new Path(toPartitionPath, sourceFilePath.getName()),
-          false,
-          false,
-          executorOutputFs.getConf());
-    }, parallelism);
+    try {
+      List<Pair<String, String>> partitionAndFileList = 
context.flatMap(partitions, partition -> {
+        // Only take latest version files <= latestCommit.
+        List<Pair<String, String>> filePaths = fsView
+            .getLatestBaseFilesBeforeOrOn(partition, latestCommitTimestamp)
+            .map(f -> Pair.of(partition, f.getPath()))
+            .collect(Collectors.toList());
+        // also need to copy over partition metadata
+        FileSystem fs = FSUtils.getFs(cfg.sourceBasePath, serConf.newCopy());
+        Path partitionMetaFile = 
HoodiePartitionMetadata.getPartitionMetafilePath(fs,
+            FSUtils.getPartitionPath(cfg.sourceBasePath, partition)).get();
+        if (fs.exists(partitionMetaFile)) {
+          filePaths.add(Pair.of(partition, partitionMetaFile.toString()));
+        }
+        return filePaths.stream();
+      }, parallelism);
+
+      context.foreach(partitionAndFileList, partitionAndFile -> {
+        String partition = partitionAndFile.getLeft();
+        Path sourceFilePath = new Path(partitionAndFile.getRight());
+        Path toPartitionPath = FSUtils.getPartitionPath(cfg.targetOutputPath, 
partition);
+        FileSystem executorSourceFs = FSUtils.getFs(cfg.sourceBasePath, 
serConf.newCopy());
+        FileSystem executorOutputFs = FSUtils.getFs(cfg.targetOutputPath, 
serConf.newCopy());
+
+        if (!executorOutputFs.exists(toPartitionPath)) {
+          executorOutputFs.mkdirs(toPartitionPath);
+        }
+        FileUtil.copy(
+            executorSourceFs,
+            sourceFilePath,
+            executorOutputFs,
+            new Path(toPartitionPath, sourceFilePath.getName()),
+            false,
+            false,
+            executorOutputFs.getConf());
+      }, parallelism);
+    } finally {
+      context.clearJobStatus();

Review Comment:
   Addressed.



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1823,37 +1856,12 @@ public boolean hasNext() {
           public HoodieRecord next() {
             return forDelete
                 ? 
HoodieMetadataPayload.createRecordIndexDelete(recordKeyIterator.next())
-                : 
HoodieMetadataPayload.createRecordIndexUpdate(recordKeyIterator.next(), 
partition, fileSlice.getFileId(), fileSlice.getBaseInstantTime(), 0);
+                : 
HoodieMetadataPayload.createRecordIndexUpdate(recordKeyIterator.next(), 
partition, fileId, instantTime, 0);
           }
         };
-      }
-      final HoodieBaseFile baseFile = fileSlice.getBaseFile().get();
-      final String filename = baseFile.getFileName();
-      Path dataFilePath = new Path(basePath, partition + Path.SEPARATOR + 
filename);
-
-      final String fileId = baseFile.getFileId();
-      final String instantTime = baseFile.getCommitTime();
-      HoodieFileReader reader = 
HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO).getFileReader(configuration.get(),
 dataFilePath);
-      ClosableIterator<String> recordKeyIterator = 
reader.getRecordKeyIterator();
-
-      return new ClosableIterator<HoodieRecord>() {
-        @Override
-        public void close() {
-          recordKeyIterator.close();
-        }
-
-        @Override
-        public boolean hasNext() {
-          return recordKeyIterator.hasNext();
-        }
-
-        @Override
-        public HoodieRecord next() {
-          return forDelete
-              ? 
HoodieMetadataPayload.createRecordIndexDelete(recordKeyIterator.next())
-              : 
HoodieMetadataPayload.createRecordIndexUpdate(recordKeyIterator.next(), 
partition, fileId, instantTime, 0);
-        }
-      };
-    });
+      });
+    } finally {
+      engineContext.clearJobStatus();

Review Comment:
   ditto



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java:
##########
@@ -175,32 +175,36 @@ protected JavaRDD<HoodieRecord<HoodieRecordPayload>> 
buildHoodieRecordsForImport
 
     HoodieEngineContext context = new HoodieSparkEngineContext(jsc);
     context.setJobStatus(this.getClass().getSimpleName(), "Build records for 
import: " + cfg.tableName);
-    return jsc.newAPIHadoopFile(cfg.srcPath, ParquetInputFormat.class, 
Void.class, GenericRecord.class,
-            job.getConfiguration())
-        // To reduce large number of tasks.
-        .coalesce(16 * cfg.parallelism).map(entry -> {
-          GenericRecord genericRecord = ((Tuple2<Void, GenericRecord>) 
entry)._2();
-          Object partitionField = genericRecord.get(cfg.partitionKey);
-          if (partitionField == null) {
-            throw new HoodieIOException("partition key is missing. :" + 
cfg.partitionKey);
-          }
-          Object rowField = genericRecord.get(cfg.rowKey);
-          if (rowField == null) {
-            throw new HoodieIOException("row field is missing. :" + 
cfg.rowKey);
-          }
-          String partitionPath = partitionField.toString();
-          LOG.debug("Row Key : " + rowField + ", Partition Path is (" + 
partitionPath + ")");
-          if (partitionField instanceof Number) {
-            try {
-              long ts = (long) (Double.parseDouble(partitionField.toString()) 
* 1000L);
-              partitionPath = 
PARTITION_FORMATTER.format(Instant.ofEpochMilli(ts));
-            } catch (NumberFormatException nfe) {
-              LOG.warn("Unable to parse date from partition field. Assuming 
partition as (" + partitionField + ")");
+    try {
+      return jsc.newAPIHadoopFile(cfg.srcPath, ParquetInputFormat.class, 
Void.class, GenericRecord.class,
+          job.getConfiguration())
+          // To reduce large number of tasks.
+          .coalesce(16 * cfg.parallelism).map(entry -> {
+            GenericRecord genericRecord = ((Tuple2<Void, GenericRecord>) 
entry)._2();
+            Object partitionField = genericRecord.get(cfg.partitionKey);
+            if (partitionField == null) {
+              throw new HoodieIOException("partition key is missing. :" + 
cfg.partitionKey);
+            }
+            Object rowField = genericRecord.get(cfg.rowKey);
+            if (rowField == null) {
+              throw new HoodieIOException("row field is missing. :" + 
cfg.rowKey);
+            }
+            String partitionPath = partitionField.toString();
+            LOG.debug("Row Key : " + rowField + ", Partition Path is (" + 
partitionPath + ")");
+            if (partitionField instanceof Number) {
+              try {
+                long ts = (long) 
(Double.parseDouble(partitionField.toString()) * 1000L);
+                partitionPath = 
PARTITION_FORMATTER.format(Instant.ofEpochMilli(ts));
+              } catch (NumberFormatException nfe) {
+                LOG.warn("Unable to parse date from partition field. Assuming 
partition as (" + partitionField + ")");
+              }
             }
-          }
-          return new HoodieAvroRecord<>(new HoodieKey(rowField.toString(), 
partitionPath),
-              new HoodieJsonPayload(genericRecord.toString()));
-        });
+            return new HoodieAvroRecord<>(new HoodieKey(rowField.toString(), 
partitionPath),
+                new HoodieJsonPayload(genericRecord.toString()));
+          });
+    } finally {
+      context.clearJobStatus();

Review Comment:
   ditto



