This is an automated email from the ASF dual-hosted git repository. zhangzc pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push: new 68d75269d [Gluten-5256][CH]optimizing table after spark restart bug (#5258) 68d75269d is described below commit 68d75269d8af6819a1f48da9939ef36b44ffb141 Author: Hongbin Ma <mahong...@apache.org> AuthorDate: Wed Apr 3 13:27:17 2024 +0800 [Gluten-5256][CH]optimizing table after spark restart bug (#5258) [Gluten-5256][CH]optimizing table after spark restart bug --- .../delta/commands/OptimizeTableCommandBase.scala | 1 + .../sql/delta/commands/OptimizeTableCommand.scala | 2 + .../commands/OptimizeTableCommandOverwrites.scala | 323 --------------------- .../spark/sql/delta/commands/CommandUtils.scala | 43 +++ .../commands/OptimizeTableCommandOverwrites.scala | 7 +- .../GlutenClickHouseTableAfterRestart.scala | 178 ++++++++++-- 6 files changed, 210 insertions(+), 344 deletions(-) diff --git a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/commands/OptimizeTableCommandBase.scala b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/commands/OptimizeTableCommandBase.scala index bbfa2ecee..2d6ac48f5 100644 --- a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/commands/OptimizeTableCommandBase.scala +++ b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/commands/OptimizeTableCommandBase.scala @@ -126,6 +126,7 @@ case class OptimizeTableCommand( override val otherCopyArgs: Seq[AnyRef] = zOrderBy :: Nil override def run(sparkSession: SparkSession): Seq[Row] = { + CommandUtils.ensureClickHouseTableV2(tableId, sparkSession) val deltaLog = getDeltaLogClickhouse(sparkSession, path, tableId, "OPTIMIZE") val partitionColumns = deltaLog.snapshot.metadata.partitionColumns diff --git a/backends-clickhouse/src/main/delta-22/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala b/backends-clickhouse/src/main/delta-22/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala index 57427957d..346943671 100644 --- 
a/backends-clickhouse/src/main/delta-22/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala +++ b/backends-clickhouse/src/main/delta-22/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala @@ -126,6 +126,8 @@ case class OptimizeTableCommand( override val otherCopyArgs: Seq[AnyRef] = zOrderBy :: Nil override def run(sparkSession: SparkSession): Seq[Row] = { + CommandUtils.ensureClickHouseTableV2(tableId, sparkSession) + val deltaLog = getDeltaLogClickhouse(sparkSession, path, tableId, "OPTIMIZE", options) val txn = deltaLog.startTransaction() diff --git a/backends-clickhouse/src/main/delta-22/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala b/backends-clickhouse/src/main/delta-22/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala deleted file mode 100644 index 498b7ff4f..000000000 --- a/backends-clickhouse/src/main/delta-22/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala +++ /dev/null @@ -1,323 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.spark.sql.delta.commands - -import org.apache.gluten.expression.ConverterUtils - -import org.apache.spark.{TaskContext, TaskOutputFileAlreadyExistException} -import org.apache.spark.internal.Logging -import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage -import org.apache.spark.internal.io.SparkHadoopWriterUtils -import org.apache.spark.shuffle.FetchFailedException -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} -import org.apache.spark.sql.delta.{DeltaErrors, DeltaLog, DeltaTableIdentifier, OptimisticTransaction} -import org.apache.spark.sql.delta.actions.{AddFile, FileAction} -import org.apache.spark.sql.delta.catalog.ClickHouseTableV2 -import org.apache.spark.sql.errors.QueryExecutionErrors -import org.apache.spark.sql.execution.datasources.CHDatasourceJniWrapper -import org.apache.spark.sql.execution.datasources.v1.CHMergeTreeWriterInjects -import org.apache.spark.sql.execution.datasources.v1.clickhouse._ -import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.{AddFileTags, AddMergeTreeParts} -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.StructType -import org.apache.spark.util.{SerializableConfiguration, SystemClock, Utils} - -import org.apache.hadoop.fs.{FileAlreadyExistsException, Path} -import org.apache.hadoop.mapreduce.{TaskAttemptContext, TaskAttemptID, TaskID, TaskType} -import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl - -import java.util.{Date, UUID} - -import scala.collection.mutable.ArrayBuffer - -object OptimizeTableCommandOverwrites extends Logging { - - case class TaskDescription( - path: String, - database: String, - tableName: String, - orderByKeyOption: Option[Seq[String]], - lowCardKeyOption: Option[Seq[String]], - minmaxIndexKeyOption: Option[Seq[String]], - bfIndexKeyOption: Option[Seq[String]], - setIndexKeyOption: Option[Seq[String]], - primaryKeyOption: 
Option[Seq[String]], - partitionColumns: Seq[String], - partList: Seq[String], - tableSchema: StructType, - clickhouseTableConfigs: Map[String, String], - serializableHadoopConf: SerializableConfiguration, - jobIdInstant: Long, - partitionDir: Option[String], - bucketDir: Option[String] - ) - - private def executeTask( - description: TaskDescription, - sparkStageId: Int, - sparkPartitionId: Int, - sparkAttemptNumber: Int - ): MergeTreeWriteTaskResult = { - - val jobId = SparkHadoopWriterUtils.createJobID(new Date(description.jobIdInstant), sparkStageId) - val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId) - val taskAttemptId = new TaskAttemptID(taskId, sparkAttemptNumber) - - // Set up the attempt context required to use in the output committer. - val taskAttemptContext: TaskAttemptContext = { - // Set up the configuration object - val hadoopConf = description.serializableHadoopConf.value - hadoopConf.set("mapreduce.job.id", jobId.toString) - hadoopConf.set("mapreduce.task.id", taskAttemptId.getTaskID.toString) - hadoopConf.set("mapreduce.task.attempt.id", taskAttemptId.toString) - hadoopConf.setBoolean("mapreduce.task.ismap", true) - hadoopConf.setInt("mapreduce.task.partition", 0) - - new TaskAttemptContextImpl(hadoopConf, taskAttemptId) - } - - try { - Utils.tryWithSafeFinallyAndFailureCallbacks(block = { - - val uuid = UUID.randomUUID.toString - - val planWithSplitInfo = CHMergeTreeWriterInjects.genMergeTreeWriteRel( - description.path, - description.database, - description.tableName, - description.orderByKeyOption, - description.lowCardKeyOption, - description.minmaxIndexKeyOption, - description.bfIndexKeyOption, - description.setIndexKeyOption, - description.primaryKeyOption, - description.partitionColumns, - description.partList, - ConverterUtils.convertNamedStructJson(description.tableSchema), - description.clickhouseTableConfigs, - description.tableSchema.toAttributes - ) - - val datasourceJniWrapper = new CHDatasourceJniWrapper() - val 
returnedMetrics = - datasourceJniWrapper.nativeMergeMTParts( - planWithSplitInfo.plan, - planWithSplitInfo.splitInfo, - uuid, - taskId.getId.toString, - description.partitionDir.getOrElse(""), - description.bucketDir.getOrElse("") - ) - if (returnedMetrics != null && returnedMetrics.nonEmpty) { - val addFiles = AddFileTags.partsMetricsToAddFile( - description.database, - description.tableName, - description.path, - returnedMetrics, - Seq(Utils.localHostName())) - - val (taskCommitMessage, taskCommitTime) = Utils.timeTakenMs { - // committer.commitTask(taskAttemptContext) - new TaskCommitMessage(addFiles.toSeq) - } - -// val summary = MergeTreeExecutedWriteSummary( -// updatedPartitions = updatedPartitions.toSet, -// stats = statsTrackers.map(_.getFinalStats(taskCommitTime))) - MergeTreeWriteTaskResult(taskCommitMessage, null) - } else { - throw new IllegalStateException() - } - })( - catchBlock = { - // If there is an error, abort the task - logError(s"Job $jobId aborted.") - }, - finallyBlock = {}) - } catch { - case e: FetchFailedException => - throw e - case f: FileAlreadyExistsException if SQLConf.get.fastFailFileFormatOutput => - // If any output file to write already exists, it does not make sense to re-run this task. - // We throw the exception and let Executor throw ExceptionFailure to abort the job. 
- throw new TaskOutputFileAlreadyExistException(f) - case t: Throwable => - throw QueryExecutionErrors.taskFailedWhileWritingRowsError(t) - } - - } - - def runOptimizeBinJobClickhouse( - txn: OptimisticTransaction, - partitionValues: Map[String, String], - bucketNum: String, - bin: Seq[AddFile], - maxFileSize: Long): Seq[FileAction] = { - val tableV2 = ClickHouseTableV2.getTable(txn.deltaLog); - - val sparkSession = SparkSession.getActiveSession.get - - val rddWithNonEmptyPartitions = - sparkSession.sparkContext.parallelize(Array.empty[InternalRow], 1) - - val jobIdInstant = new Date().getTime - val ret = new Array[MergeTreeWriteTaskResult](rddWithNonEmptyPartitions.partitions.length) - - val serializableHadoopConf = new SerializableConfiguration( - sparkSession.sessionState.newHadoopConfWithOptions( - txn.metadata.configuration ++ txn.deltaLog.options)) - - val partitionDir = if (tableV2.partitionColumns.isEmpty) { - None - } else { - Some(tableV2.partitionColumns.map(c => c + "=" + partitionValues(c)).mkString("/")) - } - - val bucketDir = if (tableV2.bucketOption.isEmpty) { - None - } else { - Some(bucketNum) - } - - val description = TaskDescription.apply( - txn.deltaLog.dataPath.toString, - tableV2.dataBaseName, - tableV2.tableName, - tableV2.orderByKeyOption, - tableV2.lowCardKeyOption, - tableV2.minmaxIndexKeyOption, - tableV2.bfIndexKeyOption, - tableV2.setIndexKeyOption, - tableV2.primaryKeyOption, - tableV2.partitionColumns, - bin.map(_.asInstanceOf[AddMergeTreeParts].name), - tableV2.schema(), - tableV2.clickhouseTableConfigs, - serializableHadoopConf, - jobIdInstant, - partitionDir, - bucketDir - ) - sparkSession.sparkContext.runJob( - rddWithNonEmptyPartitions, - (taskContext: TaskContext, _: Iterator[InternalRow]) => { - executeTask( - description, - taskContext.stageId(), - taskContext.partitionId(), - taskContext.taskAttemptId().toInt & Integer.MAX_VALUE - ) - }, - rddWithNonEmptyPartitions.partitions.indices, - (index, res: 
MergeTreeWriteTaskResult) => { - ret(index) = res - } - ) - - val addFiles = ret - .flatMap(_.commitMsg.obj.asInstanceOf[Seq[AddFile]]) - .toSeq - - val removeFiles = - bin.map(f => f.removeWithTimestamp(new SystemClock().getTimeMillis(), dataChange = false)) - addFiles ++ removeFiles - - } - - private def isDeltaTable(spark: SparkSession, tableName: TableIdentifier): Boolean = { - val catalog = spark.sessionState.catalog - val tableIsNotTemporaryTable = !catalog.isTempView(tableName) - val tableExists = { - (tableName.database.isEmpty || catalog.databaseExists(tableName.database.get)) && - catalog.tableExists(tableName) - } - tableIsNotTemporaryTable && tableExists && catalog - .getTableMetadata(tableName) - .provider - .get - .toLowerCase() - .equals("clickhouse") - } - - def getDeltaLogClickhouse( - spark: SparkSession, - path: Option[String], - tableIdentifier: Option[TableIdentifier], - operationName: String, - hadoopConf: Map[String, String] = Map.empty): DeltaLog = { - val tablePath = - if (tableIdentifier.nonEmpty && isDeltaTable(spark, tableIdentifier.get)) { - val sessionCatalog = spark.sessionState.catalog - lazy val metadata = sessionCatalog.getTableMetadata(tableIdentifier.get) - new Path(metadata.location) - } else { - throw new UnsupportedOperationException("OPTIMIZE is ony supported for clickhouse tables") - } - - val startTime = Some(System.currentTimeMillis) - val deltaLog = DeltaLog.forTable(spark, tablePath, hadoopConf) - if (deltaLog.update(checkIfUpdatedSinceTs = startTime).version < 0) { - throw DeltaErrors.notADeltaTableException( - operationName, - DeltaTableIdentifier(path, tableIdentifier)) - } - deltaLog - } - - def groupFilesIntoBinsClickhouse( - partitionsToCompact: Seq[((String, Map[String, String]), Seq[AddFile])], - maxTargetFileSize: Long): Seq[((String, Map[String, String]), Seq[AddFile])] = { - partitionsToCompact.flatMap { - case (partition, files) => - val bins = new ArrayBuffer[Seq[AddFile]]() - - val currentBin = new 
ArrayBuffer[AddFile]() - var currentBinSize = 0L - - files.sortBy(_.size).foreach { - file => - // Generally, a bin is a group of existing files, whose total size does not exceed the - // desired maxFileSize. They will be coalesced into a single output file. - // However, if isMultiDimClustering = true, all files in a partition will be read by the - // same job, the data will be range-partitioned and numFiles = totalFileSize / maxFileSize - // will be produced. See below. - - // isMultiDimClustering is always false for Gluten Clickhouse for now - if (file.size + currentBinSize > maxTargetFileSize /*&& !isMultiDimClustering */ ) { - bins += currentBin.toVector - currentBin.clear() - currentBin += file - currentBinSize = file.size - } else { - currentBin += file - currentBinSize += file.size - } - } - - if (currentBin.nonEmpty) { - bins += currentBin.toVector - } - - bins - .map(b => (partition, b)) - // select bins that have at least two files or in case of multi-dim clustering - // select all bins - .filter(_._2.size > 1 /*|| isMultiDimClustering*/ ) - } - } -} diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/commands/CommandUtils.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/commands/CommandUtils.scala new file mode 100644 index 000000000..262c37eff --- /dev/null +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/commands/CommandUtils.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.delta.commands + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.connector.catalog.Identifier + +object CommandUtils { + // Ensure ClickHouseTableV2 table exists + def ensureClickHouseTableV2( + tableId: Option[TableIdentifier], + sparkSession: SparkSession): Unit = { + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + if (tableId.isEmpty) { + throw new UnsupportedOperationException("Current command requires table identifier.") + } + // If user comes into this function without previously triggering loadTable + // (which creates ClickhouseTableV2), we have to load the table manually + // Notice: Multi-catalog case is not well considered! 
+ sparkSession.sessionState.catalogManager.currentCatalog.asTableCatalog.loadTable( + Identifier.of( + Array( + tableId.get.database.getOrElse( + sparkSession.sessionState.catalogManager.currentNamespace.head)), + tableId.get.table) + ) + } +} diff --git a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala similarity index 98% rename from backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala rename to backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala index 498b7ff4f..5aeafbf81 100644 --- a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala @@ -294,11 +294,12 @@ object OptimizeTableCommandOverwrites extends Logging { // Generally, a bin is a group of existing files, whose total size does not exceed the // desired maxFileSize. They will be coalesced into a single output file. // However, if isMultiDimClustering = true, all files in a partition will be read by the - // same job, the data will be range-partitioned and numFiles = totalFileSize / maxFileSize + // same job, the data will be range-partitioned and + // numFiles = totalFileSize / maxFileSize // will be produced. See below. 
// isMultiDimClustering is always false for Gluten Clickhouse for now - if (file.size + currentBinSize > maxTargetFileSize /*&& !isMultiDimClustering */ ) { + if (file.size + currentBinSize > maxTargetFileSize /* && !isMultiDimClustering */ ) { bins += currentBin.toVector currentBin.clear() currentBin += file @@ -317,7 +318,7 @@ object OptimizeTableCommandOverwrites extends Logging { .map(b => (partition, b)) // select bins that have at least two files or in case of multi-dim clustering // select all bins - .filter(_._2.size > 1 /*|| isMultiDimClustering*/ ) + .filter(_._2.size > 1 /* || isMultiDimClustering */ ) } } } diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTableAfterRestart.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTableAfterRestart.scala index c6abbd444..e751c2fda 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTableAfterRestart.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTableAfterRestart.scala @@ -30,6 +30,7 @@ import java.io.File // Some sqls' line length exceeds 100 // scalastyle:off line.size.limit +// This suite makes sure ClickHouse commands work well even after a Spark restart class GlutenClickHouseTableAfterRestart extends GlutenClickHouseTPCHAbstractSuite with AdaptiveSparkPlanHelper { @@ -73,7 +74,9 @@ class GlutenClickHouseTableAfterRestart override protected def initializeSession(): Unit = { if (_hiveSpark == null) { - val hiveMetaStoreDB = metaStorePathAbsolute + "/metastore_db" + val hiveMetaStoreDB = metaStorePathAbsolute + "/metastore_db_" + current_db_num + current_db_num += 1 + _hiveSpark = SparkSession .builder() .config(sparkConf) @@ -108,6 +111,8 @@ class GlutenClickHouseTableAfterRestart } } + var current_db_num: Int = 0 + test("test mergetree after restart") { spark.sql(s""" |DROP TABLE IF EXISTS lineitem_mergetree; @@ -182,13 +187,157 @@ class 
GlutenClickHouseTableAfterRestart assert(stats2.missCount() - oldMissingCount2 == 0) } + val oldMissingCount1 = ClickhouseSnapshot.deltaScanCache.stats().missCount() + val oldMissingCount2 = ClickhouseSnapshot.addFileToAddMTPCache.stats().missCount() + + restartSpark() + + runTPCHQueryBySQL(1, sqlStr)(_ => {}) + + // after restart, additionally check stats of delta scan cache + val stats1 = ClickhouseSnapshot.deltaScanCache.stats() + assert(stats1.missCount() - oldMissingCount1 == 1) + val stats2 = ClickhouseSnapshot.addFileToAddMTPCache.stats() + assert(stats2.missCount() - oldMissingCount2 == 6) + + } + + test("test optimize after restart") { + spark.sql(s""" + |DROP TABLE IF EXISTS table_restart_optimize; + |""".stripMargin) + + spark.sql(s""" + |CREATE TABLE IF NOT EXISTS table_restart_optimize (id bigint, name string) + |USING clickhouse + |LOCATION '$basePath/table_restart_optimize' + |""".stripMargin) + + spark.sql(s""" + | insert into table table_restart_optimize values (1,"tom"), (2, "jim") + |""".stripMargin) + // second file + spark.sql(s""" + | insert into table table_restart_optimize values (1,"tom"), (2, "jim") + |""".stripMargin) + + restartSpark() + + spark.sql("optimize table_restart_optimize") + assert(spark.sql("select count(*) from table_restart_optimize").collect().apply(0).get(0) == 4) + } + + test("test vacuum after restart") { + spark.sql(s""" + |DROP TABLE IF EXISTS table_restart_vacuum; + |""".stripMargin) + + spark.sql(s""" + |CREATE TABLE IF NOT EXISTS table_restart_vacuum (id bigint, name string) + |USING clickhouse + |LOCATION '$basePath/table_restart_vacuum' + |""".stripMargin) + + spark.sql(s""" + | insert into table table_restart_vacuum values (1,"tom"), (2, "jim") + |""".stripMargin) + // second file + spark.sql(s""" + | insert into table table_restart_vacuum values (1,"tom"), (2, "jim") + |""".stripMargin) + + spark.sql("optimize table_restart_vacuum") + + restartSpark() + + spark.sql("set spark.gluten.enabled=false") + 
spark.sql("vacuum table_restart_vacuum") + spark.sql("set spark.gluten.enabled=true") + + assert(spark.sql("select count(*) from table_restart_vacuum").collect().apply(0).get(0) == 4) + } + + test("test update after restart") { + spark.sql(s""" + |DROP TABLE IF EXISTS table_restart_update; + |""".stripMargin) + + spark.sql(s""" + |CREATE TABLE IF NOT EXISTS table_restart_update (id bigint, name string) + |USING clickhouse + |LOCATION '$basePath/table_restart_update' + |""".stripMargin) + + spark.sql(s""" + | insert into table table_restart_update values (1,"tom"), (2, "jim") + |""".stripMargin) + // second file + spark.sql(s""" + | insert into table table_restart_update values (1,"tom"), (2, "jim") + |""".stripMargin) + + restartSpark() + + spark.sql("update table_restart_update set name = 'tom' where id = 1") + + assert(spark.sql("select count(*) from table_restart_update").collect().apply(0).get(0) == 4) + } + + test("test delete after restart") { + spark.sql(s""" + |DROP TABLE IF EXISTS table_restart_delete; + |""".stripMargin) + + spark.sql(s""" + |CREATE TABLE IF NOT EXISTS table_restart_delete (id bigint, name string) + |USING clickhouse + |LOCATION '$basePath/table_restart_delete' + |""".stripMargin) + + spark.sql(s""" + | insert into table table_restart_delete values (1,"tom"), (2, "jim") + |""".stripMargin) + // second file + spark.sql(s""" + | insert into table table_restart_delete values (1,"tom"), (2, "jim") + |""".stripMargin) + + restartSpark() + + spark.sql("delete from table_restart_delete where where id = 1") + + assert(spark.sql("select count(*) from table_restart_delete").collect().apply(0).get(0) == 2) + } + + test("test drop after restart") { + spark.sql(s""" + |DROP TABLE IF EXISTS table_restart_drop; + |""".stripMargin) + + spark.sql(s""" + |CREATE TABLE IF NOT EXISTS table_restart_drop (id bigint, name string) + |USING clickhouse + |LOCATION '$basePath/table_restart_drop' + |""".stripMargin) + + spark.sql(s""" + | insert into table 
table_restart_drop values (1,"tom"), (2, "jim") + |""".stripMargin) + // second file + spark.sql(s""" + | insert into table table_restart_drop values (1,"tom"), (2, "jim") + |""".stripMargin) + + restartSpark() + + spark.sql("drop table table_restart_drop") + } + + private def restartSpark(): Unit = { // now restart ClickHouseTableV2.clearCache() ClickhouseSnapshot.clearAllFileStatusCache() - val oldMissingCount1 = ClickhouseSnapshot.deltaScanCache.stats().missCount() - val oldMissingCount2 = ClickhouseSnapshot.addFileToAddMTPCache.stats().missCount() - val session = getActiveSession.orElse(getDefaultSession) if (session.isDefined) { session.get.stop() @@ -196,29 +345,22 @@ class GlutenClickHouseTableAfterRestart SparkSession.clearDefaultSession() } - val hiveMetaStoreDB = metaStorePathAbsolute + "/metastore_db" + val hiveMetaStoreDB = metaStorePathAbsolute + "/metastore_db_" // copy the previous metastore into a fresh, numbered metastore_db_<N> directory to avoid issue: "Another instance of Derby may have already booted the database" - val destDir = new File(hiveMetaStoreDB + "2") + val destDir = new File(hiveMetaStoreDB + current_db_num) destDir.mkdirs() - FileUtils.copyDirectory(new File(hiveMetaStoreDB), destDir) + FileUtils.copyDirectory(new File(hiveMetaStoreDB + (current_db_num - 1)), destDir) _hiveSpark = null _hiveSpark = SparkSession .builder() .config(sparkConf) .enableHiveSupport() - .config("javax.jdo.option.ConnectionURL", s"jdbc:derby:;databaseName=${hiveMetaStoreDB}2") + .config( + "javax.jdo.option.ConnectionURL", + s"jdbc:derby:;databaseName=$hiveMetaStoreDB$current_db_num") .master("local[2]") .getOrCreate() - - runTPCHQueryBySQL(1, sqlStr)(_ => {}) - - // after restart, additionally check stats of delta scan cache - val stats1 = ClickhouseSnapshot.deltaScanCache.stats() - assert(stats1.missCount() - oldMissingCount1 == 1) - val stats2 = ClickhouseSnapshot.addFileToAddMTPCache.stats() - assert(stats2.missCount() - oldMissingCount2 == 6) - + current_db_num += 1 } - } // scalastyle:off line.size.limit 
--------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@gluten.apache.org For additional commands, e-mail: commits-h...@gluten.apache.org