This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 597a9367ec12cfe18c8878a548f07db514ce7e7c
Author: Mingming Ge <7mmi...@gmail.com>
AuthorDate: Mon Feb 20 14:42:54 2023 +0800

    KYLIN-5530 remove repartition write

    KYLIN-5530 Optimized snapshot builds

    KYLIN-5530 Flat Table Repartition before writing data source tables/directories
---
 pom.xml                                            |  2 +-
 .../java/org/apache/kylin/common/KapConfig.java    |  4 +
 .../org/apache/kylin/common/KylinConfigBase.java   |  8 ++
 .../engine/spark/builder/SegmentFlatTable.scala    |  1 +
 .../engine/spark/builder/SnapshotBuilder.scala     | 11 ++-
 .../kylin/engine/spark/job/SegmentExec.scala       |  5 +-
 .../job/stage/build/FlatTableAndDictBase.scala     | 28 +++++--
 .../kylin/engine/spark/utils/Repartitioner.java    | 15 ++--
 .../datasource/storage/LayoutFormatWriter.scala    | 98 ++++++++++++++++++++++
 .../sql/datasource/storage/StorageStore.scala      | 51 +++--------
 10 files changed, 160 insertions(+), 63 deletions(-)

diff --git a/pom.xml b/pom.xml
index 944f95c459..53a49fb81c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -124,7 +124,7 @@
 
         <!-- Spark versions -->
         <delta.version>1.2.1</delta.version>
-        <spark.version>3.2.0-kylin-4.6.5.0</spark.version>
+        <spark.version>3.2.0-kylin-4.6.6.0-SNAPSHOT</spark.version>
 
         <roaring.version>0.9.2-kylin-r4</roaring.version>
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KapConfig.java b/src/core-common/src/main/java/org/apache/kylin/common/KapConfig.java
index 4f2132760e..d073f089a7 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/KapConfig.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/KapConfig.java
@@ -682,6 +682,10 @@ public class KapConfig {
         return Boolean.parseBoolean(config.getOptional("kylin.build.optimize-shard-enabled", TRUE));
     }
 
+    public boolean isAggIndexAdaptiveBuildEnabled() {
+        return Boolean.parseBoolean(config.getOptional("kylin.engine.aggIndex-adaptive-build-enabled", FALSE));
+    }
+
     public String getSwitchBackupFsExceptionAllowString() {
         return config.getOptional("kylin.query.switch-backup-fs-exception-allow-string", "alluxio");
     }
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 0490ce7b84..853c5d6ce6 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -2637,10 +2637,18 @@ public abstract class KylinConfigBase implements Serializable {
         return Boolean.parseBoolean(getOptional("kylin.engine.persist-flattable-enabled", TRUE));
     }
 
+    public boolean isFlatTableRedistributionEnabled() {
+        return Boolean.parseBoolean(getOptional("kylin.engine.redistribution-flattable-enabled", FALSE));
+    }
+
     public boolean isPersistFlatViewEnabled() {
         return Boolean.parseBoolean(getOptional("kylin.engine.persist-flatview", FALSE));
     }
 
+    public boolean isPersistFlatUseSnapshotEnabled() {
+        return Boolean.parseBoolean(getOptional("kylin.engine.persist-flat-use-snapshot-enabled", TRUE));
+    }
+
     public boolean isBuildExcludedTableEnabled() {
         return Boolean.parseBoolean(getOptional("kylin.engine.build-excluded-table", FALSE));
     }
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/SegmentFlatTable.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/SegmentFlatTable.scala
index 7449aebdcd..e6e41fd9b9 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/SegmentFlatTable.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/SegmentFlatTable.scala
@@ -37,6 +37,7 @@ import org.apache.kylin.metadata.model._
 import org.apache.kylin.query.util.PushDownUtil
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions.{col, expr}
+import org.apache.spark.sql.manager.SparderLookupManager
 import org.apache.spark.sql.types.StructField
 import org.apache.spark.sql.util.SparderTypeUtil
 import org.apache.spark.utils.ProxyThreadUtils
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/SnapshotBuilder.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/SnapshotBuilder.scala
index 777c16d464..f20c3b855e 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/SnapshotBuilder.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/SnapshotBuilder.scala
@@ -431,9 +431,11 @@ class SnapshotBuilder(var jobId: String) extends Logging with Serializable {
     if (repartitionNum == 0) {
       sourceData.write.parquet(resourcePath)
     } else {
-      sourceData.repartition(repartitionNum).write.parquet(resourcePath)
+      sourceData.repartition().write.parquet(resourcePath)
     }
-    val (originSize, totalRows) = computeSnapshotSize(sourceData)
+
+    val snapshotDS = ss.read.parquet(resourcePath)
+    val (originSize, totalRows) = computeSnapshotSize(snapshotDS)
     resultMap.put(tableDesc.getIdentity, Result(snapshotTablePath, originSize, totalRows))
   }
 
@@ -458,10 +460,11 @@ class SnapshotBuilder(var jobId: String) extends Logging with Serializable {
       List((totalSize, totalRows)).toIterator
     }(Encoders.tuple(Encoders.scalaLong, Encoders.scalaLong))
 
-    if (ds.isEmpty) {
+    val stats = ds.collect().reduceOption((a, b) => (a._1 + b._1, a._2 + b._2))
+    if (stats.isEmpty) {
       (0L, 0L)
     } else {
-      ds.reduce((a, b) => (a._1 + b._1, a._2 + b._2))
+      stats.get
     }
   }
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/SegmentExec.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/SegmentExec.scala
index 61675ac4a8..0b104eb42a 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/SegmentExec.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/SegmentExec.scala
@@ -270,7 +270,7 @@ trait SegmentExec extends Logging {
     require(layout.getIndex.getMeasures.isEmpty)
     val dimensions = wrapDimensions(layout)
     val columns = NSparkCubingUtil.getColumns(dimensions)
-    parentDS.select(columns: _*).sortWithinPartitions(columns: _*)
+    parentDS.select(columns: _*)
   }
 
   protected def columnIdFunc(colRef: TblColRef): String
@@ -278,11 +278,10 @@ trait SegmentExec extends Logging {
   private def wrapAggLayoutDS(layout: LayoutEntity, parentDS: Dataset[Row]): Dataset[Row] = {
     val dimensions = wrapDimensions(layout)
     val measures = layout.getOrderedMeasures.keySet()
-    val sortColumns = NSparkCubingUtil.getColumns(dimensions)
     val selectColumns = NSparkCubingUtil.getColumns(NSparkCubingUtil.combineIndices(dimensions, measures))
     val aggregated = CuboidAggregator.aggregate(parentDS, //
       dimensions, layout.getIndex.getEffectiveMeasures, columnIdFunc)
-    aggregated.select(selectColumns: _*).sortWithinPartitions(sortColumns: _*)
+    aggregated.select(selectColumns: _*)
   }
 
   protected final def newDataLayout(segment: NDataSegment, //
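Note on the computeSnapshotSize change above: Dataset.reduce throws UnsupportedOperationException on an empty dataset, so the old code paid for a separate isEmpty job before reducing. Collecting the per-partition (size, rows) pairs once and folding them with reduceOption covers the empty case in a single pass. A minimal self-contained sketch of the pattern (plain local Spark, not Kylin code):

    import org.apache.spark.sql.{Encoders, SparkSession}

    object ReduceOptionSketch {
      def main(args: Array[String]): Unit = {
        val ss = SparkSession.builder().master("local[1]").appName("sketch").getOrCreate()

        // Stand-in for the per-partition (bytes, rows) dataset built in computeSnapshotSize.
        val ds = ss.emptyDataset(Encoders.tuple(Encoders.scalaLong, Encoders.scalaLong))

        // ds.reduce(...) would throw UnsupportedOperationException("empty collection") here.
        // Collecting once and folding with reduceOption yields a safe default instead.
        val (bytes, rows) = ds.collect()
          .reduceOption((a, b) => (a._1 + b._1, a._2 + b._2))
          .getOrElse((0L, 0L))

        println(s"originSize=$bytes totalRows=$rows") // originSize=0 totalRows=0
        ss.stop()
      }
    }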
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala
index 1409ff8228..a8b28d283b 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala
@@ -18,9 +18,7 @@
 package org.apache.kylin.engine.spark.job.stage.build
 
-import java.util.concurrent.{CountDownLatch, TimeUnit}
-import java.util.{Locale, Objects, Timer, TimerTask}
-
+import com.google.common.collect.Sets
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.fs.Path
 import org.apache.kylin.common.util.HadoopUtil
@@ -54,6 +52,8 @@ import java.util.{Locale, Objects, Timer, TimerTask}
 import org.apache.kylin.common.constant.LogConstant
 import org.apache.kylin.common.logging.SetLogCategory
 
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+import java.util.{Locale, Objects, Timer, TimerTask}
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.parallel.ForkJoinTaskSupport
@@ -61,8 +61,6 @@ import scala.concurrent.duration.{Duration, MILLISECONDS}
 import scala.concurrent.forkjoin.ForkJoinPool
 import scala.util.{Failure, Success, Try}
 
-import com.google.common.collect.Sets
-
 abstract class FlatTableAndDictBase(private val jobContext: SegmentJob,
                                     private val dataSegment: NDataSegment,
                                     private val buildParam: BuildParam)
@@ -284,7 +282,7 @@
       }
       if (joinDesc.isFlattenable && !antiFlattenTableSet.contains(joinDesc.getTable)) {
         val tableRef = joinDesc.getTableRef
-        val tableDS = newTableDS(tableRef)
+        val tableDS = newSnapshotDS(tableRef)
         ret.put(joinDesc, fulfillDS(tableDS, Set.empty, tableRef))
       }
     }
@@ -353,7 +351,11 @@
     }
     logInfo(s"Segment $segmentId persist flat table: $flatTablePath")
     sparkSession.sparkContext.setJobDescription(s"Segment $segmentId persist flat table.")
+    if (config.isFlatTableRedistributionEnabled) {
+      sparkSession.sessionState.conf.setLocalProperty("spark.sql.sources.repartitionWritingDataSource", "true")
+    }
     tableDS.write.mode(SaveMode.Overwrite).parquet(flatTablePath.toString)
+    sparkSession.sessionState.conf.setLocalProperty("spark.sql.sources.repartitionWritingDataSource", null)
     DFBuilderHelper.checkPointSegment(dataSegment, (copied: NDataSegment) => {
       copied.setFlatTableReady(true)
       if (dataSegment.isFlatTableReady) {
@@ -420,6 +422,20 @@
     Some(tableDS)
   }
 
+  def newSnapshotDS(tableRef: TableRef): Dataset[Row] = {
+    val snapshotResPath = tableRef.getTableDesc.getLastSnapshotPath
+    val baseDir = KapConfig.getInstanceFromEnv.getMetadataWorkingDirectory
+    val snapshotResFilePath = new Path(baseDir + snapshotResPath)
+    val fs = HadoopUtil.getWorkingFileSystem
+    if (snapshotResPath == null
+      || !fs.exists(snapshotResFilePath)
+      || config.isPersistFlatUseSnapshotEnabled) {
+      newTableDS(tableRef)
+    } else {
+      sparkSession.read.parquet(snapshotResFilePath.toString).alias(tableRef.getAlias)
+    }
+  }
+
   protected def newTableDS(tableRef: TableRef): Dataset[Row] = {
     // By design, why not try recovering from table snapshot.
     // If fact table is a view and its snapshot exists, that will benefit.
diff --git a/src/spark-project/spark-common/src/main/java/org/apache/kylin/engine/spark/utils/Repartitioner.java b/src/spark-project/spark-common/src/main/java/org/apache/kylin/engine/spark/utils/Repartitioner.java
index 95472f5edf..240c55b564 100644
--- a/src/spark-project/spark-common/src/main/java/org/apache/kylin/engine/spark/utils/Repartitioner.java
+++ b/src/spark-project/spark-common/src/main/java/org/apache/kylin/engine/spark/utils/Repartitioner.java
@@ -18,11 +18,7 @@
 package org.apache.kylin.engine.spark.utils;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Locale;
-
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -35,7 +31,10 @@ import org.apache.spark.sql.SparkSession;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
 
 public class Repartitioner {
     protected static final Logger logger = LoggerFactory.getLogger(Repartitioner.class);
@@ -51,8 +50,8 @@ public class Repartitioner {
     private boolean optimizeShardEnabled;
 
     public Repartitioner(int shardSize, int fileLengthThreshold, long totalRowCount, long rowCountThreshold,
-            ContentSummary contentSummary, List<Integer> shardByColumns, List<Integer> sortByColumns,
-            boolean optimizeShardEnabled) {
+                         ContentSummary contentSummary, List<Integer> shardByColumns, List<Integer> sortByColumns,
+                         boolean optimizeShardEnabled) {
         this.shardSize = shardSize;
         this.fileLengthThreshold = fileLengthThreshold;
         this.totalRowCount = totalRowCount;
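The changes above are all gated by configuration. Going by the defaults in the KapConfig and KylinConfigBase hunks, a kylin.properties fragment opting in to the new behavior would look like the following. Note that, as the guard in newSnapshotDS is written, the snapshot parquet is only reused when kylin.engine.persist-flat-use-snapshot-enabled is set to false (its default is true):

    # Adaptive build for agg-index layouts without shard-by columns (default: false)
    kylin.engine.aggIndex-adaptive-build-enabled=true

    # Repartition the flat table while writing it out (default: false)
    kylin.engine.redistribution-flattable-enabled=true

    # As the newSnapshotDS guard is written, snapshot parquet is reused
    # only when this is false (default: true)
    kylin.engine.persist-flat-use-snapshot-enabled=false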
diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/datasource/storage/LayoutFormatWriter.scala b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/datasource/storage/LayoutFormatWriter.scala
new file mode 100644
index 0000000000..3ee230b703
--- /dev/null
+++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/datasource/storage/LayoutFormatWriter.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.datasource.storage
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.kylin.common.KapConfig
+import org.apache.kylin.common.util.HadoopUtil
+import org.apache.kylin.engine.spark.job.NSparkCubingUtil
+import org.apache.kylin.engine.spark.utils.StorageUtils.findCountDistinctMeasure
+import org.apache.kylin.engine.spark.utils.{JobMetrics, Metrics, Repartitioner, StorageUtils}
+import org.apache.kylin.metadata.cube.model.LayoutEntity
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.DataFrame
+
+object LayoutFormatWriter extends Logging {
+
+  protected val TEMP_FLAG = "_temp_"
+
+  /** Describes how output files should be placed in the filesystem. */
+  case class OutputSpec(
+      metrics: JobMetrics,
+      rowCount: Long,
+      hadoopConf: Configuration,
+      bucketNum: Int)
+
+  def write(
+      dataFrame: DataFrame,
+      layout: LayoutEntity,
+      outputPath: Path,
+      kapConfig: KapConfig,
+      storageListener: Option[StorageListener]): OutputSpec = {
+    val ss = dataFrame.sparkSession
+    val hadoopConf = ss.sparkContext.hadoopConfiguration
+    val fs = outputPath.getFileSystem(hadoopConf)
+
+    val dims = layout.getOrderedDimensions.keySet()
+    val sortColumns = NSparkCubingUtil.getColumns(dims)
+
+    if (kapConfig.isAggIndexAdaptiveBuildEnabled
+      && unNeedRepartitionByShardCols(layout)) {
+      val df = dataFrame
+        .repartition()
+        .sortWithinPartitions(sortColumns: _*)
+      val metrics = StorageUtils.writeWithMetrics(df, outputPath.toString)
+      val rowCount = metrics.getMetrics(Metrics.CUBOID_ROWS_CNT)
+      OutputSpec(metrics, rowCount, hadoopConf, -1)
+    } else {
+      val tempPath = outputPath.toString + TEMP_FLAG + System.currentTimeMillis()
+      val df = dataFrame.sortWithinPartitions(sortColumns: _*)
+      val metrics = StorageUtils.writeWithMetrics(df, tempPath)
+      val rowCount = metrics.getMetrics(Metrics.CUBOID_ROWS_CNT)
+      storageListener.foreach(_.onPersistBeforeRepartition(df, layout))
+
+      val bucketNum = StorageUtils.calculateBucketNum(tempPath, layout, rowCount, kapConfig)
+      val summary = HadoopUtil.getContentSummary(fs, new Path(tempPath))
+      val repartitionThresholdSize = if (findCountDistinctMeasure(layout)) {
+        kapConfig.getParquetStorageCountDistinctShardSizeRowCount
+      } else {
+        kapConfig.getParquetStorageShardSizeRowCount
+      }
+
+      val repartitioner = new Repartitioner(
+        kapConfig.getParquetStorageShardSizeMB,
+        kapConfig.getParquetStorageRepartitionThresholdSize,
+        rowCount,
+        repartitionThresholdSize,
+        summary,
+        layout.getShardByColumns,
+        layout.getOrderedDimensions.keySet().asList(),
+        kapConfig.optimizeShardEnabled()
+      )
+      repartitioner.doRepartition(outputPath.toString, tempPath, bucketNum, ss)
+      storageListener.foreach(_.onPersistAfterRepartition(ss.read.parquet(outputPath.toString), layout))
+      OutputSpec(metrics, rowCount, hadoopConf, bucketNum)
+    }
+  }
+
+  def unNeedRepartitionByShardCols(layout: LayoutEntity): Boolean = {
+    layout.getShardByColumns == null || layout.getShardByColumns.isEmpty
+  }
+
+}
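The adaptive branch in LayoutFormatWriter replaces the temp-path write plus Repartitioner pass with a single write: rebalance, then sort within each partition so every output file stays internally ordered by the layout's dimensions. A small stand-alone sketch of that write pattern in stock Spark (the no-arg repartition() above comes from the kylin-patched Spark build, so the sketch substitutes an explicit partition count):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.col

    object AdaptiveWriteSketch {
      def main(args: Array[String]): Unit = {
        val ss = SparkSession.builder().master("local[2]").appName("sketch").getOrCreate()
        import ss.implicits._

        val df = Seq((3L, "c"), (1L, "a"), (2L, "b")).toDF("dim_0", "measure_0")

        df.repartition(2)                      // the fork's repartition() picks this adaptively
          .sortWithinPartitions(col("dim_0"))  // per-file ordering, no global sort
          .write.mode("overwrite").parquet("/tmp/adaptive_write_sketch")

        ss.stop()
      }
    }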
diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/datasource/storage/StorageStore.scala b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/datasource/storage/StorageStore.scala
index 9f59d2f9dc..4d05368d6e 100644
--- a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/datasource/storage/StorageStore.scala
+++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/datasource/storage/StorageStore.scala
@@ -18,14 +18,14 @@
 package org.apache.spark.sql.datasource.storage
 
-import org.apache.kylin.engine.spark.job.NSparkCubingUtil
-import org.apache.kylin.engine.spark.utils.StorageUtils.findCountDistinctMeasure
-import org.apache.kylin.engine.spark.utils.{JobMetrics, Metrics, Repartitioner, StorageUtils}
-import org.apache.kylin.metadata.cube.model.{LayoutEntity, NDataSegment, NDataflow}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.kylin.common.KapConfig
 import org.apache.kylin.common.util.HadoopUtil
+import org.apache.kylin.engine.spark.job.NSparkCubingUtil
+import org.apache.kylin.engine.spark.utils.StorageUtils.findCountDistinctMeasure
+import org.apache.kylin.engine.spark.utils.{Metrics, Repartitioner, StorageUtils}
+import org.apache.kylin.metadata.cube.model.{LayoutEntity, NDataSegment, NDataflow}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.LayoutEntityConverter._
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
@@ -92,43 +92,12 @@ abstract class StorageStore extends Logging {
 
 class StorageStoreV1 extends StorageStore {
   override def save(layout: LayoutEntity, outputPath: Path, kapConfig: KapConfig,
                     dataFrame: DataFrame): WriteTaskStats = {
-    val (metrics: JobMetrics, rowCount: Long, hadoopConf: Configuration, bucketNum: Int) =
-      repartitionWriter(layout, outputPath, kapConfig, dataFrame)
-    val (fileCount, byteSize) = collectFileCountAndSizeAfterSave(outputPath, hadoopConf)
+    val outputSpec =
+      LayoutFormatWriter.write(dataFrame, layout, outputPath, kapConfig, storageListener)
+    val (fileCount, byteSize) = collectFileCountAndSizeAfterSave(outputPath, outputSpec.hadoopConf)
     checkAndWriterFastBitmapLayout(dataFrame, layout, kapConfig, outputPath)
-    WriteTaskStats(0, fileCount, byteSize, rowCount, metrics.getMetrics(Metrics.SOURCE_ROWS_CNT), bucketNum, new util.ArrayList[String]())
-  }
-
-  private def repartitionWriter(layout: LayoutEntity, outputPath: Path, kapConfig: KapConfig, dataFrame: DataFrame) = {
-    val ss = dataFrame.sparkSession
-    val hadoopConf = ss.sparkContext.hadoopConfiguration
-    val fs = outputPath.getFileSystem(hadoopConf)
-
-    val tempPath = outputPath.toString + TEMP_FLAG + System.currentTimeMillis()
-    val metrics = StorageUtils.writeWithMetrics(dataFrame, tempPath)
-    val rowCount = metrics.getMetrics(Metrics.CUBOID_ROWS_CNT)
-    storageListener.foreach(_.onPersistBeforeRepartition(dataFrame, layout))
-
-    val bucketNum = StorageUtils.calculateBucketNum(tempPath, layout, rowCount, kapConfig)
-    val summary = HadoopUtil.getContentSummary(fs, new Path(tempPath))
-    val repartitionThresholdSize = if (findCountDistinctMeasure(layout)) {
-      kapConfig.getParquetStorageCountDistinctShardSizeRowCount
-    } else {
-      kapConfig.getParquetStorageShardSizeRowCount
-    }
-    val repartitioner = new Repartitioner(
-      kapConfig.getParquetStorageShardSizeMB,
-      kapConfig.getParquetStorageRepartitionThresholdSize,
-      rowCount,
-      repartitionThresholdSize,
-      summary,
-      layout.getShardByColumns,
-      layout.getOrderedDimensions.keySet().asList(),
-      kapConfig.optimizeShardEnabled()
-    )
-    repartitioner.doRepartition(outputPath.toString, tempPath, bucketNum, ss)
-    storageListener.foreach(_.onPersistAfterRepartition(ss.read.parquet(outputPath.toString), layout))
-    (metrics, rowCount, hadoopConf, bucketNum)
+    WriteTaskStats(0, fileCount, byteSize, outputSpec.rowCount,
+      outputSpec.metrics.getMetrics(Metrics.SOURCE_ROWS_CNT), outputSpec.bucketNum, new util.ArrayList[String]())
   }
 
   def checkAndWriterFastBitmapLayout(dataset: DataFrame, layoutEntity: LayoutEntity,
                                      kapConfig: KapConfig, layoutPath: Path): Unit = {
@@ -153,7 +122,7 @@ class StorageStoreV1 extends StorageStore {
     }
 
     val afterReplaced = replaceCountDistinctEvalColumn(bitmaps, dataset)
-    repartitionWriter(layoutEntity, outputPath, kapConfig, afterReplaced)
+    LayoutFormatWriter.write(afterReplaced, layoutEntity, outputPath, kapConfig, storageListener)
   }
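For context, the non-adaptive path that StorageStoreV1 now delegates to LayoutFormatWriter keeps the pre-existing two-pass scheme: write to a temp path, measure what landed on disk, then rewrite with a derived partition count. A simplified sketch of the idea with a hypothetical helper (writeTwoPass is illustrative only; the single size threshold stands in for Kylin's Repartitioner and StorageUtils.calculateBucketNum):

    import org.apache.hadoop.fs.Path
    import org.apache.spark.sql.DataFrame

    object TwoPassWriteSketch {
      def writeTwoPass(df: DataFrame, outputPath: String, targetFileBytes: Long): Unit = {
        val ss = df.sparkSession
        val tempPath = outputPath + "_temp_" + System.currentTimeMillis()

        // Pass 1: write once to the temp location.
        df.write.parquet(tempPath)

        // Measure the bytes actually written, then derive a partition count.
        val fs = new Path(tempPath).getFileSystem(ss.sparkContext.hadoopConfiguration)
        val bytes = fs.getContentSummary(new Path(tempPath)).getLength
        val numFiles = math.max(1, (bytes / targetFileBytes).toInt)

        // Pass 2: rewrite with the derived layout and drop the temp data.
        ss.read.parquet(tempPath).repartition(numFiles).write.parquet(outputPath)
        fs.delete(new Path(tempPath), true)
      }
    }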