This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch release-0.10.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit fe2aae4cea9617cbb469f7b4c6aa072d4c2f3f55
Author: Danny Chan <yuzhao....@gmail.com>
AuthorDate: Fri Dec 3 11:38:29 2021 +0800

    [minor] Refactor write profile to always generate fs view (#4198)
    
    (cherry picked from commit f74b3d12aa89a96daafe43c69d37d816b9f4018b)
---
 .../partitioner/profile/DeltaWriteProfile.java     |  4 +--
 .../sink/partitioner/profile/WriteProfile.java     | 29 +++++++++++++++-------
 2 files changed, 22 insertions(+), 11 deletions(-)

diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java
index 1f08c5a..e1c890c 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java
@@ -50,7 +50,7 @@ public class DeltaWriteProfile extends WriteProfile {
     List<SmallFile> smallFileLocations = new ArrayList<>();
 
     // Init here since this class (and member variables) might not have been initialized
-    HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();
+    HoodieTimeline commitTimeline = metaClient.getCommitsTimeline().filterCompletedInstants();
 
     // Find out all eligible small file slices
     if (!commitTimeline.empty()) {
@@ -90,7 +90,7 @@ public class DeltaWriteProfile extends WriteProfile {
   }
 
   protected SyncableFileSystemView getFileSystemView() {
-    return (SyncableFileSystemView) this.table.getSliceView();
+    return (SyncableFileSystemView) getTable().getSliceView();
   }
 
   private long getTotalFileSize(FileSlice fileSlice) {
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java
index 98eb29c..d13162f 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java
@@ -32,6 +32,7 @@ import org.apache.hudi.sink.partitioner.BucketAssigner;
 import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.commit.SmallFile;
+import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.fs.Path;
@@ -67,9 +68,9 @@ public class WriteProfile {
   private final Path basePath;
 
   /**
-   * The hoodie table.
+   * The meta client.
    */
-  protected final HoodieTable<?, ?, ?, ?> table;
+  protected final HoodieTableMetaClient metaClient;
 
   /**
    * The average record size.
@@ -97,12 +98,18 @@ public class WriteProfile {
    */
   private final Map<String, HoodieCommitMetadata> metadataCache;
 
+  /**
+   * The engine context.
+   */
+  private final HoodieFlinkEngineContext context;
+
   public WriteProfile(HoodieWriteConfig config, HoodieFlinkEngineContext context) {
     this.config = config;
+    this.context = context;
     this.basePath = new Path(config.getBasePath());
     this.smallFilesMap = new HashMap<>();
     this.recordsPerBucket = config.getCopyOnWriteInsertSplitSize();
-    this.table = HoodieFlinkTable.create(config, context);
+    this.metaClient = StreamerUtil.createMetaClient(config.getBasePath(), context.getHadoopConf().get());
     this.metadataCache = new HashMap<>();
     // profile the record statistics on construction
     recordProfile();
@@ -117,7 +124,11 @@ public class WriteProfile {
   }
 
   public HoodieTableMetaClient getMetaClient() {
-    return this.table.getMetaClient();
+    return this.metaClient;
+  }
+
+  protected HoodieTable<?, ?, ?, ?> getTable() {
+    return HoodieFlinkTable.create(config, context);
   }
 
   /**
@@ -127,7 +138,7 @@ public class WriteProfile {
   private long averageBytesPerRecord() {
     long avgSize = config.getCopyOnWriteRecordSizeEstimate();
     long fileSizeThreshold = (long) (config.getRecordSizeEstimationThreshold() * config.getParquetSmallFileLimit());
-    HoodieTimeline commitTimeline = table.getMetaClient().getCommitsTimeline().filterCompletedInstants();
+    HoodieTimeline commitTimeline = metaClient.getCommitsTimeline().filterCompletedInstants();
     if (!commitTimeline.empty()) {
       // Go over the reverse ordered commits to get a more recent estimate of average record size.
       Iterator<HoodieInstant> instants = commitTimeline.getReverseOrderedInstants().iterator();
@@ -175,7 +186,7 @@ public class WriteProfile {
     // smallFiles only for partitionPath
     List<SmallFile> smallFileLocations = new ArrayList<>();
 
-    HoodieTimeline commitTimeline = table.getMetaClient().getCommitsTimeline().filterCompletedInstants();
+    HoodieTimeline commitTimeline = metaClient.getCommitsTimeline().filterCompletedInstants();
 
     if (!commitTimeline.empty()) { // if we have some commits
       HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
@@ -198,7 +209,7 @@ public class WriteProfile {
   }
 
   protected SyncableFileSystemView getFileSystemView() {
-    return (SyncableFileSystemView) HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) table.getContext()).getBaseFileOnlyView();
+    return (SyncableFileSystemView) getTable().getBaseFileOnlyView();
   }
 
   /**
@@ -231,9 +242,9 @@ public class WriteProfile {
       // already reloaded
       return;
     }
-    this.table.getMetaClient().reloadActiveTimeline();
+    this.metaClient.reloadActiveTimeline();
     recordProfile();
-    cleanMetadataCache(this.table.getMetaClient().getCommitsTimeline().filterCompletedInstants().getInstants());
+    cleanMetadataCache(this.metaClient.getCommitsTimeline().filterCompletedInstants().getInstants());
     this.smallFilesMap.clear();
     this.reloadedCheckpointId = checkpointId;
   }
