This is an automated email from the ASF dual-hosted git repository. danny0405 pushed a commit to branch release-0.10.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit fe2aae4cea9617cbb469f7b4c6aa072d4c2f3f55 Author: Danny Chan <yuzhao....@gmail.com> AuthorDate: Fri Dec 3 11:38:29 2021 +0800 [minor] Refactor write profile to always generate fs view (#4198) (cherry picked from commit f74b3d12aa89a96daafe43c69d37d816b9f4018b) --- .../partitioner/profile/DeltaWriteProfile.java | 4 +-- .../sink/partitioner/profile/WriteProfile.java | 29 +++++++++++++++------- 2 files changed, 22 insertions(+), 11 deletions(-) diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java index 1f08c5a..e1c890c 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java @@ -50,7 +50,7 @@ public class DeltaWriteProfile extends WriteProfile { List<SmallFile> smallFileLocations = new ArrayList<>(); // Init here since this class (and member variables) might not have been initialized - HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline(); + HoodieTimeline commitTimeline = metaClient.getCommitsTimeline().filterCompletedInstants(); // Find out all eligible small file slices if (!commitTimeline.empty()) { @@ -90,7 +90,7 @@ public class DeltaWriteProfile extends WriteProfile { } protected SyncableFileSystemView getFileSystemView() { - return (SyncableFileSystemView) this.table.getSliceView(); + return (SyncableFileSystemView) getTable().getSliceView(); } private long getTotalFileSize(FileSlice fileSlice) { diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java index 98eb29c..d13162f 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java @@ -32,6 +32,7 @@ import org.apache.hudi.sink.partitioner.BucketAssigner; import org.apache.hudi.table.HoodieFlinkTable; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.commit.SmallFile; +import org.apache.hudi.util.StreamerUtil; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.core.fs.Path; @@ -67,9 +68,9 @@ public class WriteProfile { private final Path basePath; /** - * The hoodie table. + * The meta client. */ - protected final HoodieTable<?, ?, ?, ?> table; + protected final HoodieTableMetaClient metaClient; /** * The average record size. @@ -97,12 +98,18 @@ public class WriteProfile { */ private final Map<String, HoodieCommitMetadata> metadataCache; + /** + * The engine context. + */ + private final HoodieFlinkEngineContext context; + public WriteProfile(HoodieWriteConfig config, HoodieFlinkEngineContext context) { this.config = config; + this.context = context; this.basePath = new Path(config.getBasePath()); this.smallFilesMap = new HashMap<>(); this.recordsPerBucket = config.getCopyOnWriteInsertSplitSize(); - this.table = HoodieFlinkTable.create(config, context); + this.metaClient = StreamerUtil.createMetaClient(config.getBasePath(), context.getHadoopConf().get()); this.metadataCache = new HashMap<>(); // profile the record statistics on construction recordProfile(); @@ -117,7 +124,11 @@ public class WriteProfile { } public HoodieTableMetaClient getMetaClient() { - return this.table.getMetaClient(); + return this.metaClient; + } + + protected HoodieTable<?, ?, ?, ?> getTable() { + return HoodieFlinkTable.create(config, context); } /** @@ -127,7 +138,7 @@ public class WriteProfile { private long averageBytesPerRecord() { long avgSize = config.getCopyOnWriteRecordSizeEstimate(); long fileSizeThreshold = (long) (config.getRecordSizeEstimationThreshold() * config.getParquetSmallFileLimit()); - HoodieTimeline commitTimeline = table.getMetaClient().getCommitsTimeline().filterCompletedInstants(); + HoodieTimeline commitTimeline = metaClient.getCommitsTimeline().filterCompletedInstants(); if (!commitTimeline.empty()) { // Go over the reverse ordered commits to get a more recent estimate of average record size. Iterator<HoodieInstant> instants = commitTimeline.getReverseOrderedInstants().iterator(); @@ -175,7 +186,7 @@ public class WriteProfile { // smallFiles only for partitionPath List<SmallFile> smallFileLocations = new ArrayList<>(); - HoodieTimeline commitTimeline = table.getMetaClient().getCommitsTimeline().filterCompletedInstants(); + HoodieTimeline commitTimeline = metaClient.getCommitsTimeline().filterCompletedInstants(); if (!commitTimeline.empty()) { // if we have some commits HoodieInstant latestCommitTime = commitTimeline.lastInstant().get(); @@ -198,7 +209,7 @@ public class WriteProfile { } protected SyncableFileSystemView getFileSystemView() { - return (SyncableFileSystemView) HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) table.getContext()).getBaseFileOnlyView(); + return (SyncableFileSystemView) getTable().getBaseFileOnlyView(); } /** @@ -231,9 +242,9 @@ public class WriteProfile { // already reloaded return; } - this.table.getMetaClient().reloadActiveTimeline(); + this.metaClient.reloadActiveTimeline(); recordProfile(); - cleanMetadataCache(this.table.getMetaClient().getCommitsTimeline().filterCompletedInstants().getInstants()); + cleanMetadataCache(this.metaClient.getCommitsTimeline().filterCompletedInstants().getInstants()); this.smallFilesMap.clear(); this.reloadedCheckpointId = checkpointId; }