wecharyu commented on code in PR #9899: URL: https://github.com/apache/hudi/pull/9899#discussion_r1371235058
########## hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java: ########## @@ -605,66 +605,70 @@ private Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> fetchFromSourc } hoodieSparkContext.setJobStatus(this.getClass().getSimpleName(), "Checking if input is empty"); - if ((!avroRDDOptional.isPresent()) || (avroRDDOptional.get().isEmpty())) { - LOG.info("No new data, perform empty commit."); - return Pair.of(schemaProvider, Pair.of(checkpointStr, hoodieSparkContext.emptyRDD())); - } + try { + if ((!avroRDDOptional.isPresent()) || (avroRDDOptional.get().isEmpty())) { + LOG.info("No new data, perform empty commit."); + return Pair.of(schemaProvider, Pair.of(checkpointStr, hoodieSparkContext.emptyRDD())); + } - boolean shouldCombine = cfg.filterDupes || cfg.operation.equals(WriteOperationType.UPSERT); - Set<String> partitionColumns = getPartitionColumns(props); - JavaRDD<GenericRecord> avroRDD = avroRDDOptional.get(); - - JavaRDD<HoodieRecord> records; - SerializableSchema avroSchema = new SerializableSchema(schemaProvider.getTargetSchema()); - SerializableSchema processedAvroSchema = new SerializableSchema(isDropPartitionColumns() ? 
HoodieAvroUtils.removeMetadataFields(avroSchema.get()) : avroSchema.get()); - if (recordType == HoodieRecordType.AVRO) { - records = avroRDD.mapPartitions( - (FlatMapFunction<Iterator<GenericRecord>, HoodieRecord>) genericRecordIterator -> { - if (autoGenerateRecordKeys) { - props.setProperty(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG, String.valueOf(TaskContext.getPartitionId())); - props.setProperty(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG, instantTime); - } - BuiltinKeyGenerator builtinKeyGenerator = (BuiltinKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(props); - List<HoodieRecord> avroRecords = new ArrayList<>(); - while (genericRecordIterator.hasNext()) { - GenericRecord genRec = genericRecordIterator.next(); - HoodieKey hoodieKey = new HoodieKey(builtinKeyGenerator.getRecordKey(genRec), builtinKeyGenerator.getPartitionPath(genRec)); - GenericRecord gr = isDropPartitionColumns() ? HoodieAvroUtils.removeFields(genRec, partitionColumns) : genRec; - HoodieRecordPayload payload = shouldCombine ? DataSourceUtils.createPayload(cfg.payloadClassName, gr, - (Comparable) HoodieAvroUtils.getNestedFieldVal(gr, cfg.sourceOrderingField, false, props.getBoolean( - KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(), - Boolean.parseBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue())))) - : DataSourceUtils.createPayload(cfg.payloadClassName, gr); - avroRecords.add(new HoodieAvroRecord<>(hoodieKey, payload)); - } - return avroRecords.iterator(); + boolean shouldCombine = cfg.filterDupes || cfg.operation.equals(WriteOperationType.UPSERT); + Set<String> partitionColumns = getPartitionColumns(props); + JavaRDD<GenericRecord> avroRDD = avroRDDOptional.get(); + + JavaRDD<HoodieRecord> records; + SerializableSchema avroSchema = new SerializableSchema(schemaProvider.getTargetSchema()); + SerializableSchema processedAvroSchema = new SerializableSchema(isDropPartitionColumns() ? 
HoodieAvroUtils.removeMetadataFields(avroSchema.get()) : avroSchema.get()); + if (recordType == HoodieRecordType.AVRO) { + records = avroRDD.mapPartitions( + (FlatMapFunction<Iterator<GenericRecord>, HoodieRecord>) genericRecordIterator -> { + if (autoGenerateRecordKeys) { + props.setProperty(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG, String.valueOf(TaskContext.getPartitionId())); + props.setProperty(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG, instantTime); + } + BuiltinKeyGenerator builtinKeyGenerator = (BuiltinKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(props); + List<HoodieRecord> avroRecords = new ArrayList<>(); + while (genericRecordIterator.hasNext()) { + GenericRecord genRec = genericRecordIterator.next(); + HoodieKey hoodieKey = new HoodieKey(builtinKeyGenerator.getRecordKey(genRec), builtinKeyGenerator.getPartitionPath(genRec)); + GenericRecord gr = isDropPartitionColumns() ? HoodieAvroUtils.removeFields(genRec, partitionColumns) : genRec; + HoodieRecordPayload payload = shouldCombine ? DataSourceUtils.createPayload(cfg.payloadClassName, gr, + (Comparable) HoodieAvroUtils.getNestedFieldVal(gr, cfg.sourceOrderingField, false, props.getBoolean( + KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(), + Boolean.parseBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue())))) + : DataSourceUtils.createPayload(cfg.payloadClassName, gr); + avroRecords.add(new HoodieAvroRecord<>(hoodieKey, payload)); + } + return avroRecords.iterator(); + }); + } else if (recordType == HoodieRecordType.SPARK) { + // TODO we should remove it if we can read InternalRow from source. 
+ records = avroRDD.mapPartitions(itr -> { + if (autoGenerateRecordKeys) { + props.setProperty(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG, String.valueOf(TaskContext.getPartitionId())); + props.setProperty(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG, instantTime); + } + BuiltinKeyGenerator builtinKeyGenerator = (BuiltinKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(props); + StructType baseStructType = AvroConversionUtils.convertAvroSchemaToStructType(processedAvroSchema.get()); + StructType targetStructType = isDropPartitionColumns() ? AvroConversionUtils + .convertAvroSchemaToStructType(HoodieAvroUtils.removeFields(processedAvroSchema.get(), partitionColumns)) : baseStructType; + HoodieAvroDeserializer deserializer = SparkAdapterSupport$.MODULE$.sparkAdapter().createAvroDeserializer(processedAvroSchema.get(), baseStructType); + + return new CloseableMappingIterator<>(ClosableIterator.wrap(itr), rec -> { + InternalRow row = (InternalRow) deserializer.deserialize(rec).get(); + String recordKey = builtinKeyGenerator.getRecordKey(row, baseStructType).toString(); + String partitionPath = builtinKeyGenerator.getPartitionPath(row, baseStructType).toString(); + return new HoodieSparkRecord(new HoodieKey(recordKey, partitionPath), + HoodieInternalRowUtils.getCachedUnsafeProjection(baseStructType, targetStructType).apply(row), targetStructType, false); }); - } else if (recordType == HoodieRecordType.SPARK) { - // TODO we should remove it if we can read InternalRow from source. 
- records = avroRDD.mapPartitions(itr -> { - if (autoGenerateRecordKeys) { - props.setProperty(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG, String.valueOf(TaskContext.getPartitionId())); - props.setProperty(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG, instantTime); - } - BuiltinKeyGenerator builtinKeyGenerator = (BuiltinKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(props); - StructType baseStructType = AvroConversionUtils.convertAvroSchemaToStructType(processedAvroSchema.get()); - StructType targetStructType = isDropPartitionColumns() ? AvroConversionUtils - .convertAvroSchemaToStructType(HoodieAvroUtils.removeFields(processedAvroSchema.get(), partitionColumns)) : baseStructType; - HoodieAvroDeserializer deserializer = SparkAdapterSupport$.MODULE$.sparkAdapter().createAvroDeserializer(processedAvroSchema.get(), baseStructType); - - return new CloseableMappingIterator<>(ClosableIterator.wrap(itr), rec -> { - InternalRow row = (InternalRow) deserializer.deserialize(rec).get(); - String recordKey = builtinKeyGenerator.getRecordKey(row, baseStructType).toString(); - String partitionPath = builtinKeyGenerator.getPartitionPath(row, baseStructType).toString(); - return new HoodieSparkRecord(new HoodieKey(recordKey, partitionPath), - HoodieInternalRowUtils.getCachedUnsafeProjection(baseStructType, targetStructType).apply(row), targetStructType, false); }); - }); - } else { - throw new UnsupportedOperationException(recordType.name()); - } + } else { + throw new UnsupportedOperationException(recordType.name()); + } - return Pair.of(schemaProvider, Pair.of(checkpointStr, records)); + return Pair.of(schemaProvider, Pair.of(checkpointStr, records)); + } finally { + hoodieSparkContext.clearJobStatus(); Review Comment: This one seems also lazy execution. 
########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java: ########## @@ -215,6 +219,7 @@ protected List<Pair<String, BloomIndexFileInfo>> loadColumnRangesFromMetaIndex( String keyField = hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp(); List<Pair<String, HoodieBaseFile>> baseFilesForAllPartitions = HoodieIndexUtils.getLatestBaseFilesForAllPartitions(partitions, context, hoodieTable); + context.clearJobStatus(); Review Comment: The code after this point will not create a new job, so it seems OK to clear the job status here. WDYT? ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java: ########## @@ -758,7 +762,6 @@ protected void reconcileAgainstMarkers(HoodieEngineContext context, } // Now delete partially written files - context.setJobStatus(this.getClass().getSimpleName(), "Delete all partially written files: " + config.getTableName()); Review Comment: This status will be overwritten in `HoodieTable#deleteInvalidFilesByPartitions`, so this line is simply removed. ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java: ########## @@ -61,6 +61,7 @@ public HoodieWriteMetadata<O> write(String instantTime, // perform index loop up to get existing location of records context.setJobStatus(this.getClass().getSimpleName(), "Tagging: " + table.getConfig().getTableName()); taggedRecords = tag(dedupedRecords, context, table); + context.clearJobStatus(); Review Comment: Let me check all the lazily executed cases. For this one, the "Tagging xxx" status will also apply to `deduplicateRecords`, but clearing it here will not affect other jobs, so we retain this line. 
########## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java: ########## @@ -315,10 +315,14 @@ private HoodieData<BootstrapWriteStatus> runMetadataBootstrap(List<Pair<String, .collect(Collectors.toList()); context.setJobStatus(this.getClass().getSimpleName(), "Run metadata-only bootstrap operation: " + config.getTableName()); - return context.parallelize( - bootstrapPaths, Math.min(bootstrapPaths.size(), config.getBootstrapParallelism())) + try { + return context.parallelize( + bootstrapPaths, Math.min(bootstrapPaths.size(), config.getBootstrapParallelism())) .map(partitionFsPair -> getMetadataHandler(config, table, partitionFsPair.getRight().getRight()).runMetadataBootstrap(partitionFsPair.getLeft(), - partitionFsPair.getRight().getLeft(), keyGenerator)); + partitionFsPair.getRight().getLeft(), keyGenerator)); + } finally { + context.clearJobStatus(); Review Comment: Will remove clearJobStatus of lazy execution in CommitActionExecutor, because it will clear job status finally: https://github.com/apache/hudi/blob/051eb0e930e983dd4118abec01e10d9b01f91ca0/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala#L499-L505 ########## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java: ########## @@ -111,44 +111,48 @@ public BaseSparkCommitActionExecutor(HoodieEngineContext context, private HoodieData<HoodieRecord<T>> clusteringHandleUpdate(HoodieData<HoodieRecord<T>> inputRecords) { context.setJobStatus(this.getClass().getSimpleName(), "Handling updates which are under clustering: " + config.getTableName()); - Set<HoodieFileGroupId> fileGroupsInPendingClustering = - table.getFileSystemView().getFileGroupsInPendingClustering().map(Pair::getKey).collect(Collectors.toSet()); - // Skip processing if there is no inflight clustering - if (fileGroupsInPendingClustering.isEmpty()) { - return 
inputRecords; - } + try { + Set<HoodieFileGroupId> fileGroupsInPendingClustering = + table.getFileSystemView().getFileGroupsInPendingClustering().map(Pair::getKey).collect(Collectors.toSet()); + // Skip processing if there is no inflight clustering + if (fileGroupsInPendingClustering.isEmpty()) { + return inputRecords; + } - UpdateStrategy<T, HoodieData<HoodieRecord<T>>> updateStrategy = (UpdateStrategy<T, HoodieData<HoodieRecord<T>>>) ReflectionUtils - .loadClass(config.getClusteringUpdatesStrategyClass(), new Class<?>[] {HoodieEngineContext.class, HoodieTable.class, Set.class}, - this.context, table, fileGroupsInPendingClustering); - // For SparkAllowUpdateStrategy with rollback pending clustering as false, need not handle - // the file group intersection between current ingestion and pending clustering file groups. - // This will be handled at the conflict resolution strategy. - if (updateStrategy instanceof SparkAllowUpdateStrategy && !config.isRollbackPendingClustering()) { - return inputRecords; - } - Pair<HoodieData<HoodieRecord<T>>, Set<HoodieFileGroupId>> recordsAndPendingClusteringFileGroups = - updateStrategy.handleUpdate(inputRecords); + UpdateStrategy<T, HoodieData<HoodieRecord<T>>> updateStrategy = (UpdateStrategy<T, HoodieData<HoodieRecord<T>>>) ReflectionUtils + .loadClass(config.getClusteringUpdatesStrategyClass(), new Class<?>[] {HoodieEngineContext.class, HoodieTable.class, Set.class}, + this.context, table, fileGroupsInPendingClustering); + // For SparkAllowUpdateStrategy with rollback pending clustering as false, need not handle + // the file group intersection between current ingestion and pending clustering file groups. + // This will be handled at the conflict resolution strategy. 
+ if (updateStrategy instanceof SparkAllowUpdateStrategy && !config.isRollbackPendingClustering()) { + return inputRecords; + } + Pair<HoodieData<HoodieRecord<T>>, Set<HoodieFileGroupId>> recordsAndPendingClusteringFileGroups = + updateStrategy.handleUpdate(inputRecords); - Set<HoodieFileGroupId> fileGroupsWithUpdatesAndPendingClustering = recordsAndPendingClusteringFileGroups.getRight(); - if (fileGroupsWithUpdatesAndPendingClustering.isEmpty()) { + Set<HoodieFileGroupId> fileGroupsWithUpdatesAndPendingClustering = recordsAndPendingClusteringFileGroups.getRight(); + if (fileGroupsWithUpdatesAndPendingClustering.isEmpty()) { + return recordsAndPendingClusteringFileGroups.getLeft(); + } + // there are file groups pending clustering and receiving updates, so rollback the pending clustering instants + // there could be race condition, for example, if the clustering completes after instants are fetched but before rollback completed + if (config.isRollbackPendingClustering()) { + Set<HoodieInstant> pendingClusteringInstantsToRollback = getAllFileGroupsInPendingClusteringPlans(table.getMetaClient()).entrySet().stream() + .filter(e -> fileGroupsWithUpdatesAndPendingClustering.contains(e.getKey())) + .map(Map.Entry::getValue) + .collect(Collectors.toSet()); + pendingClusteringInstantsToRollback.forEach(instant -> { + String commitTime = table.getMetaClient().createNewInstantTime(); + table.scheduleRollback(context, commitTime, instant, false, config.shouldRollbackUsingMarkers(), false); + table.rollback(context, commitTime, instant, true, true); + }); + table.getMetaClient().reloadActiveTimeline(); + } return recordsAndPendingClusteringFileGroups.getLeft(); + } finally { + context.clearJobStatus(); Review Comment: ditto ########## hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java: ########## @@ -1731,37 +1731,41 @@ public static HoodieData<HoodieRecord> readRecordKeysFromBaseFiles(HoodieEngineC } engineContext.setJobStatus(activeModule, 
"Record Index: reading record keys from " + partitionBaseFilePairs.size() + " base files"); - final int parallelism = Math.min(partitionBaseFilePairs.size(), recordIndexMaxParallelism); - return engineContext.parallelize(partitionBaseFilePairs, parallelism).flatMap(partitionAndBaseFile -> { - final String partition = partitionAndBaseFile.getKey(); - final HoodieBaseFile baseFile = partitionAndBaseFile.getValue(); - final String filename = baseFile.getFileName(); - Path dataFilePath = new Path(basePath, partition + Path.SEPARATOR + filename); - - final String fileId = baseFile.getFileId(); - final String instantTime = baseFile.getCommitTime(); - HoodieFileReader reader = HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO).getFileReader(configuration.get(), dataFilePath); - ClosableIterator<String> recordKeyIterator = reader.getRecordKeyIterator(); - - return new ClosableIterator<HoodieRecord>() { - @Override - public void close() { - recordKeyIterator.close(); - } + try { + final int parallelism = Math.min(partitionBaseFilePairs.size(), recordIndexMaxParallelism); + return engineContext.parallelize(partitionBaseFilePairs, parallelism).flatMap(partitionAndBaseFile -> { + final String partition = partitionAndBaseFile.getKey(); + final HoodieBaseFile baseFile = partitionAndBaseFile.getValue(); + final String filename = baseFile.getFileName(); + Path dataFilePath = new Path(basePath, partition + Path.SEPARATOR + filename); + + final String fileId = baseFile.getFileId(); + final String instantTime = baseFile.getCommitTime(); + HoodieFileReader reader = HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO).getFileReader(configuration.get(), dataFilePath); + ClosableIterator<String> recordKeyIterator = reader.getRecordKeyIterator(); - @Override - public boolean hasNext() { - return recordKeyIterator.hasNext(); - } + return new ClosableIterator<HoodieRecord>() { + @Override + public void close() { + recordKeyIterator.close(); 
+ } - @Override - public HoodieRecord next() { - return forDelete - ? HoodieMetadataPayload.createRecordIndexDelete(recordKeyIterator.next()) - : HoodieMetadataPayload.createRecordIndexUpdate(recordKeyIterator.next(), partition, fileId, instantTime, 0); - } - }; - }); + @Override + public boolean hasNext() { + return recordKeyIterator.hasNext(); + } + + @Override + public HoodieRecord next() { + return forDelete + ? HoodieMetadataPayload.createRecordIndexDelete(recordKeyIterator.next()) + : HoodieMetadataPayload.createRecordIndexUpdate(recordKeyIterator.next(), partition, fileId, instantTime, 0); + } + }; + }); + } finally { + engineContext.clearJobStatus(); Review Comment: Remove this clearJobStatus. ########## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java: ########## @@ -165,20 +169,24 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>> execute(HoodieData<HoodieRec // Handle records update with clustering HoodieData<HoodieRecord<T>> inputRecordsWithClusteringUpdate = clusteringHandleUpdate(inputRecords); - context.setJobStatus(this.getClass().getSimpleName(), "Building workload profile:" + config.getTableName()); - WorkloadProfile workloadProfile = - new WorkloadProfile(buildProfile(inputRecordsWithClusteringUpdate), operationType, table.getIndex().canIndexLogFiles()); - LOG.debug("Input workload profile :" + workloadProfile); - - // partition using the insert partitioner - final Partitioner partitioner = getPartitioner(workloadProfile); - saveWorkloadProfileMetadataToInflight(workloadProfile, instantTime); - - context.setJobStatus(this.getClass().getSimpleName(), "Doing partition and writing data: " + config.getTableName()); - HoodieData<WriteStatus> writeStatuses = mapPartitionsAsRDD(inputRecordsWithClusteringUpdate, partitioner); - HoodieWriteMetadata<HoodieData<WriteStatus>> result = new HoodieWriteMetadata<>(); - updateIndexAndCommitIfNeeded(writeStatuses, result); - return result; + try 
{ + context.setJobStatus(this.getClass().getSimpleName(), "Building workload profile:" + config.getTableName()); + WorkloadProfile workloadProfile = + new WorkloadProfile(buildProfile(inputRecordsWithClusteringUpdate), operationType, table.getIndex().canIndexLogFiles()); + LOG.debug("Input workload profile :" + workloadProfile); + + // partition using the insert partitioner + final Partitioner partitioner = getPartitioner(workloadProfile); + saveWorkloadProfileMetadataToInflight(workloadProfile, instantTime); + + context.setJobStatus(this.getClass().getSimpleName(), "Doing partition and writing data: " + config.getTableName()); + HoodieData<WriteStatus> writeStatuses = mapPartitionsAsRDD(inputRecordsWithClusteringUpdate, partitioner); + HoodieWriteMetadata<HoodieData<WriteStatus>> result = new HoodieWriteMetadata<>(); + updateIndexAndCommitIfNeeded(writeStatuses, result); + return result; + } finally { + context.clearJobStatus(); Review Comment: ditto ########## hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/HDFSParquetImporterUtils.java: ########## @@ -189,32 +189,36 @@ public JavaRDD<HoodieRecord<HoodieRecordPayload>> buildHoodieRecordsForImport(Ja HoodieEngineContext context = new HoodieSparkEngineContext(jsc); context.setJobStatus(this.getClass().getSimpleName(), "Build records for import: " + this.tableName); - return jsc.newAPIHadoopFile(this.srcPath, ParquetInputFormat.class, Void.class, GenericRecord.class, - job.getConfiguration()) - // To reduce large number of tasks. - .coalesce(16 * this.parallelism).map(entry -> { - GenericRecord genericRecord = ((Tuple2<Void, GenericRecord>) entry)._2(); - Object partitionField = genericRecord.get(this.partitionKey); - if (partitionField == null) { - throw new HoodieIOException("partition key is missing. :" + this.partitionKey); - } - Object rowField = genericRecord.get(this.rowKey); - if (rowField == null) { - throw new HoodieIOException("row field is missing. 
:" + this.rowKey); - } - String partitionPath = partitionField.toString(); - LOG.debug("Row Key : " + rowField + ", Partition Path is (" + partitionPath + ")"); - if (partitionField instanceof Number) { - try { - long ts = (long) (Double.parseDouble(partitionField.toString()) * 1000L); - partitionPath = PARTITION_FORMATTER.format(Instant.ofEpochMilli(ts)); - } catch (NumberFormatException nfe) { - LOG.warn("Unable to parse date from partition field. Assuming partition as (" + partitionField + ")"); + try { + return jsc.newAPIHadoopFile(this.srcPath, ParquetInputFormat.class, Void.class, GenericRecord.class, + job.getConfiguration()) + // To reduce large number of tasks. + .coalesce(16 * this.parallelism).map(entry -> { + GenericRecord genericRecord = ((Tuple2<Void, GenericRecord>) entry)._2(); + Object partitionField = genericRecord.get(this.partitionKey); + if (partitionField == null) { + throw new HoodieIOException("partition key is missing. :" + this.partitionKey); + } + Object rowField = genericRecord.get(this.rowKey); + if (rowField == null) { + throw new HoodieIOException("row field is missing. :" + this.rowKey); } - } - return new HoodieAvroRecord<>(new HoodieKey(rowField.toString(), partitionPath), - new HoodieJsonPayload(genericRecord.toString())); - }); + String partitionPath = partitionField.toString(); + LOG.debug("Row Key : " + rowField + ", Partition Path is (" + partitionPath + ")"); + if (partitionField instanceof Number) { + try { + long ts = (long) (Double.parseDouble(partitionField.toString()) * 1000L); + partitionPath = PARTITION_FORMATTER.format(Instant.ofEpochMilli(ts)); + } catch (NumberFormatException nfe) { + LOG.warn("Unable to parse date from partition field. 
Assuming partition as (" + partitionField + ")"); + } + } + return new HoodieAvroRecord<>(new HoodieKey(rowField.toString(), partitionPath), + new HoodieJsonPayload(genericRecord.toString())); + }); + } finally { + context.clearJobStatus(); Review Comment: ditto ########## hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java: ########## @@ -203,41 +207,45 @@ private void exportAsHudi(JavaSparkContext jsc, FileSystem sourceFs, final HoodieEngineContext context = new HoodieSparkEngineContext(jsc); final SerializableConfiguration serConf = context.getHadoopConf(); context.setJobStatus(this.getClass().getSimpleName(), "Exporting as HUDI dataset"); - List<Pair<String, String>> partitionAndFileList = context.flatMap(partitions, partition -> { - // Only take latest version files <= latestCommit. - List<Pair<String, String>> filePaths = fsView - .getLatestBaseFilesBeforeOrOn(partition, latestCommitTimestamp) - .map(f -> Pair.of(partition, f.getPath())) - .collect(Collectors.toList()); - // also need to copy over partition metadata - FileSystem fs = FSUtils.getFs(cfg.sourceBasePath, serConf.newCopy()); - Path partitionMetaFile = HoodiePartitionMetadata.getPartitionMetafilePath(fs, - FSUtils.getPartitionPath(cfg.sourceBasePath, partition)).get(); - if (fs.exists(partitionMetaFile)) { - filePaths.add(Pair.of(partition, partitionMetaFile.toString())); - } - return filePaths.stream(); - }, parallelism); - - context.foreach(partitionAndFileList, partitionAndFile -> { - String partition = partitionAndFile.getLeft(); - Path sourceFilePath = new Path(partitionAndFile.getRight()); - Path toPartitionPath = FSUtils.getPartitionPath(cfg.targetOutputPath, partition); - FileSystem executorSourceFs = FSUtils.getFs(cfg.sourceBasePath, serConf.newCopy()); - FileSystem executorOutputFs = FSUtils.getFs(cfg.targetOutputPath, serConf.newCopy()); - - if (!executorOutputFs.exists(toPartitionPath)) { - executorOutputFs.mkdirs(toPartitionPath); - } - FileUtil.copy( - 
executorSourceFs, - sourceFilePath, - executorOutputFs, - new Path(toPartitionPath, sourceFilePath.getName()), - false, - false, - executorOutputFs.getConf()); - }, parallelism); + try { + List<Pair<String, String>> partitionAndFileList = context.flatMap(partitions, partition -> { + // Only take latest version files <= latestCommit. + List<Pair<String, String>> filePaths = fsView + .getLatestBaseFilesBeforeOrOn(partition, latestCommitTimestamp) + .map(f -> Pair.of(partition, f.getPath())) + .collect(Collectors.toList()); + // also need to copy over partition metadata + FileSystem fs = FSUtils.getFs(cfg.sourceBasePath, serConf.newCopy()); + Path partitionMetaFile = HoodiePartitionMetadata.getPartitionMetafilePath(fs, + FSUtils.getPartitionPath(cfg.sourceBasePath, partition)).get(); + if (fs.exists(partitionMetaFile)) { + filePaths.add(Pair.of(partition, partitionMetaFile.toString())); + } + return filePaths.stream(); + }, parallelism); + + context.foreach(partitionAndFileList, partitionAndFile -> { + String partition = partitionAndFile.getLeft(); + Path sourceFilePath = new Path(partitionAndFile.getRight()); + Path toPartitionPath = FSUtils.getPartitionPath(cfg.targetOutputPath, partition); + FileSystem executorSourceFs = FSUtils.getFs(cfg.sourceBasePath, serConf.newCopy()); + FileSystem executorOutputFs = FSUtils.getFs(cfg.targetOutputPath, serConf.newCopy()); + + if (!executorOutputFs.exists(toPartitionPath)) { + executorOutputFs.mkdirs(toPartitionPath); + } + FileUtil.copy( + executorSourceFs, + sourceFilePath, + executorOutputFs, + new Path(toPartitionPath, sourceFilePath.getName()), + false, + false, + executorOutputFs.getConf()); + }, parallelism); + } finally { + context.clearJobStatus(); Review Comment: Addressed. ########## hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java: ########## @@ -1823,37 +1856,12 @@ public boolean hasNext() { public HoodieRecord next() { return forDelete ? 
HoodieMetadataPayload.createRecordIndexDelete(recordKeyIterator.next()) - : HoodieMetadataPayload.createRecordIndexUpdate(recordKeyIterator.next(), partition, fileSlice.getFileId(), fileSlice.getBaseInstantTime(), 0); + : HoodieMetadataPayload.createRecordIndexUpdate(recordKeyIterator.next(), partition, fileId, instantTime, 0); } }; - } - final HoodieBaseFile baseFile = fileSlice.getBaseFile().get(); - final String filename = baseFile.getFileName(); - Path dataFilePath = new Path(basePath, partition + Path.SEPARATOR + filename); - - final String fileId = baseFile.getFileId(); - final String instantTime = baseFile.getCommitTime(); - HoodieFileReader reader = HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO).getFileReader(configuration.get(), dataFilePath); - ClosableIterator<String> recordKeyIterator = reader.getRecordKeyIterator(); - - return new ClosableIterator<HoodieRecord>() { - @Override - public void close() { - recordKeyIterator.close(); - } - - @Override - public boolean hasNext() { - return recordKeyIterator.hasNext(); - } - - @Override - public HoodieRecord next() { - return forDelete - ? HoodieMetadataPayload.createRecordIndexDelete(recordKeyIterator.next()) - : HoodieMetadataPayload.createRecordIndexUpdate(recordKeyIterator.next(), partition, fileId, instantTime, 0); - } - }; - }); + }); + } finally { + engineContext.clearJobStatus(); Review Comment: ditto ########## hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java: ########## @@ -175,32 +175,36 @@ protected JavaRDD<HoodieRecord<HoodieRecordPayload>> buildHoodieRecordsForImport HoodieEngineContext context = new HoodieSparkEngineContext(jsc); context.setJobStatus(this.getClass().getSimpleName(), "Build records for import: " + cfg.tableName); - return jsc.newAPIHadoopFile(cfg.srcPath, ParquetInputFormat.class, Void.class, GenericRecord.class, - job.getConfiguration()) - // To reduce large number of tasks. 
- .coalesce(16 * cfg.parallelism).map(entry -> { - GenericRecord genericRecord = ((Tuple2<Void, GenericRecord>) entry)._2(); - Object partitionField = genericRecord.get(cfg.partitionKey); - if (partitionField == null) { - throw new HoodieIOException("partition key is missing. :" + cfg.partitionKey); - } - Object rowField = genericRecord.get(cfg.rowKey); - if (rowField == null) { - throw new HoodieIOException("row field is missing. :" + cfg.rowKey); - } - String partitionPath = partitionField.toString(); - LOG.debug("Row Key : " + rowField + ", Partition Path is (" + partitionPath + ")"); - if (partitionField instanceof Number) { - try { - long ts = (long) (Double.parseDouble(partitionField.toString()) * 1000L); - partitionPath = PARTITION_FORMATTER.format(Instant.ofEpochMilli(ts)); - } catch (NumberFormatException nfe) { - LOG.warn("Unable to parse date from partition field. Assuming partition as (" + partitionField + ")"); + try { + return jsc.newAPIHadoopFile(cfg.srcPath, ParquetInputFormat.class, Void.class, GenericRecord.class, + job.getConfiguration()) + // To reduce large number of tasks. + .coalesce(16 * cfg.parallelism).map(entry -> { + GenericRecord genericRecord = ((Tuple2<Void, GenericRecord>) entry)._2(); + Object partitionField = genericRecord.get(cfg.partitionKey); + if (partitionField == null) { + throw new HoodieIOException("partition key is missing. :" + cfg.partitionKey); + } + Object rowField = genericRecord.get(cfg.rowKey); + if (rowField == null) { + throw new HoodieIOException("row field is missing. :" + cfg.rowKey); + } + String partitionPath = partitionField.toString(); + LOG.debug("Row Key : " + rowField + ", Partition Path is (" + partitionPath + ")"); + if (partitionField instanceof Number) { + try { + long ts = (long) (Double.parseDouble(partitionField.toString()) * 1000L); + partitionPath = PARTITION_FORMATTER.format(Instant.ofEpochMilli(ts)); + } catch (NumberFormatException nfe) { + LOG.warn("Unable to parse date from partition field. 
Assuming partition as (" + partitionField + ")"); + } } - } - return new HoodieAvroRecord<>(new HoodieKey(rowField.toString(), partitionPath), - new HoodieJsonPayload(genericRecord.toString())); - }); + return new HoodieAvroRecord<>(new HoodieKey(rowField.toString(), partitionPath), + new HoodieJsonPayload(genericRecord.toString())); + }); + } finally { + context.clearJobStatus(); Review Comment: ditto -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org