This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 87a187a91d4 [HUDI-6621] Fix downgrade handler for 0.14.0 (#9467)
87a187a91d4 is described below

commit 87a187a91d472f16d569b6388098d27a3aede760
Author: Lokesh Jain <lj...@apache.org>
AuthorDate: Wed Aug 23 18:39:08 2023 +0530

    [HUDI-6621] Fix downgrade handler for 0.14.0 (#9467)
    
    - Since the log block version has been upgraded in 0.14.0 (due to the delete block change),
      delete blocks written by 0.14.0 cannot be read by 0.13.0 or earlier.
    - Similarly, the addition of the record level index field in the metadata table leads to a
      column drop error on downgrade. This change updates the downgrade handler to trigger
      compaction and delete the metadata table when a user downgrades from table version
      six (0.14.0) to version five (0.13.0).
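    
    For reference, a downgrade to table version five can be triggered through the
    UpgradeDowngrade API, mirroring the Scala test added in this patch. The sketch
    below assumes a Spark environment; the table base path and the local Spark
    context are placeholders and not part of this change.
    
        import org.apache.hudi.client.common.HoodieSparkEngineContext;
        import org.apache.hudi.common.table.HoodieTableMetaClient;
        import org.apache.hudi.common.table.HoodieTableVersion;
        import org.apache.hudi.config.HoodieWriteConfig;
        import org.apache.hudi.table.upgrade.SparkUpgradeDowngradeHelper;
        import org.apache.hudi.table.upgrade.UpgradeDowngrade;
        import org.apache.spark.SparkConf;
        import org.apache.spark.api.java.JavaSparkContext;
    
        public class DowngradeToFiveExample {
          public static void main(String[] args) {
            String basePath = args[0]; // placeholder: base path of the Hudi table
            JavaSparkContext jsc = new JavaSparkContext(
                new SparkConf().setMaster("local[2]").setAppName("downgrade-example"));
            HoodieSparkEngineContext context = new HoodieSparkEngineContext(jsc);
            HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
                .setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build();
            HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
                .withPath(basePath)
                .forTable(metaClient.getTableConfig().getTableName())
                .build();
            // Runs SixToFiveDowngradeHandler: compacts MOR log files and deletes the MDT.
            new UpgradeDowngrade(metaClient, writeConfig, context,
                SparkUpgradeDowngradeHelper.getInstance())
                .run(HoodieTableVersion.FIVE, null);
            jsc.stop();
          }
        }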
---
 .../table/upgrade/SixToFiveDowngradeHandler.java   |  53 ++++++--
 .../table/upgrade/SupportsUpgradeDowngrade.java    |   3 +
 .../table/upgrade/FlinkUpgradeDowngradeHelper.java |   7 +
 .../table/upgrade/JavaUpgradeDowngradeHelper.java  |   7 +
 .../table/upgrade/SparkUpgradeDowngradeHelper.java |   7 +
 .../hudi/table/upgrade/TestUpgradeDowngrade.java   |  10 +-
 .../functional/TestSixToFiveDowngradeHandler.scala | 142 +++++++++++++++++++++
 7 files changed, 211 insertions(+), 18 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SixToFiveDowngradeHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SixToFiveDowngradeHandler.java
index 228c0f710a8..4793f368f81 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SixToFiveDowngradeHandler.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SixToFiveDowngradeHandler.java
@@ -18,19 +18,26 @@
 
 package org.apache.hudi.table.upgrade;
 
+import org.apache.hudi.client.BaseHoodieWriteClient;
 import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.FileIOUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
 import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
+import org.apache.hudi.table.action.compact.strategy.UnBoundedCompactionStrategy;
 
 import org.apache.hadoop.fs.Path;
 
@@ -39,12 +46,15 @@ import java.util.Map;
 
 import static org.apache.hudi.common.table.HoodieTableConfig.TABLE_METADATA_PARTITIONS;
 import static org.apache.hudi.common.table.HoodieTableConfig.TABLE_METADATA_PARTITIONS_INFLIGHT;
-import static org.apache.hudi.metadata.HoodieTableMetadataUtil.deleteMetadataTablePartition;
 
 /**
  * Downgrade handler to assist in downgrading a hoodie table from version 6 to 5.
  * To ensure compatibility, we need to recreate the compaction requested file in the
  * .aux folder.
+ * Since version 6 includes a new schema field for the metadata table (MDT),
+ * the MDT needs to be deleted during downgrade to avoid a column drop error.
+ * Also, the log block version was upgraded in version 6, so a full compaction needs
+ * to be completed during downgrade to avoid both read and future compaction failures.
  */
 public class SixToFiveDowngradeHandler implements DowngradeHandler {
 
@@ -52,11 +62,16 @@ public class SixToFiveDowngradeHandler implements DowngradeHandler {
   public Map<ConfigProperty, String> downgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime, SupportsUpgradeDowngrade upgradeDowngradeHelper) {
     final HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
 
-    removeRecordIndexIfNeeded(table, context);
+    // Since version 6 includes a new schema field for the metadata table (MDT), the MDT needs to be deleted during downgrade to avoid a column drop error.
+    HoodieTableMetadataUtil.deleteMetadataTable(config.getBasePath(), context);
+    // The log block version has been upgraded in version six, so compaction is required for downgrade.
+    runCompaction(table, context, config, upgradeDowngradeHelper);
+
     syncCompactionRequestedFileToAuxiliaryFolder(table);
 
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.reload(table.getMetaClient());
     Map<ConfigProperty, String> updatedTableProps = new HashMap<>();
-    HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
+    HoodieTableConfig tableConfig = metaClient.getTableConfig();
     Option.ofNullable(tableConfig.getString(TABLE_METADATA_PARTITIONS))
         .ifPresent(v -> updatedTableProps.put(TABLE_METADATA_PARTITIONS, v));
     Option.ofNullable(tableConfig.getString(TABLE_METADATA_PARTITIONS_INFLIGHT))
@@ -65,13 +80,29 @@ public class SixToFiveDowngradeHandler implements DowngradeHandler {
   }
 
   /**
-   * Record-level index, a new partition in metadata table, was first added in
-   * 0.14.0 ({@link HoodieTableVersion#SIX}. Any downgrade from this version
-   * should remove this partition.
+   * Utility method to run compaction for a MOR table as part of the downgrade step.
    */
-  private static void removeRecordIndexIfNeeded(HoodieTable table, HoodieEngineContext context) {
-    HoodieTableMetaClient metaClient = table.getMetaClient();
-    deleteMetadataTablePartition(metaClient, context, MetadataPartitionType.RECORD_INDEX, false);
+  private void runCompaction(HoodieTable table, HoodieEngineContext context, HoodieWriteConfig config,
+                             SupportsUpgradeDowngrade upgradeDowngradeHelper) {
+    try {
+      if (table.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
+        // set required configs for scheduling compaction.
+        HoodieInstantTimeGenerator.setCommitTimeZone(table.getMetaClient().getTableConfig().getTimelineTimezone());
+        HoodieWriteConfig compactionConfig = HoodieWriteConfig.newBuilder().withProps(config.getProps()).build();
+        compactionConfig.setValue(HoodieCompactionConfig.INLINE_COMPACT.key(), "true");
+        compactionConfig.setValue(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "1");
+        compactionConfig.setValue(HoodieCompactionConfig.INLINE_COMPACT_TRIGGER_STRATEGY.key(), CompactionTriggerStrategy.NUM_COMMITS.name());
+        compactionConfig.setValue(HoodieCompactionConfig.COMPACTION_STRATEGY.key(), UnBoundedCompactionStrategy.class.getName());
+        compactionConfig.setValue(HoodieMetadataConfig.ENABLE.key(), "false");
+        BaseHoodieWriteClient writeClient = upgradeDowngradeHelper.getWriteClient(compactionConfig, context);
+        Option<String> compactionInstantOpt = writeClient.scheduleCompaction(Option.empty());
+        if (compactionInstantOpt.isPresent()) {
+          writeClient.compact(compactionInstantOpt.get());
+        }
+      }
+    } catch (Exception e) {
+      throw new HoodieException(e);
+    }
   }
 
   /**
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SupportsUpgradeDowngrade.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SupportsUpgradeDowngrade.java
index a30396b63ea..dc445be4249 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SupportsUpgradeDowngrade.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SupportsUpgradeDowngrade.java
@@ -19,6 +19,7 @@
 
 package org.apache.hudi.table.upgrade;
 
+import org.apache.hudi.client.BaseHoodieWriteClient;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
@@ -41,4 +42,6 @@ public interface SupportsUpgradeDowngrade extends Serializable {
    * @return partition columns in String.
    */
   String getPartitionColumns(HoodieWriteConfig config);
+
+  BaseHoodieWriteClient getWriteClient(HoodieWriteConfig config, HoodieEngineContext context);
 }
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/FlinkUpgradeDowngradeHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/FlinkUpgradeDowngradeHelper.java
index 69acce56275..a5785742495 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/FlinkUpgradeDowngradeHelper.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/FlinkUpgradeDowngradeHelper.java
@@ -19,6 +19,8 @@
 
 package org.apache.hudi.table.upgrade;
 
+import org.apache.hudi.client.BaseHoodieWriteClient;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -50,4 +52,9 @@ public class FlinkUpgradeDowngradeHelper implements SupportsUpgradeDowngrade {
   public String getPartitionColumns(HoodieWriteConfig config) {
     return config.getProps().getProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key());
   }
+
+  @Override
+  public BaseHoodieWriteClient getWriteClient(HoodieWriteConfig config, HoodieEngineContext context) {
+    return new HoodieFlinkWriteClient(context, config);
+  }
 }
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/upgrade/JavaUpgradeDowngradeHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/upgrade/JavaUpgradeDowngradeHelper.java
index e1c44d09133..84872c1ac6e 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/upgrade/JavaUpgradeDowngradeHelper.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/upgrade/JavaUpgradeDowngradeHelper.java
@@ -19,6 +19,8 @@
 
 package org.apache.hudi.table.upgrade;
 
+import org.apache.hudi.client.BaseHoodieWriteClient;
+import org.apache.hudi.client.HoodieJavaWriteClient;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
@@ -48,4 +50,9 @@ public class JavaUpgradeDowngradeHelper implements SupportsUpgradeDowngrade {
   public String getPartitionColumns(HoodieWriteConfig config) {
     return config.getProps().getProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key());
   }
+
+  @Override
+  public BaseHoodieWriteClient getWriteClient(HoodieWriteConfig config, HoodieEngineContext context) {
+    return new HoodieJavaWriteClient(context, config);
+  }
 }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/SparkUpgradeDowngradeHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/SparkUpgradeDowngradeHelper.java
index ba7f9012701..2ce98724f97 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/SparkUpgradeDowngradeHelper.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/upgrade/SparkUpgradeDowngradeHelper.java
@@ -19,6 +19,8 @@
 
 package org.apache.hudi.table.upgrade;
 
+import org.apache.hudi.client.BaseHoodieWriteClient;
+import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieSparkTable;
@@ -49,4 +51,9 @@ public class SparkUpgradeDowngradeHelper implements SupportsUpgradeDowngrade {
   public String getPartitionColumns(HoodieWriteConfig config) {
     return SparkKeyGenUtils.getPartitionColumns(config.getProps());
   }
+
+  @Override
+  public BaseHoodieWriteClient getWriteClient(HoodieWriteConfig config, HoodieEngineContext context) {
+    return new SparkRDDWriteClient(context, config);
+  }
 }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
index d76db5d5966..10bd153c90f 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
@@ -75,6 +75,7 @@ import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -553,11 +554,6 @@ public class TestUpgradeDowngrade extends HoodieClientTestBase {
         PARTITION_NAME_BLOOM_FILTERS,
         PARTITION_NAME_RECORD_INDEX
     );
-    Set<String> allPartitionsExceptRecordIndex = CollectionUtils.createImmutableSet(
-        PARTITION_NAME_FILES,
-        PARTITION_NAME_COLUMN_STATS,
-        PARTITION_NAME_BLOOM_FILTERS
-    );
     assertTrue(Files.exists(recordIndexPartitionPath), "record index partition should exist.");
     assertEquals(allPartitions, metaClient.getTableConfig().getMetadataPartitions(),
         TABLE_METADATA_PARTITIONS.key() + " should contain all partitions.");
@@ -571,9 +567,9 @@ public class TestUpgradeDowngrade extends HoodieClientTestBase {
     metaClient = HoodieTableMetaClient.reload(metaClient);
     // validate the relevant table states after downgrade
     assertFalse(Files.exists(recordIndexPartitionPath), "record index partition should be deleted.");
-    assertEquals(allPartitionsExceptRecordIndex, metaClient.getTableConfig().getMetadataPartitions(),
+    assertEquals(Collections.emptySet(), metaClient.getTableConfig().getMetadataPartitions(),
         TABLE_METADATA_PARTITIONS.key() + " should contain all partitions except record_index.");
-    assertEquals(allPartitionsExceptRecordIndex, metaClient.getTableConfig().getMetadataPartitionsInflight(),
+    assertEquals(Collections.emptySet(), metaClient.getTableConfig().getMetadataPartitionsInflight(),
         TABLE_METADATA_PARTITIONS_INFLIGHT.key() + " should contain all partitions except record_index.");
 
   }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSixToFiveDowngradeHandler.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSixToFiveDowngradeHandler.scala
new file mode 100644
index 00000000000..dafe0eb7ac2
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSixToFiveDowngradeHandler.scala
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.functional
+
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.common.table.{HoodieTableMetaClient, HoodieTableVersion}
+import org.apache.hudi.config.HoodieCompactionConfig
+import org.apache.hudi.metadata.HoodieMetadataFileSystemView
+import org.apache.hudi.table.upgrade.{SparkUpgradeDowngradeHelper, UpgradeDowngrade}
+import org.apache.spark.sql.SaveMode
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.EnumSource
+
+import scala.jdk.CollectionConverters.{asScalaIteratorConverter, collectionAsScalaIterableConverter}
+
+class TestSixToFiveDowngradeHandler extends RecordLevelIndexTestBase {
+
+  private var partitionPaths: java.util.List[Path] = null
+
+  @ParameterizedTest
+  @EnumSource(classOf[HoodieTableType])
+  def testDowngradeWithMDTAndLogFiles(tableType: HoodieTableType): Unit = {
+    val hudiOpts = commonOpts + (
+      DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name(),
+      HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key() -> "0")
+    doWriteAndValidateDataAndRecordIndex(hudiOpts,
+      operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Overwrite,
+      validate = false)
+    doWriteAndValidateDataAndRecordIndex(hudiOpts,
+      operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Append,
+      validate = false)
+    metaClient = HoodieTableMetaClient.reload(metaClient)
+    assertTrue(metaClient.getTableConfig.isMetadataTableAvailable)
+    if (tableType == HoodieTableType.MERGE_ON_READ) {
+      assertTrue(getLogFilesCount(hudiOpts) > 0)
+    }
+
+    new UpgradeDowngrade(metaClient, getWriteConfig(hudiOpts), context, SparkUpgradeDowngradeHelper.getInstance)
+      .run(HoodieTableVersion.FIVE, null)
+    metaClient = HoodieTableMetaClient.reload(metaClient)
+    // Ensure file slices have been compacted and the MDT has been deleted
+    assertFalse(metaClient.getTableConfig.isMetadataTableAvailable)
+    assertEquals(HoodieTableVersion.FIVE, metaClient.getTableConfig.getTableVersion)
+    if (tableType == HoodieTableType.MERGE_ON_READ) {
+      assertEquals(0, getLogFilesCount(hudiOpts))
+    }
+  }
+
+  @Test
+  def testDowngradeWithoutLogFiles(): Unit = {
+    val hudiOpts = commonOpts + (
+      DataSourceWriteOptions.TABLE_TYPE.key -> HoodieTableType.MERGE_ON_READ.name(),
+      HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key() -> "0")
+    doWriteAndValidateDataAndRecordIndex(hudiOpts,
+      operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Overwrite,
+      validate = false)
+    metaClient = HoodieTableMetaClient.reload(metaClient)
+    assertEquals(0, getLogFilesCount(hudiOpts))
+
+    new UpgradeDowngrade(metaClient, getWriteConfig(hudiOpts), context, SparkUpgradeDowngradeHelper.getInstance)
+      .run(HoodieTableVersion.FIVE, null)
+    metaClient = HoodieTableMetaClient.reload(metaClient)
+    assertEquals(0, getLogFilesCount(hudiOpts))
+    assertEquals(HoodieTableVersion.FIVE, metaClient.getTableConfig.getTableVersion)
+  }
+
+  @ParameterizedTest
+  @EnumSource(classOf[HoodieTableType])
+  def testDowngradeWithoutMDT(tableType: HoodieTableType): Unit = {
+    val hudiOpts = commonOpts + (
+      DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name(),
+      HoodieMetadataConfig.ENABLE.key() -> "false")
+    doWriteAndValidateDataAndRecordIndex(hudiOpts,
+      operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Overwrite,
+      validate = false)
+    metaClient = HoodieTableMetaClient.reload(metaClient)
+    assertFalse(metaClient.getTableConfig.isMetadataTableAvailable)
+
+    new UpgradeDowngrade(metaClient, getWriteConfig(hudiOpts), context, SparkUpgradeDowngradeHelper.getInstance)
+      .run(HoodieTableVersion.FIVE, null)
+    metaClient = HoodieTableMetaClient.reload(metaClient)
+    assertFalse(metaClient.getTableConfig.isMetadataTableAvailable)
+    assertEquals(HoodieTableVersion.FIVE, metaClient.getTableConfig.getTableVersion)
+  }
+
+  private def getLogFilesCount(opts: Map[String, String]) = {
+    var numFileSlicesWithLogFiles = 0L
+    val fsView = getTableFileSystemView(opts)
+    getAllPartititonPaths(fsView).asScala.flatMap { partitionPath =>
+      val relativePath = FSUtils.getRelativePartitionPath(metaClient.getBasePathV2, partitionPath)
+      fsView.getLatestMergedFileSlicesBeforeOrOn(relativePath, getLatestMetaClient(false)
+        .getActiveTimeline.lastInstant().get().getTimestamp).iterator().asScala.toSeq
+    }.foreach(
+      slice => if (slice.getLogFiles.count() > 0) {
+        numFileSlicesWithLogFiles += 1
+      })
+    numFileSlicesWithLogFiles
+  }
+
+  private def getTableFileSystemView(opts: Map[String, String]): HoodieTableFileSystemView = {
+    if (metaClient.getTableConfig.isMetadataTableAvailable) {
+      new HoodieMetadataFileSystemView(metaClient, metaClient.getActiveTimeline, metadataWriter(getWriteConfig(opts)).getTableMetadata)
+    } else {
+      new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline)
+    }
+  }
+
+  private def getAllPartititonPaths(fsView: HoodieTableFileSystemView): java.util.List[Path] = {
+    if (partitionPaths == null) {
+      fsView.loadAllPartitions()
+      partitionPaths = fsView.getPartitionPaths
+    }
+    partitionPaths
+  }
+}
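
For completeness, the post-downgrade state asserted in the Scala test above can also be
checked directly against the table config. This is a minimal Java fragment, not part of
the patch; "metaClient" stands for a HoodieTableMetaClient built for the downgraded table.

    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.common.table.HoodieTableVersion;

    // Reload so the table config rewritten by the downgrade is picked up.
    HoodieTableMetaClient reloaded = HoodieTableMetaClient.reload(metaClient);
    // Table version should be FIVE and the metadata table should be gone.
    assert reloaded.getTableConfig().getTableVersion() == HoodieTableVersion.FIVE;
    assert !reloaded.getTableConfig().isMetadataTableAvailable();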
