This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 597a9367ec12cfe18c8878a548f07db514ce7e7c
Author: Mingming Ge <7mmi...@gmail.com>
AuthorDate: Mon Feb 20 14:42:54 2023 +0800

    KYLIN-5530 remove repartition write
    KYLIN-5530 Optimized snapshot builds
    KYLIN-5530 Flat Table Repartition before writing data source tables/directories
---
 pom.xml                                            |  2 +-
 .../java/org/apache/kylin/common/KapConfig.java    |  4 +
 .../org/apache/kylin/common/KylinConfigBase.java   |  8 ++
 .../engine/spark/builder/SegmentFlatTable.scala    |  1 +
 .../engine/spark/builder/SnapshotBuilder.scala     | 11 ++-
 .../kylin/engine/spark/job/SegmentExec.scala       |  5 +-
 .../job/stage/build/FlatTableAndDictBase.scala     | 28 +++++--
 .../kylin/engine/spark/utils/Repartitioner.java    | 15 ++--
 .../datasource/storage/LayoutFormatWriter.scala    | 98 ++++++++++++++++++++++
 .../sql/datasource/storage/StorageStore.scala      | 51 +++--------
 10 files changed, 160 insertions(+), 63 deletions(-)

diff --git a/pom.xml b/pom.xml
index 944f95c459..53a49fb81c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -124,7 +124,7 @@
 
         <!-- Spark versions -->
         <delta.version>1.2.1</delta.version>
-        <spark.version>3.2.0-kylin-4.6.5.0</spark.version>
+        <spark.version>3.2.0-kylin-4.6.6.0-SNAPSHOT</spark.version>
 
         <roaring.version>0.9.2-kylin-r4</roaring.version>
 
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KapConfig.java b/src/core-common/src/main/java/org/apache/kylin/common/KapConfig.java
index 4f2132760e..d073f089a7 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/KapConfig.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/KapConfig.java
@@ -682,6 +682,10 @@ public class KapConfig {
         return Boolean.parseBoolean(config.getOptional("kylin.build.optimize-shard-enabled", TRUE));
     }
 
+    public boolean isAggIndexAdaptiveBuildEnabled() {
+        return Boolean.parseBoolean(config.getOptional("kylin.engine.aggIndex-adaptive-build-enabled", FALSE));
+    }
+
     public String getSwitchBackupFsExceptionAllowString() {
         return config.getOptional("kylin.query.switch-backup-fs-exception-allow-string", "alluxio");
     }
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 0490ce7b84..853c5d6ce6 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -2637,10 +2637,18 @@ public abstract class KylinConfigBase implements Serializable {
         return Boolean.parseBoolean(getOptional("kylin.engine.persist-flattable-enabled", TRUE));
     }
 
+    public boolean isFlatTableRedistributionEnabled() {
+        return Boolean.parseBoolean(getOptional("kylin.engine.redistribution-flattable-enabled", FALSE));
+    }
+
     public boolean isPersistFlatViewEnabled() {
         return Boolean.parseBoolean(getOptional("kylin.engine.persist-flatview", FALSE));
     }
 
+    public boolean isPersistFlatUseSnapshotEnabled() {
+        return Boolean.parseBoolean(getOptional("kylin.engine.persist-flat-use-snapshot-enabled", TRUE));
+    }
+
     public boolean isBuildExcludedTableEnabled() {
         return Boolean.parseBoolean(getOptional("kylin.engine.build-excluded-table", FALSE));
     }
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/SegmentFlatTable.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/SegmentFlatTable.scala
index 7449aebdcd..e6e41fd9b9 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/SegmentFlatTable.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/SegmentFlatTable.scala
@@ -37,6 +37,7 @@ import org.apache.kylin.metadata.model._
 import org.apache.kylin.query.util.PushDownUtil
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions.{col, expr}
+import org.apache.spark.sql.manager.SparderLookupManager
 import org.apache.spark.sql.types.StructField
 import org.apache.spark.sql.util.SparderTypeUtil
 import org.apache.spark.utils.ProxyThreadUtils
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/SnapshotBuilder.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/SnapshotBuilder.scala
index 777c16d464..f20c3b855e 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/SnapshotBuilder.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/SnapshotBuilder.scala
@@ -431,9 +431,11 @@ class SnapshotBuilder(var jobId: String) extends Logging with Serializable {
     if (repartitionNum == 0) {
       sourceData.write.parquet(resourcePath)
     } else {
-      sourceData.repartition(repartitionNum).write.parquet(resourcePath)
+      sourceData.repartition().write.parquet(resourcePath)
     }
-    val (originSize, totalRows) = computeSnapshotSize(sourceData)
+
+    val snapshotDS = ss.read.parquet(resourcePath)
+    val (originSize, totalRows) = computeSnapshotSize(snapshotDS)
     resultMap.put(tableDesc.getIdentity, Result(snapshotTablePath, originSize, totalRows))
   }
 
@@ -458,10 +460,11 @@ class SnapshotBuilder(var jobId: String) extends Logging with Serializable {
         List((totalSize, totalRows)).toIterator
     }(Encoders.tuple(Encoders.scalaLong, Encoders.scalaLong))
 
-    if (ds.isEmpty) {
+    val stats = ds.collect().reduceOption((a, b) => (a._1 + b._1, a._2 + b._2))
+    if (stats.isEmpty) {
       (0L, 0L)
     } else {
-      ds.reduce((a, b) => (a._1 + b._1, a._2 + b._2))
+      stats.get
     }
   }
 
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/SegmentExec.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/SegmentExec.scala
index 61675ac4a8..0b104eb42a 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/SegmentExec.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/SegmentExec.scala
@@ -270,7 +270,7 @@ trait SegmentExec extends Logging {
     require(layout.getIndex.getMeasures.isEmpty)
     val dimensions = wrapDimensions(layout)
     val columns = NSparkCubingUtil.getColumns(dimensions)
-    parentDS.select(columns: _*).sortWithinPartitions(columns: _*)
+    parentDS.select(columns: _*)
   }
 
   protected def columnIdFunc(colRef: TblColRef): String
@@ -278,11 +278,10 @@ trait SegmentExec extends Logging {
   private def wrapAggLayoutDS(layout: LayoutEntity, parentDS: Dataset[Row]): Dataset[Row] = {
     val dimensions = wrapDimensions(layout)
     val measures = layout.getOrderedMeasures.keySet()
-    val sortColumns = NSparkCubingUtil.getColumns(dimensions)
     val selectColumns = NSparkCubingUtil.getColumns(NSparkCubingUtil.combineIndices(dimensions, measures))
     val aggregated = CuboidAggregator.aggregate(parentDS, //
       dimensions, layout.getIndex.getEffectiveMeasures, columnIdFunc)
-    aggregated.select(selectColumns: _*).sortWithinPartitions(sortColumns: _*)
+    aggregated.select(selectColumns: _*)
   }
 
   protected final def newDataLayout(segment: NDataSegment, //
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala
index 1409ff8228..a8b28d283b 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala
@@ -18,9 +18,7 @@
 
 package org.apache.kylin.engine.spark.job.stage.build
 
-import java.util.concurrent.{CountDownLatch, TimeUnit}
-import java.util.{Locale, Objects, Timer, TimerTask}
-
+import com.google.common.collect.Sets
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.fs.Path
 import org.apache.kylin.common.util.HadoopUtil
@@ -54,6 +52,8 @@ import java.util.{Locale, Objects, Timer, TimerTask}
 import org.apache.kylin.common.constant.LogConstant
 import org.apache.kylin.common.logging.SetLogCategory
 
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+import java.util.{Locale, Objects, Timer, TimerTask}
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.parallel.ForkJoinTaskSupport
@@ -61,8 +61,6 @@ import scala.concurrent.duration.{Duration, MILLISECONDS}
 import scala.concurrent.forkjoin.ForkJoinPool
 import scala.util.{Failure, Success, Try}
 
-import com.google.common.collect.Sets
-
 abstract class FlatTableAndDictBase(private val jobContext: SegmentJob,
                                     private val dataSegment: NDataSegment,
                                     private val buildParam: BuildParam)
@@ -284,7 +282,7 @@ abstract class FlatTableAndDictBase(private val jobContext: SegmentJob,
         }
          if (joinDesc.isFlattenable && !antiFlattenTableSet.contains(joinDesc.getTable)) {
           val tableRef = joinDesc.getTableRef
-          val tableDS = newTableDS(tableRef)
+          val tableDS = newSnapshotDS(tableRef)
           ret.put(joinDesc, fulfillDS(tableDS, Set.empty, tableRef))
         }
       }
@@ -353,7 +351,11 @@ abstract class FlatTableAndDictBase(private val jobContext: SegmentJob,
     }
     logInfo(s"Segment $segmentId persist flat table: $flatTablePath")
     sparkSession.sparkContext.setJobDescription(s"Segment $segmentId persist flat table.")
+    if (config.isFlatTableRedistributionEnabled) {
+      sparkSession.sessionState.conf.setLocalProperty("spark.sql.sources.repartitionWritingDataSource", "true")
+    }
     tableDS.write.mode(SaveMode.Overwrite).parquet(flatTablePath.toString)
+    sparkSession.sessionState.conf.setLocalProperty("spark.sql.sources.repartitionWritingDataSource", null)
     DFBuilderHelper.checkPointSegment(dataSegment, (copied: NDataSegment) => {
       copied.setFlatTableReady(true)
       if (dataSegment.isFlatTableReady) {
@@ -420,6 +422,20 @@ abstract class FlatTableAndDictBase(private val jobContext: SegmentJob,
     Some(tableDS)
   }
 
+  def newSnapshotDS(tableRef: TableRef): Dataset[Row] = {
+    val snapshotResPath = tableRef.getTableDesc.getLastSnapshotPath
+    val baseDir = KapConfig.getInstanceFromEnv.getMetadataWorkingDirectory
+    val snapshotResFilePath = new Path(baseDir + snapshotResPath)
+    val fs = HadoopUtil.getWorkingFileSystem
+    if (snapshotResPath == null
+      || !fs.exists(snapshotResFilePath)
+      || config.isPersistFlatUseSnapshotEnabled) {
+      newTableDS(tableRef)
+    } else {
+      sparkSession.read.parquet(snapshotResFilePath.toString).alias(tableRef.getAlias)
+    }
+  }
+
   protected def newTableDS(tableRef: TableRef): Dataset[Row] = {
     // By design, why not try recovering from table snapshot.
     // If fact table is a view and its snapshot exists, that will benefit.
diff --git a/src/spark-project/spark-common/src/main/java/org/apache/kylin/engine/spark/utils/Repartitioner.java b/src/spark-project/spark-common/src/main/java/org/apache/kylin/engine/spark/utils/Repartitioner.java
index 95472f5edf..240c55b564 100644
--- a/src/spark-project/spark-common/src/main/java/org/apache/kylin/engine/spark/utils/Repartitioner.java
+++ b/src/spark-project/spark-common/src/main/java/org/apache/kylin/engine/spark/utils/Repartitioner.java
@@ -18,11 +18,7 @@
 
 package org.apache.kylin.engine.spark.utils;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Locale;
-
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -35,7 +31,10 @@ import org.apache.spark.sql.SparkSession;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
 
 public class Repartitioner {
     protected static final Logger logger = LoggerFactory.getLogger(Repartitioner.class);
@@ -51,8 +50,8 @@ public class Repartitioner {
     private boolean optimizeShardEnabled;
 
     public Repartitioner(int shardSize, int fileLengthThreshold, long totalRowCount, long rowCountThreshold,
-            ContentSummary contentSummary, List<Integer> shardByColumns, List<Integer> sortByColumns,
-            boolean optimizeShardEnabled) {
+                         ContentSummary contentSummary, List<Integer> shardByColumns, List<Integer> sortByColumns,
+                         boolean optimizeShardEnabled) {
         this.shardSize = shardSize;
         this.fileLengthThreshold = fileLengthThreshold;
         this.totalRowCount = totalRowCount;
diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/datasource/storage/LayoutFormatWriter.scala b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/datasource/storage/LayoutFormatWriter.scala
new file mode 100644
index 0000000000..3ee230b703
--- /dev/null
+++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/datasource/storage/LayoutFormatWriter.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.datasource.storage
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.kylin.common.KapConfig
+import org.apache.kylin.common.util.HadoopUtil
+import org.apache.kylin.engine.spark.job.NSparkCubingUtil
+import org.apache.kylin.engine.spark.utils.StorageUtils.findCountDistinctMeasure
+import org.apache.kylin.engine.spark.utils.{JobMetrics, Metrics, Repartitioner, StorageUtils}
+import org.apache.kylin.metadata.cube.model.LayoutEntity
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.DataFrame
+
+object LayoutFormatWriter extends Logging {
+
+  protected val TEMP_FLAG = "_temp_"
+
+  /** Describes how output files should be placed in the filesystem. */
+  case class OutputSpec(
+      metrics: JobMetrics,
+      rowCount: Long,
+      hadoopConf: Configuration,
+      bucketNum: Int)
+
+  def write(
+      dataFrame: DataFrame,
+      layout: LayoutEntity,
+      outputPath: Path,
+      kapConfig: KapConfig,
+      storageListener: Option[StorageListener]): OutputSpec = {
+    val ss = dataFrame.sparkSession
+    val hadoopConf = ss.sparkContext.hadoopConfiguration
+    val fs = outputPath.getFileSystem(hadoopConf)
+
+    val dims = layout.getOrderedDimensions.keySet()
+    val sortColumns = NSparkCubingUtil.getColumns(dims)
+
+    if (kapConfig.isAggIndexAdaptiveBuildEnabled
+        && unNeedRepartitionByShardCols(layout)) {
+      val df = dataFrame
+        .repartition()
+        .sortWithinPartitions(sortColumns: _*)
+      val metrics = StorageUtils.writeWithMetrics(df, outputPath.toString)
+      val rowCount = metrics.getMetrics(Metrics.CUBOID_ROWS_CNT)
+      OutputSpec(metrics, rowCount, hadoopConf, -1)
+    } else {
+      val tempPath = outputPath.toString + TEMP_FLAG + System.currentTimeMillis()
+      val df = dataFrame.sortWithinPartitions(sortColumns: _*)
+      val metrics = StorageUtils.writeWithMetrics(df, tempPath)
+      val rowCount = metrics.getMetrics(Metrics.CUBOID_ROWS_CNT)
+      storageListener.foreach(_.onPersistBeforeRepartition(df, layout))
+
+      val bucketNum = StorageUtils.calculateBucketNum(tempPath, layout, rowCount, kapConfig)
+      val summary = HadoopUtil.getContentSummary(fs, new Path(tempPath))
+      val repartitionThresholdSize = if (findCountDistinctMeasure(layout)) {
+        kapConfig.getParquetStorageCountDistinctShardSizeRowCount
+      } else {
+        kapConfig.getParquetStorageShardSizeRowCount
+      }
+
+      val repartitioner = new Repartitioner(
+        kapConfig.getParquetStorageShardSizeMB,
+        kapConfig.getParquetStorageRepartitionThresholdSize,
+        rowCount,
+        repartitionThresholdSize,
+        summary,
+        layout.getShardByColumns,
+        layout.getOrderedDimensions.keySet().asList(),
+        kapConfig.optimizeShardEnabled()
+      )
+      repartitioner.doRepartition(outputPath.toString, tempPath, bucketNum, ss)
+      storageListener.foreach(_.onPersistAfterRepartition(ss.read.parquet(outputPath.toString), layout))
+      OutputSpec(metrics, rowCount, hadoopConf, bucketNum)
+    }
+  }
+
+  def unNeedRepartitionByShardCols(layout: LayoutEntity): Boolean = {
+    layout.getShardByColumns == null || layout.getShardByColumns.isEmpty
+  }
+
+}
diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/datasource/storage/StorageStore.scala b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/datasource/storage/StorageStore.scala
index 9f59d2f9dc..4d05368d6e 100644
--- a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/datasource/storage/StorageStore.scala
+++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/datasource/storage/StorageStore.scala
@@ -18,14 +18,14 @@
 
 package org.apache.spark.sql.datasource.storage
 
-import org.apache.kylin.engine.spark.job.NSparkCubingUtil
-import org.apache.kylin.engine.spark.utils.StorageUtils.findCountDistinctMeasure
-import org.apache.kylin.engine.spark.utils.{JobMetrics, Metrics, Repartitioner, StorageUtils}
-import org.apache.kylin.metadata.cube.model.{LayoutEntity, NDataSegment, NDataflow}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.kylin.common.KapConfig
 import org.apache.kylin.common.util.HadoopUtil
+import org.apache.kylin.engine.spark.job.NSparkCubingUtil
+import org.apache.kylin.engine.spark.utils.StorageUtils.findCountDistinctMeasure
+import org.apache.kylin.engine.spark.utils.{Metrics, Repartitioner, StorageUtils}
+import org.apache.kylin.metadata.cube.model.{LayoutEntity, NDataSegment, NDataflow}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.LayoutEntityConverter._
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
@@ -92,43 +92,12 @@ abstract class StorageStore extends Logging {
 
 class StorageStoreV1 extends StorageStore {
   override def save(layout: LayoutEntity, outputPath: Path, kapConfig: KapConfig, dataFrame: DataFrame): WriteTaskStats = {
-    val (metrics: JobMetrics, rowCount: Long, hadoopConf: Configuration, bucketNum: Int) =
-      repartitionWriter(layout, outputPath, kapConfig, dataFrame)
-    val (fileCount, byteSize) = collectFileCountAndSizeAfterSave(outputPath, hadoopConf)
+    val outputSpec =
+      LayoutFormatWriter.write(dataFrame, layout, outputPath, kapConfig, storageListener)
+    val (fileCount, byteSize) = collectFileCountAndSizeAfterSave(outputPath, outputSpec.hadoopConf)
     checkAndWriterFastBitmapLayout(dataFrame, layout, kapConfig, outputPath)
-    WriteTaskStats(0, fileCount, byteSize, rowCount, metrics.getMetrics(Metrics.SOURCE_ROWS_CNT), bucketNum, new util.ArrayList[String]())
-  }
-
-  private def repartitionWriter(layout: LayoutEntity, outputPath: Path, kapConfig: KapConfig, dataFrame: DataFrame) = {
-    val ss = dataFrame.sparkSession
-    val hadoopConf = ss.sparkContext.hadoopConfiguration
-    val fs = outputPath.getFileSystem(hadoopConf)
-
-    val tempPath = outputPath.toString + TEMP_FLAG + System.currentTimeMillis()
-    val metrics = StorageUtils.writeWithMetrics(dataFrame, tempPath)
-    val rowCount = metrics.getMetrics(Metrics.CUBOID_ROWS_CNT)
-    storageListener.foreach(_.onPersistBeforeRepartition(dataFrame, layout))
-
-    val bucketNum = StorageUtils.calculateBucketNum(tempPath, layout, rowCount, kapConfig)
-    val summary = HadoopUtil.getContentSummary(fs, new Path(tempPath))
-    val repartitionThresholdSize = if (findCountDistinctMeasure(layout)) {
-      kapConfig.getParquetStorageCountDistinctShardSizeRowCount
-    } else {
-      kapConfig.getParquetStorageShardSizeRowCount
-    }
-    val repartitioner = new Repartitioner(
-      kapConfig.getParquetStorageShardSizeMB,
-      kapConfig.getParquetStorageRepartitionThresholdSize,
-      rowCount,
-      repartitionThresholdSize,
-      summary,
-      layout.getShardByColumns,
-      layout.getOrderedDimensions.keySet().asList(),
-      kapConfig.optimizeShardEnabled()
-    )
-    repartitioner.doRepartition(outputPath.toString, tempPath, bucketNum, ss)
-    storageListener.foreach(_.onPersistAfterRepartition(ss.read.parquet(outputPath.toString), layout))
-    (metrics, rowCount, hadoopConf, bucketNum)
+    WriteTaskStats(0, fileCount, byteSize, outputSpec.rowCount,
+      outputSpec.metrics.getMetrics(Metrics.SOURCE_ROWS_CNT), outputSpec.bucketNum, new util.ArrayList[String]())
   }
 
   def checkAndWriterFastBitmapLayout(dataset: DataFrame, layoutEntity: LayoutEntity, kapConfig: KapConfig, layoutPath: Path): Unit = {
@@ -153,7 +122,7 @@ class StorageStoreV1 extends StorageStore {
     }
 
     val afterReplaced = replaceCountDistinctEvalColumn(bitmaps, dataset)
-    repartitionWriter(layoutEntity, outputPath, kapConfig, afterReplaced)
+    LayoutFormatWriter.write(afterReplaced, layoutEntity, outputPath, kapConfig, storageListener)
   }
 
 
