This is an automated email from the ASF dual-hosted git repository.
loneylee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 65ad57add7 [Gluten-9254][CH] Support RDDScanExec (#9270)
65ad57add7 is described below
commit 65ad57add72e665b4e07b2d994347c9881d9fc13
Author: Shuai li <[email protected]>
AuthorDate: Tue Apr 15 16:35:27 2025 +0800
[Gluten-9254][CH] Support RDDScanExec (#9270)
* [Gluten-9254][CH] Support RDDScanExec
---
.../merge/MergeIntoMaterializeSource.scala | 499 +++++++++++++++++++++
.../apache/spark/sql/execution/RDDScanSuite.scala | 86 ++++
.../apache/gluten/component/CHKafkaComponent.scala | 2 +
.../columnar/KafkaMiscColumnarRules.scala | 47 ++
.../gluten/vectorized/BlockOutputStream.java | 2 +
.../apache/gluten/vectorized/CHNativeBlock.java | 6 +
.../apache/gluten/vectorized/CHStreamReader.java | 2 +
.../gluten/backendsapi/clickhouse/CHRuleApi.scala | 1 +
.../clickhouse/CHSparkPlanExecApi.scala | 30 +-
.../extension/CHRemoveTopmostColumnarToRow.scala | 71 +++
.../spark/sql/execution/CHRDDScanTransformer.scala | 133 ++++++
.../execution/GlutenFunctionValidateSuite.scala | 14 +-
.../gluten/execution/GlutenNothingValueCheck.scala | 18 +-
.../GlutenClickhouseFunctionSuite.scala | 12 +-
.../GlutenClickHouseTPCHSaltNullParquetSuite.scala | 51 +--
.../AggregateFunctionDVRoaringBitmap.h | 1 -
cpp-ch/local-engine/Parser/SparkRowToCHColumn.cpp | 2 +-
cpp-ch/local-engine/Shuffle/ShuffleReader.cpp | 14 +-
cpp-ch/local-engine/Shuffle/ShuffleReader.h | 12 +-
cpp-ch/local-engine/Storages/IO/NativeReader.cpp | 59 ++-
cpp-ch/local-engine/Storages/IO/NativeReader.h | 24 +-
.../Storages/SubstraitSource/Delta/DeltaWriter.cpp | 15 +-
.../Storages/SubstraitSource/Delta/DeltaWriter.h | 2 +-
cpp-ch/local-engine/local_engine_jni.cpp | 59 ++-
.../gluten/extension/caller/CallerInfo.scala | 22 +-
.../gluten/backendsapi/SparkPlanExecApi.scala | 15 +
.../columnar/offload/OffloadSingleNodeRules.scala | 4 +
.../spark/sql/execution/RDDScanTransformer.scala | 52 +++
.../execution/datasources/FakeRowEnhancement.scala | 44 ++
.../datasources/GlutenWriterColumnarRules.scala | 2 +-
.../utils/clickhouse/ClickHouseTestSettings.scala | 1 +
.../utils/clickhouse/ClickHouseTestSettings.scala | 3 +
.../spark/sql/execution/datasources/FakeRow.scala | 2 +-
33 files changed, 1198 insertions(+), 109 deletions(-)
diff --git
a/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/commands/merge/MergeIntoMaterializeSource.scala
b/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/commands/merge/MergeIntoMaterializeSource.scala
new file mode 100644
index 0000000000..f1efab98df
--- /dev/null
+++
b/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/commands/merge/MergeIntoMaterializeSource.scala
@@ -0,0 +1,499 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.delta.commands.merge
+
+import org.apache.gluten.extension.CHRemoveTopmostColumnarToRow
+import org.apache.spark.SparkException
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.{FileSourceOptions, InternalRow}
+import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression}
+import org.apache.spark.sql.catalyst.optimizer.EliminateResolvedHint
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.delta.metering.DeltaLogging
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
+import org.apache.spark.sql.delta.util.DeltaSparkPlanUtils
+import org.apache.spark.sql.delta.{DeltaErrors, DeltaLog}
+import org.apache.spark.sql.execution.LogicalRDD
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.spark.sql.internal.SQLConf._
+import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
+import org.apache.spark.storage.StorageLevel
+
+import scala.annotation.tailrec
+import scala.util.control.NonFatal
+
+/**
+ * Trait with logic and utilities used for materializing a snapshot of MERGE source in case we
+ * can't guarantee deterministic repeated reads from it.
+ *
+ * We materialize source if it is not safe to assume that it's deterministic (override with
+ * MERGE_SOURCE_MATERIALIZATION). Otherwise, if source changes between the phases of the MERGE, it
+ * can produce wrong results. We use local checkpointing for the materialization, which saves the
+ * source as a materialized RDD[InternalRow] on the executor local disks.
+ *
+ * 1st concern is that if an executor is lost, this data can be lost. When Spark executor
+ * decommissioning API is used, it should attempt to move this materialized data safely out before
+ * removing the executor.
+ *
+ * 2nd concern is that if an executor is lost for another reason (e.g. spot kill), we will still
+ * lose that data. To mitigate that, we implement a retry loop. The whole Merge operation needs to
+ * be restarted from the beginning in this case. When we retry, we increase the replication level
+ * of the materialized data from 1 to 2 (override with
+ * MERGE_SOURCE_MATERIALIZATION_RDD_STORAGE_LEVEL_RETRY). If it still fails after the maximum
+ * number of attempts (MERGE_MATERIALIZE_SOURCE_MAX_ATTEMPTS), we record the failure for tracking
+ * purposes.
+ *
+ * 3rd concern is that executors run out of disk space with the extra materialization. We record
+ * such failures for tracking purposes.
+ */
+trait MergeIntoMaterializeSource extends DeltaLogging with DeltaSparkPlanUtils {
+
+ import MergeIntoMaterializeSource._
+
+ /**
+   * Prepared Dataframe with source data. If needed, it is materialized, @see prepareMergeSource
+ */
+ private var mergeSource: Option[MergeSource] = None
+
+ /** If the source was materialized, reference to the checkpointed RDD. */
+ protected var materializedSourceRDD: Option[RDD[InternalRow]] = None
+
+  /** Track which attempt or retry it is in runWithMaterializedSourceAndRetries */
+ protected var attempt: Int = 0
+
+ /**
+   * Run the Merge with retries in case it detects an RDD block lost error of the materialized
+   * source RDD. It will also record out of disk error, if such happens - possibly because of
+   * increased disk pressure from the materialized source RDD.
+ */
+ protected def runWithMaterializedSourceLostRetries(
+ spark: SparkSession,
+ deltaLog: DeltaLog,
+ metrics: Map[String, SQLMetric],
+ runMergeFunc: SparkSession => Seq[Row]): Seq[Row] = {
+ var doRetry = false
+ var runResult: Seq[Row] = null
+ attempt = 1
+ do {
+ doRetry = false
+ metrics.values.foreach(_.reset())
+ try {
+ runResult = runMergeFunc(spark)
+ } catch {
+ case NonFatal(ex) =>
+ val isLastAttempt =
+            attempt == spark.conf.get(DeltaSQLConf.MERGE_MATERIALIZE_SOURCE_MAX_ATTEMPTS)
+ handleExceptionDuringAttempt(ex, isLastAttempt, deltaLog) match {
+ case RetryHandling.Retry =>
+              logInfo(s"Retrying MERGE with materialized source. Attempt $attempt failed.")
+ doRetry = true
+ attempt += 1
+ case RetryHandling.ExhaustedRetries =>
+ logError(
+ s"Exhausted retries after $attempt attempts in MERGE with" +
+ s" materialized source. Logging latest exception.",
+ ex)
+ throw DeltaErrors.sourceMaterializationFailedRepeatedlyInMerge
+ case RetryHandling.RethrowException =>
+              logError(s"Fatal error in MERGE with materialized source in attempt $attempt.", ex)
+ throw ex
+ }
+ } finally {
+ // Remove source from RDD cache (noop if wasn't cached)
+ materializedSourceRDD.foreach(rdd => rdd.unpersist())
+ materializedSourceRDD = None
+ mergeSource = None
+ }
+ } while (doRetry)
+
+ runResult
+ }
+
+ object RetryHandling extends Enumeration {
+ type Result = Value
+
+ val Retry, RethrowException, ExhaustedRetries = Value
+ }
+
+ /**
+   * Handle exception that was thrown from runMerge(). Search for errors to log, or that can be
+   * handled by retry. It may need to descend into ex.getCause() to find the errors, as Spark may
+   * have wrapped them.
+   * @param isLastAttempt
+   *   indicates that it's the last allowed attempt and there shall be no retry.
+   * @return
+   *   true if the exception is handled and merge should retry; false if the caller should
+   *   rethrow the error.
+ */
+ @tailrec
+ private def handleExceptionDuringAttempt(
+ ex: Throwable,
+ isLastAttempt: Boolean,
+ deltaLog: DeltaLog): RetryHandling.Result = ex match {
+ // If Merge failed because the materialized source lost blocks from the
+ // locally checkpointed RDD, we want to retry the whole operation.
+ // If a checkpointed RDD block is lost, it throws
+    // SparkCoreErrors.checkpointRDDBlockIdNotFoundError from LocalCheckpointRDD.compute.
+    case s: SparkException
+      if materializedSourceRDD.nonEmpty &&
+        s.getMessage.matches(
+          mergeMaterializedSourceRddBlockLostErrorRegex(materializedSourceRDD.get.id)) =>
+ log.warn(
+        "Materialized Merge source RDD block lost. Merge needs to be restarted. " +
+ s"This was attempt number $attempt.")
+ if (!isLastAttempt) {
+ RetryHandling.Retry
+ } else {
+        // Record situations where we lost RDD materialized source blocks, despite retries.
+ recordDeltaEvent(
+ deltaLog,
+ MergeIntoMaterializeSourceError.OP_TYPE,
+ data = MergeIntoMaterializeSourceError(
+            errorType = MergeIntoMaterializeSourceErrorType.RDD_BLOCK_LOST.toString,
+ attempt = attempt,
+            materializedSourceRDDStorageLevel = materializedSourceRDD.get.getStorageLevel.toString
+ )
+ )
+ RetryHandling.ExhaustedRetries
+ }
+
+    // Record if we ran out of executor disk space when we materialized the source.
+ case s: SparkException
+ if materializedSourceRDD.nonEmpty &&
+        s.getMessage.contains("java.io.IOException: No space left on device") =>
+      // Record situations where we ran out of disk space, possibly because of the space taken
+      // by the materialized RDD.
+ recordDeltaEvent(
+ deltaLog,
+ MergeIntoMaterializeSourceError.OP_TYPE,
+ data = MergeIntoMaterializeSourceError(
+ errorType = MergeIntoMaterializeSourceErrorType.OUT_OF_DISK.toString,
+ attempt = attempt,
+        materializedSourceRDDStorageLevel = materializedSourceRDD.get.getStorageLevel.toString
+ )
+ )
+ RetryHandling.RethrowException
+
+ // Descend into ex.getCause.
+    // The errors that we are looking for above might have been wrapped inside another exception.
+ case NonFatal(ex) if ex.getCause() != null =>
+ handleExceptionDuringAttempt(ex.getCause(), isLastAttempt, deltaLog)
+
+ // Descended to the bottom of the causes without finding a retryable error
+ case _ => RetryHandling.RethrowException
+ }
+
+  private def planContainsIgnoreUnreadableFilesReadOptions(plan: LogicalPlan): Boolean = {
+ def relationContainsOptions(relation: BaseRelation): Boolean = {
+ relation match {
+ case hdpRelation: HadoopFsRelation =>
+          hdpRelation.options.get(FileSourceOptions.IGNORE_CORRUPT_FILES).contains("true") ||
+            hdpRelation.options.get(FileSourceOptions.IGNORE_MISSING_FILES).contains("true")
+ case _ => false
+ }
+ }
+
+ val res = plan.collectFirst {
+ case lr: LogicalRelation if relationContainsOptions(lr.relation) => lr
+ }
+ res.nonEmpty
+ }
+
+ private def ignoreUnreadableFilesConfigsAreSet(
+ plan: LogicalPlan,
+ spark: SparkSession): Boolean = {
+    spark.conf.get(IGNORE_MISSING_FILES) || spark.conf.get(IGNORE_CORRUPT_FILES) ||
+ planContainsIgnoreUnreadableFilesReadOptions(plan)
+ }
+
+ /**
+ * @return
+   *   pair of boolean whether source should be materialized and the source materialization reason
+ */
+ protected def shouldMaterializeSource(
+ spark: SparkSession,
+ source: LogicalPlan,
+ isInsertOnly: Boolean
+  ): (Boolean, MergeIntoMaterializeSourceReason.MergeIntoMaterializeSourceReason) = {
+ val materializeType = spark.conf.get(DeltaSQLConf.MERGE_MATERIALIZE_SOURCE)
+ val forceMaterializationWithUnreadableFiles =
+      spark.conf.get(DeltaSQLConf.MERGE_FORCE_SOURCE_MATERIALIZATION_WITH_UNREADABLE_FILES)
+ import DeltaSQLConf.MergeMaterializeSource._
+ val checkDeterministicOptions =
+      DeltaSparkPlanUtils.CheckDeterministicOptions(allowDeterministicUdf = true)
+ materializeType match {
+ case ALL =>
+ (true, MergeIntoMaterializeSourceReason.MATERIALIZE_ALL)
+ case NONE =>
+ (false, MergeIntoMaterializeSourceReason.NOT_MATERIALIZED_NONE)
+ case AUTO =>
+        if (isInsertOnly && spark.conf.get(DeltaSQLConf.MERGE_INSERT_ONLY_ENABLED)) {
+          (false, MergeIntoMaterializeSourceReason.NOT_MATERIALIZED_AUTO_INSERT_ONLY)
+        } else if (!planContainsOnlyDeltaScans(source)) {
+          (true, MergeIntoMaterializeSourceReason.NON_DETERMINISTIC_SOURCE_NON_DELTA)
+        } else if (!planIsDeterministic(source, checkDeterministicOptions)) {
+          (true, MergeIntoMaterializeSourceReason.NON_DETERMINISTIC_SOURCE_OPERATORS)
+          // Force source materialization if Spark configs IGNORE_CORRUPT_FILES,
+          // IGNORE_MISSING_FILES or file source read options FileSourceOptions.IGNORE_CORRUPT_FILES
+          // FileSourceOptions.IGNORE_MISSING_FILES are enabled on the source.
+          // This is done so to prevent irrecoverable data loss or unexpected results.
+        } else if (
+          forceMaterializationWithUnreadableFiles &&
+          ignoreUnreadableFilesConfigsAreSet(source, spark)
+        ) {
+          (true, MergeIntoMaterializeSourceReason.IGNORE_UNREADABLE_FILES_CONFIGS_ARE_SET)
+        } else if (planContainsUdf(source)) {
+          // Force source materialization if the source contains a User Defined Function, even if
+          // the user defined function is marked as deterministic, as it is often incorrectly
+          // marked as such.
+          (true, MergeIntoMaterializeSourceReason.NON_DETERMINISTIC_SOURCE_WITH_DETERMINISTIC_UDF)
+ } else {
+ (false, MergeIntoMaterializeSourceReason.NOT_MATERIALIZED_AUTO)
+ }
+ case _ =>
+ // If the config is invalidly set, also materialize.
+ (true, MergeIntoMaterializeSourceReason.INVALID_CONFIG)
+ }
+ }
+
+ /**
+   * If source needs to be materialized, prepare the materialized dataframe in sourceDF.
+   * Otherwise, prepare regular dataframe.
+ * @return
+ * the source materialization reason
+ */
+ protected def prepareMergeSource(
+ spark: SparkSession,
+ source: LogicalPlan,
+ condition: Expression,
+ matchedClauses: Seq[DeltaMergeIntoMatchedClause],
+ notMatchedClauses: Seq[DeltaMergeIntoNotMatchedClause],
+ isInsertOnly: Boolean): Unit = {
+ val (materialize, materializeReason) =
+ shouldMaterializeSource(spark, source, isInsertOnly)
+
+ // --- modified start
+    val originalRemoveTopmostC2R = CHRemoveTopmostColumnarToRow.isRemoveTopmostC2R(spark)
+    try {
+ spark.sparkContext.setLocalProperty(
+ CHRemoveTopmostColumnarToRow.REMOVE_TOPMOST_COLUMNAR_TO_ROW,
+ "true")
+ // --- modified end
+
+ if (!materialize) {
+ // Does not materialize, simply return the dataframe from source plan
+ mergeSource = Some(
+ MergeSource(
+ df = Dataset.ofRows(spark, source),
+ isMaterialized = false,
+ materializeReason = materializeReason
+ )
+ )
+ return
+ }
+
+ val referencedSourceColumns =
+        getReferencedSourceColumns(source, condition, matchedClauses, notMatchedClauses)
+      // When we materialize the source, we want to make sure that columns got pruned before
+      // caching.
+ val sourceWithSelectedColumns = Project(referencedSourceColumns, source)
+ val baseSourcePlanDF = Dataset.ofRows(spark, sourceWithSelectedColumns)
+
+      // Caches the source in RDD cache using localCheckpoint, which cuts away the RDD lineage,
+      // which shall ensure that the source cannot be recomputed and thus become inconsistent.
+ val checkpointedSourcePlanDF = baseSourcePlanDF
+        // Set eager=false for now, even if we should be doing eager, so that we can set the
+        // storage level before executing.
+ .localCheckpoint(eager = false)
+
+      // We have to reach through the crust and into the plan of the checkpointed DF
+      // to get the RDD that was actually checkpointed, to be able to unpersist it later...
+ var checkpointedPlan = checkpointedSourcePlanDF.queryExecution.analyzed
+ val rdd = checkpointedPlan.asInstanceOf[LogicalRDD].rdd
+ materializedSourceRDD = Some(rdd)
+ rdd.setName("mergeMaterializedSource")
+
+ // We should still keep the hints from the input plan.
+ checkpointedPlan = addHintsToPlan(source, checkpointedPlan)
+
+ mergeSource = Some(
+ MergeSource(
+ df = Dataset.ofRows(spark, checkpointedPlan),
+ isMaterialized = true,
+ materializeReason = materializeReason
+ )
+ )
+
+ // Sets appropriate StorageLevel
+ val storageLevel = StorageLevel.fromString(
+ if (attempt == 1) {
+          spark.conf.get(DeltaSQLConf.MERGE_MATERIALIZE_SOURCE_RDD_STORAGE_LEVEL)
+        } else {
+          // If it failed the first time, potentially use a different storage level on retry.
+          spark.conf.get(DeltaSQLConf.MERGE_MATERIALIZE_SOURCE_RDD_STORAGE_LEVEL_RETRY)
+ }
+ )
+ rdd.persist(storageLevel)
+
+      // WARNING: if eager == false, the source used during the first Spark Job that uses this may
+      // still be inconsistent with source materialized afterwards.
+      // This is because doCheckpoint that finalizes the lazy checkpoint is called after the Job
+      // that triggered the lazy checkpointing finished.
+      // If blocks were lost during that job, they may still get recomputed and changed compared
+      // to how they were used during the execution of the job.
+ if (spark.conf.get(DeltaSQLConf.MERGE_MATERIALIZE_SOURCE_EAGER)) {
+        // Force the evaluation of the `rdd`, since we cannot access `doCheckpoint()` from here.
+        rdd
+          .mapPartitions(_ => Iterator.empty.asInstanceOf[Iterator[InternalRow]])
+ .foreach((_: InternalRow) => ())
+ assert(rdd.isCheckpointed)
+ }
+
+      logDebug(s"Materializing MERGE with pruned columns $referencedSourceColumns.")
+      logDebug(s"Materialized MERGE source plan:\n${getMergeSource.df.queryExecution}")
+    } finally {
+      // --- modified start
+      CHRemoveTopmostColumnarToRow.setRemoveTopmostC2R(originalRemoveTopmostC2R, spark)
+ // --- modified end
+ }
+ }
+
+ /** Returns the prepared merge source. */
+ protected def getMergeSource: MergeSource = mergeSource match {
+ case Some(source) => source
+ case None =>
+ throw new IllegalStateException(
+ "mergeSource was not initialized! Call prepareMergeSource before.")
+ }
+
+  private def addHintsToPlan(sourcePlan: LogicalPlan, plan: LogicalPlan): LogicalPlan = {
+    val hints = EliminateResolvedHint.extractHintsFromPlan(sourcePlan)._2
+    // This follows similar code in CacheManager from https://github.com/apache/spark/pull/24580
+    if (hints.nonEmpty) {
+      // The returned hint list is in top-down order, we should create the hint nodes from
+      // right to left.
+ val planWithHints =
+ hints.foldRight[LogicalPlan](plan) {
+ case (hint, p) =>
+ ResolvedHint(p, hint)
+ }
+ planWithHints
+ } else {
+ plan
+ }
+ }
+}
+
+object MergeIntoMaterializeSource {
+ case class MergeSource(
+ df: DataFrame,
+ isMaterialized: Boolean,
+      materializeReason: MergeIntoMaterializeSourceReason.MergeIntoMaterializeSourceReason) {
+    assert(
+      !isMaterialized ||
+        MergeIntoMaterializeSourceReason.MATERIALIZED_REASONS.contains(materializeReason))
+ }
+
+ // This depends on SparkCoreErrors.checkpointRDDBlockIdNotFoundError msg
+ def mergeMaterializedSourceRddBlockLostErrorRegex(rddId: Int): String =
+ s"(?s).*Checkpoint block rdd_${rddId}_[0-9]+ not found!.*"
+
+ /** @return The columns of the source plan that are used in this MERGE */
+ private def getReferencedSourceColumns(
+ source: LogicalPlan,
+ condition: Expression,
+ matchedClauses: Seq[DeltaMergeIntoMatchedClause],
+ notMatchedClauses: Seq[DeltaMergeIntoNotMatchedClause]) = {
+ val conditionCols = condition.references
+    val matchedCondCols = matchedClauses.flatMap(_.condition).flatMap(_.references)
+    val notMatchedCondCols = notMatchedClauses.flatMap(_.condition).flatMap(_.references)
+ val matchedActionsCols = matchedClauses
+ .flatMap(_.resolvedActions)
+ .flatMap(_.expr.references)
+ val notMatchedActionsCols = notMatchedClauses
+ .flatMap(_.resolvedActions)
+ .flatMap(_.expr.references)
+ val allCols = AttributeSet(
+ conditionCols ++
+ matchedCondCols ++
+ notMatchedCondCols ++
+ matchedActionsCols ++
+ notMatchedActionsCols)
+
+ source.output.filter(allCols.contains)
+ }
+}
+
+/** Enumeration with possible reasons that source may be materialized in a MERGE command. */
+object MergeIntoMaterializeSourceReason extends Enumeration {
+ type MergeIntoMaterializeSourceReason = Value
+ // It was determined to not materialize on auto config.
+ val NOT_MATERIALIZED_AUTO = Value("notMaterializedAuto")
+ // Config was set to never materialize source.
+ val NOT_MATERIALIZED_NONE = Value("notMaterializedNone")
+ // Insert only merge is single pass, no need for materialization
+  val NOT_MATERIALIZED_AUTO_INSERT_ONLY = Value("notMaterializedAutoInsertOnly")
+ // Config was set to always materialize source.
+ val MATERIALIZE_ALL = Value("materializeAll")
+  // The source query is considered non-deterministic, because it contains a non-delta scan.
+  val NON_DETERMINISTIC_SOURCE_NON_DELTA = Value("materializeNonDeterministicSourceNonDelta")
+  // The source query is considered non-deterministic, because it contains non-deterministic
+  // operators.
+  val NON_DETERMINISTIC_SOURCE_OPERATORS = Value("materializeNonDeterministicSourceOperators")
+  // Either spark configs to ignore unreadable files are set or the source plan contains relations
+  // with ignore unreadable files options.
+  val IGNORE_UNREADABLE_FILES_CONFIGS_ARE_SET =
+    Value("materializeIgnoreUnreadableFilesConfigsAreSet")
+  // The source query is considered non-deterministic because it contains a User Defined Function.
+ val NON_DETERMINISTIC_SOURCE_WITH_DETERMINISTIC_UDF =
+ Value("materializeNonDeterministicSourceWithDeterministicUdf")
+ // Materialize when the configuration is invalid
+ val INVALID_CONFIG = Value("invalidConfigurationFailsafe")
+ // Catch-all case.
+ val UNKNOWN = Value("unknown")
+
+ // Set of reasons that result in source materialization.
+ final val MATERIALIZED_REASONS: Set[MergeIntoMaterializeSourceReason] = Set(
+ MATERIALIZE_ALL,
+ NON_DETERMINISTIC_SOURCE_NON_DELTA,
+ NON_DETERMINISTIC_SOURCE_OPERATORS,
+ IGNORE_UNREADABLE_FILES_CONFIGS_ARE_SET,
+ NON_DETERMINISTIC_SOURCE_WITH_DETERMINISTIC_UDF,
+ INVALID_CONFIG
+ )
+}
+
+/**
+ * Structure with data for "delta.dml.merge.materializeSourceError" event. Note: We log only
+ * errors that we want to track (out of disk or lost RDD blocks).
+ */
+case class MergeIntoMaterializeSourceError(
+ errorType: String,
+ attempt: Int,
+ materializedSourceRDDStorageLevel: String
+)
+
+object MergeIntoMaterializeSourceError {
+ val OP_TYPE = "delta.dml.merge.materializeSourceError"
+}
+
+object MergeIntoMaterializeSourceErrorType extends Enumeration {
+ type MergeIntoMaterializeSourceError = Value
+ val RDD_BLOCK_LOST = Value("materializeSourceRDDBlockLostRetriesFailure")
+ val OUT_OF_DISK = Value("materializeSourceOutOfDiskFailure")
+}
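
The only Gluten-specific change in the file above is the block between the "--- modified start/end"
markers: prepareMergeSource saves the current value of the REMOVE_TOPMOST_COLUMNAR_TO_ROW local
property, forces it to "true" while the source is checkpointed, and restores it in the finally
block. A minimal sketch of that save-and-restore pattern, using the helpers introduced in
CHRemoveTopmostColumnarToRow later in this commit (withTopmostC2RRemoved is a hypothetical name,
not part of the commit):

    import org.apache.gluten.extension.CHRemoveTopmostColumnarToRow
    import org.apache.spark.sql.SparkSession

    // Run `body` with topmost ColumnarToRow removal enabled, then restore
    // whatever value the caller had previously set on the local property.
    def withTopmostC2RRemoved[T](spark: SparkSession)(body: => T): T = {
      val previous = CHRemoveTopmostColumnarToRow.isRemoveTopmostC2R(spark)
      CHRemoveTopmostColumnarToRow.setRemoveTopmostC2R(true, spark)
      try body
      finally CHRemoveTopmostColumnarToRow.setRemoveTopmostC2R(previous, spark)
    }
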
diff --git
a/backends-clickhouse/src-delta-32/test/scala/org/apache/spark/sql/execution/RDDScanSuite.scala
b/backends-clickhouse/src-delta-32/test/scala/org/apache/spark/sql/execution/RDDScanSuite.scala
new file mode 100644
index 0000000000..ab9479ff66
--- /dev/null
+++
b/backends-clickhouse/src-delta-32/test/scala/org/apache/spark/sql/execution/RDDScanSuite.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.gluten.execution._
+import org.apache.gluten.extension.CHRemoveTopmostColumnarToRow
+
+import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+
+// Some sqls' line length exceeds 100
+// scalastyle:off line.size.limit
+
+class RDDScanSuite extends GlutenClickHouseTPCHAbstractSuite with AdaptiveSparkPlanHelper {
+
+ override protected val needCopyParquetToTablePath = true
+ override protected val tablesPath: String = basePath + "/tpch-data"
+  override protected val tpchQueries: String = rootPath + "queries/tpch-queries-ch"
+ override protected val queriesResults: String = rootPath + "queries-output"
+
+ override protected def createTPCHNotNullTables(): Unit = {
+ createNotNullTPCHTablesInParquet(tablesPath)
+ }
+
+ test("test RDDScanTransform") {
+ val test_sql =
+ """
+ |SELECT
+ | l_returnflag,
+ | l_linestatus,
+ | sum(l_quantity) AS sum_qty
+ |FROM
+ | lineitem
+ |WHERE
+ | l_shipdate <= date'1998-09-02' - interval 1 day
+ |GROUP BY
+ | l_returnflag,
+ | l_linestatus
+ |""".stripMargin
+
+ val expectedAnswer = sql(test_sql).collect()
+
+ spark.sparkContext.setLocalProperty(
+ CHRemoveTopmostColumnarToRow.REMOVE_TOPMOST_COLUMNAR_TO_ROW,
+ "true")
+ val data = sql(test_sql)
+ val node = LogicalRDD.fromDataset(
+ rdd = data.queryExecution.toRdd,
+ originDataset = data,
+ isStreaming = false)
+
+ spark.sparkContext.setLocalProperty(
+ CHRemoveTopmostColumnarToRow.REMOVE_TOPMOST_COLUMNAR_TO_ROW,
+ "false")
+ val df = Dataset.ofRows(data.sparkSession, node).toDF()
+ checkAnswer(df, expectedAnswer)
+
+    var cnt = df.queryExecution.executedPlan.collect { case _: CHRDDScanTransformer => true }
+ assertResult(1)(cnt.size)
+
+ val data2 = sql(test_sql)
+ val node2 = LogicalRDD.fromDataset(
+ rdd = data2.queryExecution.toRdd,
+ originDataset = data2,
+ isStreaming = false)
+
+ val df2 = Dataset.ofRows(data2.sparkSession, node2).toDF()
+ checkAnswer(df2, expectedAnswer)
+    cnt = df2.queryExecution.executedPlan.collect { case _: CHRDDScanTransformer => true }
+ assertResult(1)(cnt.size)
+ }
+}
diff --git
a/backends-clickhouse/src-kafka/main/scala/org/apache/gluten/component/CHKafkaComponent.scala
b/backends-clickhouse/src-kafka/main/scala/org/apache/gluten/component/CHKafkaComponent.scala
index 1e9f0f9d77..f388ed8623 100644
---
a/backends-clickhouse/src-kafka/main/scala/org/apache/gluten/component/CHKafkaComponent.scala
+++
b/backends-clickhouse/src-kafka/main/scala/org/apache/gluten/component/CHKafkaComponent.scala
@@ -19,6 +19,7 @@ package org.apache.gluten.component
import org.apache.gluten.backendsapi.clickhouse.CHBackend
import org.apache.gluten.execution.OffloadKafkaScan
+import org.apache.gluten.extension.columnar.KafkaMiscColumnarRules.RemoveStreamingTopmostColumnarToRow
import org.apache.gluten.extension.injector.Injector
class CHKafkaComponent extends Component {
@@ -28,5 +29,6 @@ class CHKafkaComponent extends Component {
   override def dependencies(): Seq[Class[_ <: Component]] = classOf[CHBackend] :: Nil
override def injectRules(injector: Injector): Unit = {
OffloadKafkaScan.inject(injector)
+    injector.gluten.legacy.injectPost(
+      c => RemoveStreamingTopmostColumnarToRow(c.session, c.caller.isStreaming()))
}
}
diff --git
a/backends-clickhouse/src-kafka/main/scala/org/apache/gluten/extension/columnar/KafkaMiscColumnarRules.scala
b/backends-clickhouse/src-kafka/main/scala/org/apache/gluten/extension/columnar/KafkaMiscColumnarRules.scala
new file mode 100644
index 0000000000..38e163cb44
--- /dev/null
+++
b/backends-clickhouse/src-kafka/main/scala/org/apache/gluten/extension/columnar/KafkaMiscColumnarRules.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension.columnar
+
+import org.apache.gluten.execution.MicroBatchScanExecTransformer
+import org.apache.gluten.extension.columnar.transition.ColumnarToRowLike
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.datasources.FakeRowAdaptor
+
+object KafkaMiscColumnarRules {
+ // Remove topmost columnar-to-row.
+  case class RemoveStreamingTopmostColumnarToRow(session: SparkSession, isStreamingPlan: Boolean)
+ extends Rule[SparkPlan] {
+ override def apply(plan: SparkPlan): SparkPlan = {
+ if (
+        !isStreamingPlan || plan.collectFirst { case e: MicroBatchScanExecTransformer => e }.isEmpty
+ ) {
+ return plan
+ }
+
+ plan match {
+ case ColumnarToRowLike(child) => wrapperFakeRowAdaptor(child)
+ case other => other
+ }
+ }
+
+ private def wrapperFakeRowAdaptor(plan: SparkPlan): SparkPlan = {
+ FakeRowAdaptor(plan)
+ }
+ }
+}
diff --git
a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/BlockOutputStream.java
b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/BlockOutputStream.java
index d9006a098d..1885538f9b 100644
---
a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/BlockOutputStream.java
+++
b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/BlockOutputStream.java
@@ -74,6 +74,8 @@ public class BlockOutputStream implements Closeable {
private native void nativeFlush(long instance);
+  public static native long directWrite(OutputStream stream, byte[] buf, int size, long block);
+
public void write(ColumnarBatch cb) {
if (cb.numCols() == 0 || cb.numRows() == 0) return;
CHNativeBlock block = CHNativeBlock.fromColumnarBatch(cb);
diff --git
a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHNativeBlock.java
b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHNativeBlock.java
index c8cce61b4f..8cf365335c 100644
---
a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHNativeBlock.java
+++
b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHNativeBlock.java
@@ -77,6 +77,12 @@ public class CHNativeBlock {
return nativeBlockStats(blockAddress, columnPosition);
}
+ public static native long copyBlock(long blockAddress);
+
+ public ColumnarBatch copyColumnarBatch() {
+ return new CHNativeBlock(copyBlock(blockAddress)).toColumnarBatch();
+ }
+
public void close() {
if (blockAddress != 0) {
nativeClose(blockAddress);
diff --git
a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHStreamReader.java
b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHStreamReader.java
index 3151adfceb..0e081a099a 100644
---
a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHStreamReader.java
+++
b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHStreamReader.java
@@ -49,6 +49,8 @@ public class CHStreamReader implements AutoCloseable {
private native long nativeNext(long nativeShuffleReader);
+  public static native long directRead(InputStream inputStream, byte[] buffer, int bufferSize);
+
public CHNativeBlock next() {
long block = nativeNext(nativeShuffleReader);
return new CHNativeBlock(block);
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
index 374e223ea9..411351e9a4 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
@@ -135,6 +135,7 @@ object CHRuleApi {
// Gluten columnar: Post rules.
     injector.injectPost(c => RemoveTopmostColumnarToRow(c.session, c.caller.isAqe()))
+    injector.injectPost(c => CHRemoveTopmostColumnarToRow(c.session, c.caller.isAqe()))
SparkShimLoader.getSparkShims
.getExtendedColumnarPostRules()
.foreach(each => injector.injectPost(c => intercept(each(c.session))))
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index c77e7bc31b..eecf0588d4 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -27,7 +27,7 @@ import
org.apache.gluten.extension.columnar.heuristic.HeuristicTransform
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.substrait.expression.{ExpressionBuilder,
ExpressionNode, WindowFunctionNode}
import org.apache.gluten.utils.{CHJoinValidateUtil, UnknownJoinStrategy}
-import org.apache.gluten.vectorized.CHColumnarBatchSerializer
+import org.apache.gluten.vectorized.{BlockOutputStream, CHColumnarBatchSerializer, CHNativeBlock, CHStreamReader}
import org.apache.spark.ShuffleDependency
import org.apache.spark.internal.Logging
@@ -58,6 +58,7 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.commons.lang3.ClassUtils
+import java.io.{ObjectInputStream, ObjectOutputStream}
import java.lang.{Long => JLong}
import java.util.{ArrayList => JArrayList, List => JList, Map => JMap}
@@ -525,6 +526,8 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with
Logging {
wrapChild(union)
case ordered: TakeOrderedAndProjectExecTransformer =>
wrapChild(ordered)
+ case rddScan: CHRDDScanTransformer =>
+ wrapChild(rddScan)
case other =>
throw new GlutenNotSupportException(
s"Not supported operator ${other.nodeName} for
BroadcastRelation")
@@ -965,4 +968,29 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with
Logging {
case co: FlattenedOr => GenericExpressionTransformer(co.name, children, co)
case _ => super.genFlattenedExpressionTransformer(substraitName, children,
expr)
}
+
+ override def isSupportRDDScanExec(plan: RDDScanExec): Boolean = true
+
+ override def getRDDScanTransform(plan: RDDScanExec): RDDScanTransformer =
+ CHRDDScanTransformer.replace(plan)
+
+ override def copyColumnarBatch(batch: ColumnarBatch): ColumnarBatch =
+ CHNativeBlock.fromColumnarBatch(batch).copyColumnarBatch()
+
+  override def serializeColumnarBatch(output: ObjectOutputStream, batch: ColumnarBatch): Unit = {
+ val writeBuffer: Array[Byte] =
+ new Array[Byte](CHBackendSettings.customizeBufferSize)
+ BlockOutputStream.directWrite(
+ output,
+ writeBuffer,
+ CHBackendSettings.customizeBufferSize,
+ CHNativeBlock.fromColumnarBatch(batch).blockAddress())
+ }
+
+  override def deserializeColumnarBatch(input: ObjectInputStream): ColumnarBatch = {
+ val bufferSize = CHBackendSettings.customizeBufferSize
+ val readBuffer: Array[Byte] = new Array[Byte](bufferSize)
+ val address = CHStreamReader.directRead(input, readBuffer, bufferSize)
+ new CHNativeBlock(address).toColumnarBatch
+ }
}
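
The new serializeColumnarBatch/deserializeColumnarBatch overrides above push a native ClickHouse
block straight through Java object streams using the directWrite/directRead JNI entry points. A
rough round-trip sketch, assuming an initialized CH backend; execApi and batch are placeholders,
not code from this commit:

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
    import org.apache.gluten.backendsapi.clickhouse.CHSparkPlanExecApi
    import org.apache.spark.sql.vectorized.ColumnarBatch

    def roundTrip(execApi: CHSparkPlanExecApi, batch: ColumnarBatch): ColumnarBatch = {
      val bytes = new ByteArrayOutputStream()
      val out = new ObjectOutputStream(bytes)
      execApi.serializeColumnarBatch(out, batch) // native block written via BlockOutputStream.directWrite
      out.flush()
      val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
      execApi.deserializeColumnarBatch(in) // rebuilt as a CHNativeBlock-backed ColumnarBatch
    }
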
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CHRemoveTopmostColumnarToRow.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CHRemoveTopmostColumnarToRow.scala
new file mode 100644
index 0000000000..d05cbb28ce
--- /dev/null
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CHRemoveTopmostColumnarToRow.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension
+
+import org.apache.gluten.extension.columnar.transition.ColumnarToRowLike
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec
+import org.apache.spark.sql.execution.datasources.FakeRowAdaptor
+import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike, ShuffleExchangeLike}
+
+// Remove the topmost columnar-to-row conversion
+// Primarily for structured streaming and delta deletion vector
+//
+// Sometimes, the code uses dataFrame.queryExecution.toRdd as the data source.
+// queryExecution will use columnar-to-row (c2r) and row-to-columnar (r2c)
+// conversions for the next operation.
+// This rule aims to eliminate the redundant double conversion.
+case class CHRemoveTopmostColumnarToRow(session: SparkSession, isAdaptiveContext: Boolean)
+  extends Rule[SparkPlan] {
+  override def apply(plan: SparkPlan): SparkPlan = {
+    val removeTopmostColumnarToRow = CHRemoveTopmostColumnarToRow.isRemoveTopmostC2R(session)
+
+ if (!removeTopmostColumnarToRow) {
+ return plan
+ }
+
+ plan match {
+ case shuffleExchangeLike @ ColumnarToRowLike(_: ShuffleExchangeLike) =>
+ shuffleExchangeLike
+      case broadcastExchangeLike @ ColumnarToRowLike(_: BroadcastExchangeLike) =>
+        broadcastExchangeLike
+      case broadcastQueryStageExec @ ColumnarToRowLike(_: BroadcastQueryStageExec) =>
+ broadcastQueryStageExec
+ case ColumnarToRowLike(child) => wrapperColumnarRowAdaptor(child)
+ case other => other
+ }
+ }
+
+ private def wrapperColumnarRowAdaptor(plan: SparkPlan): SparkPlan = {
+ FakeRowAdaptor(plan)
+ }
+}
+
+object CHRemoveTopmostColumnarToRow {
+  val REMOVE_TOPMOST_COLUMNAR_TO_ROW: String = "gluten.removeTopmostColumnarToRow"
+
+ def isRemoveTopmostC2R(spark: SparkSession): Boolean = {
+    Option(spark.sparkContext.getLocalProperty(REMOVE_TOPMOST_COLUMNAR_TO_ROW)).exists(_.toBoolean)
+ }
+
+ def setRemoveTopmostC2R(value: Boolean, spark: SparkSession): Unit = {
+    spark.sparkContext.setLocalProperty(REMOVE_TOPMOST_COLUMNAR_TO_ROW, value.toString)
+ }
+}
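
As the comment in the rule explains, callers that consume dataFrame.queryExecution.toRdd can opt in
through this local property so the topmost ColumnarToRow is replaced by a FakeRowAdaptor and the
data keeps flowing in columnar form. A hedged sketch of what a consumer of such an RDD might look
like (countRows and its logic are illustrative only, not part of the commit):

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.execution.datasources.FakeRow

    // With REMOVE_TOPMOST_COLUMNAR_TO_ROW set to "true" on the SparkContext,
    // rows produced by toRdd may be FakeRow wrappers around a ColumnarBatch.
    def countRows(df: DataFrame): Long =
      df.queryExecution.toRdd
        .mapPartitions(_.map {
          case fake: FakeRow => fake.batch.numRows().toLong // columnar path
          case _ => 1L // ordinary InternalRow path
        })
        .sum()
        .toLong
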
diff --git
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHRDDScanTransformer.scala
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHRDDScanTransformer.scala
new file mode 100644
index 0000000000..70491b295e
--- /dev/null
+++
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHRDDScanTransformer.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.gluten.execution.SparkRowIterator
+import org.apache.gluten.expression.ConverterUtils
+import org.apache.gluten.extension.ValidationResult
+import org.apache.gluten.vectorized.{CHBlockConverterJniWrapper, CHNativeBlock}
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
+import org.apache.spark.sql.execution.datasources.FakeRow
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+case class CHRDDScanTransformer(
+ outputAttributes: Seq[Attribute],
+ rdd: RDD[InternalRow],
+ name: String,
+ override val outputPartitioning: Partitioning = UnknownPartitioning(0),
+ override val outputOrdering: Seq[SortOrder]
+) extends RDDScanTransformer(outputAttributes, outputPartitioning, outputOrdering) {
+
+ override protected def doValidateInternal(): ValidationResult = {
+ output
+      .foreach(attr => ConverterUtils.getTypeNode(attr.dataType, attr.nullable))
+ ValidationResult.succeeded
+ }
+
+ override def doExecuteColumnar(): RDD[ColumnarBatch] = {
+ val localSchema = this.schema
+ val fieldNames = output.map(ConverterUtils.genColumnNameWithExprId).toArray
+ val fieldTypes = output
+      .map(attr => ConverterUtils.getTypeNode(attr.dataType, attr.nullable).toProtobuf.toByteArray)
+ .toArray
+
+ rdd.mapPartitions(
+ it => {
+ if (it.hasNext) {
+ val projection = UnsafeProjection.create(localSchema)
+
+ val res: Iterator[ColumnarBatch] = new Iterator[ColumnarBatch] {
+ var sample = false
+ var isBatch = false
+ var byteArrayIterator: Iterator[Array[Byte]] = _
+ private var last_address: Long = 0
+
+ override def hasNext: Boolean = {
+ if (isBatch || !sample) {
+ it.hasNext
+ } else {
+ if (last_address != 0) {
+ CHBlockConverterJniWrapper.freeBlock(last_address)
+ last_address = 0
+ }
+ byteArrayIterator.hasNext
+ }
+ }
+
+ override def next(): ColumnarBatch = {
+ if (isBatch) {
+ return it.next().asInstanceOf[FakeRow].batch
+ }
+
+ if (sample) {
+ val slice = byteArrayIterator.take(8192)
+ val sparkRowIterator = new SparkRowIterator(slice)
+ last_address = CHBlockConverterJniWrapper
+                  .convertSparkRowsToCHColumn(sparkRowIterator, fieldNames, fieldTypes)
+ return new CHNativeBlock(last_address).toColumnarBatch
+ }
+
+ sample = true
+ val data = it.next()
+ data match {
+ case row: FakeRow =>
+ isBatch = true
+ return row.batch
+ case _ =>
+ byteArrayIterator = it.map {
+ case u: UnsafeRow => u.getBytes
+ case i: InternalRow => projection.apply(i).getBytes
+ }
+
+                  // deal with the first block
+ val bytes = data match {
+ case u: UnsafeRow => u.getBytes
+ case i: InternalRow => projection.apply(i).getBytes
+ }
+
+                  val sparkRowIterator = new SparkRowIterator(Iterator.apply(bytes))
+                  last_address = CHBlockConverterJniWrapper
+                    .convertSparkRowsToCHColumn(sparkRowIterator, fieldNames, fieldTypes)
+ new CHNativeBlock(last_address).toColumnarBatch
+ }
+ }
+
+ }
+ res
+ } else {
+ Iterator.empty
+ }
+ })
+ }
+
+  override protected def withNewChildrenInternal(newChildren: IndexedSeq[SparkPlan]): SparkPlan =
+ copy(outputAttributes, rdd, name, outputPartitioning, outputOrdering)
+}
+
+object CHRDDScanTransformer {
+ def replace(rdd: RDDScanExec): RDDScanTransformer =
+ CHRDDScanTransformer(
+ rdd.output,
+ rdd.inputRDD,
+ rdd.nodeName,
+ rdd.outputPartitioning,
+ rdd.outputOrdering)
+}
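
CHRDDScanTransformer.replace is wired into the generic offload path through the new
SparkPlanExecApi hooks (isSupportRDDScanExec / getRDDScanTransform, see the gluten-substrait and
OffloadSingleNodeRules entries in the file list above). A simplified sketch of how a planner rule
might use those hooks; offloadRddScan is an illustrative helper, not code from this commit:

    import org.apache.gluten.backendsapi.SparkPlanExecApi
    import org.apache.spark.sql.execution.{RDDScanExec, SparkPlan}

    // Swap a vanilla RDDScanExec for the backend transformer when the backend
    // declares support; otherwise leave the node on the row-based path.
    def offloadRddScan(api: SparkPlanExecApi, plan: SparkPlan): SparkPlan = plan match {
      case scan: RDDScanExec if api.isSupportRDDScanExec(scan) => api.getRDDScanTransform(scan)
      case other => other
    }
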
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenFunctionValidateSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenFunctionValidateSuite.scala
index 02e5b34a37..2250f8bfe3 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenFunctionValidateSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenFunctionValidateSuite.scala
@@ -563,7 +563,7 @@ class GlutenFunctionValidateSuite extends
GlutenClickHouseWholeStageTransformerS
| str_to_map('a,b', ',', ''),
| str_to_map('a:c|b:c', '\\|', ':')
|""".stripMargin
- runQueryAndCompare(sql1, true,
false)(checkGlutenOperatorMatch[ProjectExecTransformer])
+ runQueryAndCompare(sql1,
true)(checkGlutenOperatorMatch[ProjectExecTransformer])
}
test("test parse_url") {
@@ -622,14 +622,12 @@ class GlutenFunctionValidateSuite extends
GlutenClickHouseWholeStageTransformerS
(ConstantFolding.ruleName + "," + NullPropagation.ruleName)) {
// Test codec with 'US-ASCII'
runQueryAndCompare(
- "SELECT decode(encode('Spark SQL', 'US-ASCII'), 'US-ASCII')",
- noFallBack = false
+ "SELECT decode(encode('Spark SQL', 'US-ASCII'), 'US-ASCII')"
)(checkGlutenOperatorMatch[ProjectExecTransformer])
// Test codec with 'UTF-16'
runQueryAndCompare(
- "SELECT decode(encode('Spark SQL', 'UTF-16'), 'UTF-16')",
- noFallBack = false
+ "SELECT decode(encode('Spark SQL', 'UTF-16'), 'UTF-16')"
)(checkGlutenOperatorMatch[ProjectExecTransformer])
}
}
@@ -645,8 +643,7 @@ class GlutenFunctionValidateSuite extends
GlutenClickHouseWholeStageTransformerS
SQLConf.OPTIMIZER_EXCLUDED_RULES.key ->
(ConstantFolding.ruleName + "," + NullPropagation.ruleName)) {
runQueryAndCompare(
- "select cast('7.921901' as float), cast('7.921901' as double)",
- noFallBack = false
+ "select cast('7.921901' as float), cast('7.921901' as double)"
)(checkGlutenOperatorMatch[ProjectExecTransformer])
}
}
@@ -1061,8 +1058,7 @@ class GlutenFunctionValidateSuite extends
GlutenClickHouseWholeStageTransformerS
SQLConf.OPTIMIZER_EXCLUDED_RULES.key ->
(ConstantFolding.ruleName + "," + NullPropagation.ruleName)) {
runQueryAndCompare(
- "select cast(' \t2570852431\n' as long), cast('25708\t52431\n' as
long)",
- noFallBack = false
+ "select cast(' \t2570852431\n' as long), cast('25708\t52431\n' as
long)"
)(checkGlutenOperatorMatch[ProjectExecTransformer])
}
}
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenNothingValueCheck.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenNothingValueCheck.scala
index 0c4f865b26..c0dfbed7d1 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenNothingValueCheck.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenNothingValueCheck.scala
@@ -109,7 +109,7 @@ class GlutenNothingValueCheck extends
GlutenClickHouseWholeStageTransformerSuite
"""
|select array() as x union all select array(123) as x
|""".stripMargin
- compareResultsAgainstVanillaSpark(sql, true, { _ => }, false)
+ compareResultsAgainstVanillaSpark(sql, true, { _ => })
}
test("nothing array: convert nullable to nullable 1") {
@@ -117,7 +117,7 @@ class GlutenNothingValueCheck extends
GlutenClickHouseWholeStageTransformerSuite
"""
|select array() as x union all select array(123, null) as x
|""".stripMargin
- compareResultsAgainstVanillaSpark(sql, true, { _ => }, false)
+ compareResultsAgainstVanillaSpark(sql, true, { _ => })
}
test("nothing array: convert nullable to nullable 2") {
@@ -125,7 +125,7 @@ class GlutenNothingValueCheck extends
GlutenClickHouseWholeStageTransformerSuite
"""
|select array() as x union all select array(null) as x
|""".stripMargin
- compareResultsAgainstVanillaSpark(sql, true, { _ => }, false)
+ compareResultsAgainstVanillaSpark(sql, true, { _ => })
}
test("nothing array: null array") {
@@ -133,7 +133,7 @@ class GlutenNothingValueCheck extends
GlutenClickHouseWholeStageTransformerSuite
"""
|select array(null)
|""".stripMargin
- compareResultsAgainstVanillaSpark(sql, true, { _ => }, false)
+ compareResultsAgainstVanillaSpark(sql, true, { _ => })
}
test("nothing map: convert nullable to non-nullable") {
@@ -141,7 +141,7 @@ class GlutenNothingValueCheck extends
GlutenClickHouseWholeStageTransformerSuite
"""
|select map() as x union all select map(123, 456) as x
|""".stripMargin
- compareResultsAgainstVanillaSpark(sql, true, { _ => }, false)
+ compareResultsAgainstVanillaSpark(sql, true, { _ => })
}
test("nothing map: convert nullable to nullable 1") {
@@ -149,7 +149,7 @@ class GlutenNothingValueCheck extends
GlutenClickHouseWholeStageTransformerSuite
"""
|select map() as x union all select map(1, null, 2, 23) as x
|""".stripMargin
- compareResultsAgainstVanillaSpark(sql, true, { _ => }, false)
+ compareResultsAgainstVanillaSpark(sql, true, { _ => })
}
test("nothing array in map 1") {
@@ -157,7 +157,7 @@ class GlutenNothingValueCheck extends
GlutenClickHouseWholeStageTransformerSuite
"""
|select map(1, null) as x union all select map(1, array(456)) as x
|""".stripMargin
- compareResultsAgainstVanillaSpark(sql, true, { _ => }, false)
+ compareResultsAgainstVanillaSpark(sql, true, { _ => })
}
test("nothing array in map 2") {
@@ -165,7 +165,7 @@ class GlutenNothingValueCheck extends
GlutenClickHouseWholeStageTransformerSuite
"""
|select map(1, array()) as x union all select map(1, array(456)) as x
|""".stripMargin
- compareResultsAgainstVanillaSpark(sql, true, { _ => }, false)
+ compareResultsAgainstVanillaSpark(sql, true, { _ => })
}
test("nothing array in map 3") {
@@ -173,7 +173,7 @@ class GlutenNothingValueCheck extends
GlutenClickHouseWholeStageTransformerSuite
"""
|select map(1, array()) as x union all select map(1, array(456, null))
as x
|""".stripMargin
- compareResultsAgainstVanillaSpark(sql, true, { _ => }, false)
+ compareResultsAgainstVanillaSpark(sql, true, { _ => })
}
test("nothing array in shuffle") {
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/compatibility/GlutenClickhouseFunctionSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/compatibility/GlutenClickhouseFunctionSuite.scala
index 907a5323a3..38d45db098 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/compatibility/GlutenClickhouseFunctionSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/compatibility/GlutenClickhouseFunctionSuite.scala
@@ -187,8 +187,8 @@ class GlutenClickhouseFunctionSuite extends
GlutenClickHouseTPCHAbstractSuite {
}
test("array decimal32 CH column to row") {
- compareResultsAgainstVanillaSpark("SELECT array(1.0, 2.0)", true, { _ =>
}, false)
- compareResultsAgainstVanillaSpark("SELECT map(1.0, '2', 3.0, '4')", true,
{ _ => }, false)
+ compareResultsAgainstVanillaSpark("SELECT array(1.0, 2.0)", true, { _ => })
+ compareResultsAgainstVanillaSpark("SELECT map(1.0, '2', 3.0, '4')", true,
{ _ => })
}
test("array decimal32 spark row to CH column") {
@@ -281,8 +281,7 @@ class GlutenClickhouseFunctionSuite extends
GlutenClickHouseTPCHAbstractSuite {
"""
|select cast(map(1,'2') as string)
|""".stripMargin,
- true,
- false
+ true
)(checkGlutenOperatorMatch[ProjectExecTransformer])
}
}
@@ -443,7 +442,7 @@ class GlutenClickhouseFunctionSuite extends
GlutenClickHouseTPCHAbstractSuite {
{ _ => }
)
val q = "select cast(a as string) from (select array('123',NULL) as a)"
- compareResultsAgainstVanillaSpark(q, true, { _ => }, false)
+ compareResultsAgainstVanillaSpark(q, true, { _ => })
}
}
@@ -505,8 +504,7 @@ class GlutenClickhouseFunctionSuite extends
GlutenClickHouseTPCHAbstractSuite {
|cast(map(array(1), map("aa", "123\'")) as string),
|cast(named_struct("a", "test\'", "b", 1) as string),
|cast(named_struct("a", "test\'", "b", 1, "c", struct("\'test"),
"d", array('123\'')) as string)
- |""".stripMargin,
- noFallBack = false
+ |""".stripMargin
)(checkGlutenOperatorMatch[ProjectExecTransformer])
}
}
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala
index 18db7e4070..19544e7f0a 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala
@@ -422,13 +422,11 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends
GlutenClickHouseTPCHAbstr
runQueryAndCompare(
"select a from (select array_intersect(array(null,1,2,3,null),
array(3,5,1,null,null)) as arr) " +
- "lateral view explode(arr) as a order by a",
- noFallBack = false
+ "lateral view explode(arr) as a order by a"
)(checkGlutenOperatorMatch[ProjectExecTransformer])
runQueryAndCompare(
- "select array_intersect(array(null,1,2,3,null), cast(null as
array<int>))",
- noFallBack = false
+ "select array_intersect(array(null,1,2,3,null), cast(null as
array<int>))"
)(checkGlutenOperatorMatch[ProjectExecTransformer])
runQueryAndCompare(
@@ -449,8 +447,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends
GlutenClickHouseTPCHAbstr
runQueryAndCompare(
"select array_position(array(1,2,3,null), 1),
array_position(array(1,2,3,null), null)," +
"array_position(array(1,2,3,null), 5), array_position(array(1,2,3),
5), " +
- "array_position(array(1,2,3), 2), array_position(cast(null as
array<int>), 1)",
- noFallBack = false
+ "array_position(array(1,2,3), 2), array_position(cast(null as
array<int>), 1)"
)(checkGlutenOperatorMatch[ProjectExecTransformer])
}
}
@@ -465,8 +462,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends
GlutenClickHouseTPCHAbstr
runQueryAndCompare(
"select array_contains(array(1,2,3,null), 1),
array_contains(array(1,2,3,null), " +
"cast(null as int)), array_contains(array(1,2,3,null), 5),
array_contains(array(1,2,3), 5)," +
- "array_contains(array(1,2,3), 2), array_contains(cast(null as
array<int>), 1)",
- noFallBack = false
+ "array_contains(array(1,2,3), 2), array_contains(cast(null as
array<int>), 1)"
)(checkGlutenOperatorMatch[ProjectExecTransformer])
}
}
@@ -483,8 +479,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends
GlutenClickHouseTPCHAbstr
)(checkGlutenOperatorMatch[ProjectExecTransformer])
runQueryAndCompare(
- "select sort_array(array(1,3,2,null)),
sort_array(array(1,2,3,null),false)",
- noFallBack = false
+ "select sort_array(array(1,3,2,null)),
sort_array(array(1,2,3,null),false)"
)(checkGlutenOperatorMatch[ProjectExecTransformer])
}
}
@@ -535,8 +530,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends
GlutenClickHouseTPCHAbstr
SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> (ConstantFolding.ruleName + ","
+ NullPropagation.ruleName)) {
runQueryAndCompare(
"select find_in_set(null, 'a'), find_in_set('a', null), " +
- "find_in_set('a', 'a,b'), find_in_set('a', 'ab,ab')",
- noFallBack = false
+ "find_in_set('a', 'a,b'), find_in_set('a', 'ab,ab')"
)(checkGlutenOperatorMatch[ProjectExecTransformer])
}
}
@@ -587,8 +581,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends
GlutenClickHouseTPCHAbstr
"select elt(2, n_comment, n_regionkey) from nation"
)(checkGlutenOperatorMatch[ProjectExecTransformer])
runQueryAndCompare(
- "select elt(null, 'a', 'b'), elt(0, 'a', 'b'), elt(1, 'a', 'b'),
elt(3, 'a', 'b')",
- noFallBack = false
+ "select elt(null, 'a', 'b'), elt(0, 'a', 'b'), elt(1, 'a', 'b'),
elt(3, 'a', 'b')"
)(checkGlutenOperatorMatch[ProjectExecTransformer])
}
}
@@ -601,8 +594,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends
GlutenClickHouseTPCHAbstr
)(checkGlutenOperatorMatch[ProjectExecTransformer])
runQueryAndCompare(
"select array_max(null), array_max(array(null)), array_max(array(1, 2,
3, null)), " +
- "array_max(array(1.0, 2.0, 3.0, null)), array_max(array('z', 't',
'abc'))",
- noFallBack = false
+ "array_max(array(1.0, 2.0, 3.0, null)), array_max(array('z', 't',
'abc'))"
)(checkGlutenOperatorMatch[ProjectExecTransformer])
}
}
@@ -615,8 +607,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends
GlutenClickHouseTPCHAbstr
)(checkGlutenOperatorMatch[ProjectExecTransformer])
runQueryAndCompare(
"select array_min(null), array_min(array(null)), array_min(array(1, 2,
3, null)), " +
- "array_min(array(1.0, 2.0, 3.0, null)), array_min(array('z', 't',
'abc'))",
- noFallBack = false
+ "array_min(array(1.0, 2.0, 3.0, null)), array_min(array('z', 't',
'abc'))"
)(checkGlutenOperatorMatch[ProjectExecTransformer])
}
}
@@ -664,8 +655,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends
GlutenClickHouseTPCHAbstr
runQueryAndCompare(
"select array_distinct(array(1,2,1,2,3)),
array_distinct(array(null,1,null,1,2,null,3)), " +
- "array_distinct(array(array(1,null,2), array(1,null,2))),
array_distinct(null), array_distinct(array(null))",
- noFallBack = false
+ "array_distinct(array(array(1,null,2), array(1,null,2))),
array_distinct(null), array_distinct(array(null))"
)(checkGlutenOperatorMatch[ProjectExecTransformer])
}
}
@@ -682,8 +672,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends
GlutenClickHouseTPCHAbstr
"array_union(array(null,1,null,1,2,null,3),
array(1,null,2,null,3,null,4)), " +
"array_union(array(array(1,null,2), array(2,null,3)),
array(array(2,null,3), array(1,null,2))), " +
"array_union(array(null), array(null)), " +
- "array_union(cast(null as array<int>), cast(null as array<int>))",
- noFallBack = false
+ "array_union(cast(null as array<int>), cast(null as array<int>))"
)(checkGlutenOperatorMatch[ProjectExecTransformer])
}
}
@@ -698,8 +687,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends
GlutenClickHouseTPCHAbstr
runQueryAndCompare(
"select shuffle(array(1,2,3,4,5)), shuffle(array(1,3,null,3,4)),
shuffle(null)",
- compareResult = false,
- noFallBack = false
+ compareResult = false
)(checkGlutenOperatorMatch[ProjectExecTransformer])
}
}
@@ -1235,8 +1223,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends
GlutenClickHouseTPCHAbstr
SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> (ConstantFolding.ruleName + ","
+ NullPropagation.ruleName)) {
runQueryAndCompare(
"select sequence(null, 1), sequence(1, null), sequence(1, 3, null),
sequence(1, 5)," +
- "sequence(5, 1), sequence(1, 5, 2), sequence(5, 1, -2)",
- noFallBack = false
+ "sequence(5, 1), sequence(1, 5, 2), sequence(5, 1, -2)"
)(checkGlutenOperatorMatch[ProjectExecTransformer])
runQueryAndCompare(
@@ -1935,8 +1922,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends
GlutenClickHouseTPCHAbstr
runQueryAndCompare(
"select concat_ws(null), concat_ws('-'), concat_ws('-', null),
concat_ws('-', null, null), " +
"concat_ws(null, 'a'), concat_ws('-', 'a'), concat_ws('-', 'a',
null), " +
- "concat_ws('-', 'a', null, 'b', 'c', null, array(null), array('d',
null), array('f', 'g'))",
- noFallBack = false
+ "concat_ws('-', 'a', null, 'b', 'c', null, array(null), array('d',
null), array('f', 'g'))"
)(checkGlutenOperatorMatch[ProjectExecTransformer])
runQueryAndCompare(
@@ -2195,8 +2181,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends
GlutenClickHouseTPCHAbstr
withSQLConf(
SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> (ConstantFolding.ruleName + ","
+ NullPropagation.ruleName)) {
runQueryAndCompare(
- "select 1/0f, 1/0.0d",
- noFallBack = false
+ "select 1/0f, 1/0.0d"
)(checkGlutenOperatorMatch[ProjectExecTransformer])
runQueryAndCompare(
@@ -2439,8 +2424,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends
GlutenClickHouseTPCHAbstr
withSQLConf(
SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> (ConstantFolding.ruleName + ","
+ NullPropagation.ruleName)) {
runQueryAndCompare(
- "select trunc('2023-12-06', 'MM'), trunc('2023-12-06', 'YEAR'),
trunc('2023-12-06', 'WEEK'), trunc('2023-12-06', 'QUARTER')",
- noFallBack = false
+ "select trunc('2023-12-06', 'MM'), trunc('2023-12-06', 'YEAR'),
trunc('2023-12-06', 'WEEK'), trunc('2023-12-06', 'QUARTER')"
)(checkGlutenOperatorMatch[ProjectExecTransformer])
runQueryAndCompare(
@@ -2465,8 +2449,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends
GlutenClickHouseTPCHAbstr
withSQLConf(
SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> (ConstantFolding.ruleName + ","
+ NullPropagation.ruleName)) {
runQueryAndCompare(
- "select cast('1' as boolean), cast('t' as boolean), cast('all' as
boolean), cast('f' as boolean)",
- noFallBack = false
+ "select cast('1' as boolean), cast('t' as boolean), cast('all' as
boolean), cast('f' as boolean)"
)(checkGlutenOperatorMatch[ProjectExecTransformer])
}
}
diff --git
a/cpp-ch/local-engine/AggregateFunctions/AggregateFunctionDVRoaringBitmap.h
b/cpp-ch/local-engine/AggregateFunctions/AggregateFunctionDVRoaringBitmap.h
index de338ce6f0..d3cf296e56 100644
--- a/cpp-ch/local-engine/AggregateFunctions/AggregateFunctionDVRoaringBitmap.h
+++ b/cpp-ch/local-engine/AggregateFunctions/AggregateFunctionDVRoaringBitmap.h
@@ -99,7 +99,6 @@ public:
auto & to_tuple = assert_cast<DB::ColumnTuple &>(to);
auto & cardinality = assert_cast<DB::ColumnInt64
&>(to_tuple.getColumn(0));
auto & last = assert_cast<DB::ColumnInt64 &>(to_tuple.getColumn(1));
- auto a = to_tuple.getColumn(2).getDataType();
auto & bitmap = assert_cast<DB::ColumnString &>(to_tuple.getColumn(2));
this->data(place).insertResultInto(cardinality, last, bitmap);
}
diff --git a/cpp-ch/local-engine/Parser/SparkRowToCHColumn.cpp
b/cpp-ch/local-engine/Parser/SparkRowToCHColumn.cpp
index a6ce6fd408..18417fb3b7 100644
--- a/cpp-ch/local-engine/Parser/SparkRowToCHColumn.cpp
+++ b/cpp-ch/local-engine/Parser/SparkRowToCHColumn.cpp
@@ -57,7 +57,7 @@ ALWAYS_INLINE static void writeRowToColumns(const
std::vector<MutableColumnPtr>
{
const StringRef str_ref{spark_row_reader.getStringRef(i)};
if (str_ref.data == nullptr)
- columns[i]->insertData(nullptr, str_ref.size);
+ columns[i]->insertDefault();
else if (!spark_row_reader.isBigEndianInSparkRow(i))
columns[i]->insertData(str_ref.data, str_ref.size);
else
diff --git a/cpp-ch/local-engine/Shuffle/ShuffleReader.cpp
b/cpp-ch/local-engine/Shuffle/ShuffleReader.cpp
index 143fc19ae8..dcead78ce0 100644
--- a/cpp-ch/local-engine/Shuffle/ShuffleReader.cpp
+++ b/cpp-ch/local-engine/Shuffle/ShuffleReader.cpp
@@ -62,10 +62,10 @@ ShuffleReader::~ShuffleReader()
input_stream.reset();
}
-jclass ShuffleReader::input_stream_class = nullptr;
-jmethodID ShuffleReader::input_stream_read = nullptr;
+jclass ShuffleReader::shuffle_input_stream_class = nullptr;
+jmethodID ShuffleReader::shuffle_input_stream_read = nullptr;
-bool ReadBufferFromJavaInputStream::nextImpl()
+bool ReadBufferFromJavaShuffleInputStream::nextImpl()
{
int count = readFromJava();
if (count > 0)
@@ -73,20 +73,20 @@ bool ReadBufferFromJavaInputStream::nextImpl()
return count > 0;
}
-int ReadBufferFromJavaInputStream::readFromJava() const
+int ReadBufferFromJavaShuffleInputStream::readFromJava() const
{
GET_JNIENV(env)
jint count = safeCallIntMethod(
- env, java_in, ShuffleReader::input_stream_read,
reinterpret_cast<jlong>(internal_buffer.begin()), internal_buffer.size());
+ env, java_in, ShuffleReader::shuffle_input_stream_read,
reinterpret_cast<jlong>(internal_buffer.begin()), internal_buffer.size());
CLEAN_JNIENV
return count;
}
-ReadBufferFromJavaInputStream::ReadBufferFromJavaInputStream(jobject
input_stream) : java_in(input_stream)
+ReadBufferFromJavaShuffleInputStream::ReadBufferFromJavaShuffleInputStream(jobject
input_stream) : java_in(input_stream)
{
}
-ReadBufferFromJavaInputStream::~ReadBufferFromJavaInputStream()
+ReadBufferFromJavaShuffleInputStream::~ReadBufferFromJavaShuffleInputStream()
{
GET_JNIENV(env)
env->DeleteGlobalRef(java_in);
diff --git a/cpp-ch/local-engine/Shuffle/ShuffleReader.h
b/cpp-ch/local-engine/Shuffle/ShuffleReader.h
index 721a157849..d1d36e176b 100644
--- a/cpp-ch/local-engine/Shuffle/ShuffleReader.h
+++ b/cpp-ch/local-engine/Shuffle/ShuffleReader.h
@@ -31,7 +31,7 @@ class NativeReader;
namespace local_engine
{
void configureCompressedReadBuffer(DB::CompressedReadBuffer &
compressedReadBuffer);
-class ReadBufferFromJavaInputStream;
+class ReadBufferFromJavaShuffleInputStream;
class ShuffleReader : BlockIterator
{
public:
@@ -39,8 +39,8 @@ public:
std::unique_ptr<DB::ReadBuffer> in_, bool compressed, Int64
max_shuffle_read_rows_, Int64 max_shuffle_read_bytes_);
DB::Block * read();
~ShuffleReader();
- static jclass input_stream_class;
- static jmethodID input_stream_read;
+ static jclass shuffle_input_stream_class;
+ static jmethodID shuffle_input_stream_read;
private:
std::unique_ptr<DB::ReadBuffer> in;
@@ -52,11 +52,11 @@ private:
};
-class ReadBufferFromJavaInputStream final : public
DB::BufferWithOwnMemory<DB::ReadBuffer>
+class ReadBufferFromJavaShuffleInputStream final : public
DB::BufferWithOwnMemory<DB::ReadBuffer>
{
public:
- explicit ReadBufferFromJavaInputStream(jobject input_stream);
- ~ReadBufferFromJavaInputStream() override;
+ explicit ReadBufferFromJavaShuffleInputStream(jobject input_stream);
+ ~ReadBufferFromJavaShuffleInputStream() override;
private:
jobject java_in;
diff --git a/cpp-ch/local-engine/Storages/IO/NativeReader.cpp
b/cpp-ch/local-engine/Storages/IO/NativeReader.cpp
index 7b016eb886..e2f94e7cb7 100644
--- a/cpp-ch/local-engine/Storages/IO/NativeReader.cpp
+++ b/cpp-ch/local-engine/Storages/IO/NativeReader.cpp
@@ -25,17 +25,19 @@
#include <IO/VarInt.h>
#include <Storages/IO/AggregateSerializationUtils.h>
#include <Storages/IO/NativeWriter.h>
+#include <jni/jni_common.h>
#include <Common/Arena.h>
+#include <Common/JNIUtils.h>
namespace DB
{
namespace ErrorCodes
{
- extern const int INCORRECT_INDEX;
- extern const int LOGICAL_ERROR;
- extern const int CANNOT_READ_ALL_DATA;
- extern const int INCORRECT_DATA;
- extern const int TOO_LARGE_ARRAY_SIZE;
+extern const int INCORRECT_INDEX;
+extern const int LOGICAL_ERROR;
+extern const int CANNOT_READ_ALL_DATA;
+extern const int INCORRECT_DATA;
+extern const int TOO_LARGE_ARRAY_SIZE;
}
}
@@ -68,10 +70,8 @@ DB::Block NativeReader::read()
/// Append small blocks into a large one, could reduce memory allocations
overhead.
while (result_block.rows() < max_block_size && result_block.bytes() <
max_block_bytes)
- {
if (!appendNextBlock(result_block))
break;
- }
if (result_block.rows())
{
@@ -85,7 +85,8 @@ DB::Block NativeReader::read()
return result_block;
}
-static void readFixedSizeAggregateData(DB::ReadBuffer &in, DB::ColumnPtr &
column, size_t rows, NativeReader::ColumnParseUtil & column_parse_util)
+static void
+readFixedSizeAggregateData(DB::ReadBuffer & in, DB::ColumnPtr & column, size_t
rows, NativeReader::ColumnParseUtil & column_parse_util)
{
ColumnAggregateFunction & real_column =
typeid_cast<ColumnAggregateFunction &>(*column->assumeMutable());
auto & arena = real_column.createOrGetArena();
@@ -102,7 +103,8 @@ static void readFixedSizeAggregateData(DB::ReadBuffer &in,
DB::ColumnPtr & colum
}
}
-static void readVarSizeAggregateData(DB::ReadBuffer &in, DB::ColumnPtr &
column, size_t rows, NativeReader::ColumnParseUtil & column_parse_util)
+static void
+readVarSizeAggregateData(DB::ReadBuffer & in, DB::ColumnPtr & column, size_t
rows, NativeReader::ColumnParseUtil & column_parse_util)
{
ColumnAggregateFunction & real_column =
typeid_cast<ColumnAggregateFunction &>(*column->assumeMutable());
auto & arena = real_column.createOrGetArena();
@@ -119,7 +121,8 @@ static void readVarSizeAggregateData(DB::ReadBuffer &in,
DB::ColumnPtr & column,
}
}
-static void readNormalSimpleData(DB::ReadBuffer &in, DB::ColumnPtr & column,
size_t rows, NativeReader::ColumnParseUtil & column_parse_util)
+static void
+readNormalSimpleData(DB::ReadBuffer & in, DB::ColumnPtr & column, size_t rows,
NativeReader::ColumnParseUtil & column_parse_util)
{
ISerialization::DeserializeBinaryBulkSettings settings;
settings.getter = [&](ISerialization::SubstreamPath) -> ReadBuffer * {
return &in; };
@@ -146,7 +149,7 @@ readNormalComplexData(DB::ReadBuffer & in, DB::ColumnPtr &
column, size_t rows,
ISerialization::DeserializeBinaryBulkStatePtr state;
DB::ColumnPtr new_col = column->cloneResized(0);
- column_parse_util.serializer->deserializeBinaryBulkStatePrefix(settings,
state ,nullptr);
+ column_parse_util.serializer->deserializeBinaryBulkStatePrefix(settings,
state, nullptr);
column_parse_util.serializer->deserializeBinaryBulkWithMultipleStreams(new_col,
0, rows, settings, state, nullptr);
column->assumeMutable()->insertRangeFrom(*new_col, 0, new_col->size());
}
@@ -201,7 +204,7 @@ DB::Block NativeReader::prepareByFirstBlock()
/// Data
ColumnPtr read_column = column.type->createColumn(*serialization);
- if (rows) /// If no rows, nothing to read.
+ if (rows) /// If no rows, nothing to read.
{
if (is_agg_state_type && agg_opt_column)
{
@@ -272,4 +275,36 @@ bool NativeReader::appendNextBlock(DB::Block &
result_block)
return true;
}
+jclass ReadBufferFromJavaInputStream::input_stream_class = nullptr;
+jmethodID ReadBufferFromJavaInputStream::input_stream_read = nullptr;
+
+bool ReadBufferFromJavaInputStream::nextImpl()
+{
+ int count = readFromJava();
+ if (count > 0)
+ working_buffer.resize(count);
+ return count > 0;
+}
+
+int ReadBufferFromJavaInputStream::readFromJava() const
+{
+ GET_JNIENV(env)
+ jint count = safeCallIntMethod(env, input_stream, input_stream_read,
buffer);
+
+ if (count > 0)
+ env->GetByteArrayRegion(buffer, 0, count, reinterpret_cast<jbyte
*>(internal_buffer.begin()));
+
+ CLEAN_JNIENV
+ return count;
+}
+
+ReadBufferFromJavaInputStream::ReadBufferFromJavaInputStream(jobject
input_stream_, jbyteArray buffer_, const size_t buffer_size_)
+ : input_stream(input_stream_), buffer(buffer_), buffer_size(buffer_size_)
+{
+}
+
+ReadBufferFromJavaInputStream::~ReadBufferFromJavaInputStream()
+{
+}
+
}
diff --git a/cpp-ch/local-engine/Storages/IO/NativeReader.h
b/cpp-ch/local-engine/Storages/IO/NativeReader.h
index 4e85526e7d..bf8246a623 100644
--- a/cpp-ch/local-engine/Storages/IO/NativeReader.h
+++ b/cpp-ch/local-engine/Storages/IO/NativeReader.h
@@ -16,10 +16,12 @@
*/
#pragma once
-#include <Common/PODArray.h>
+#include <jni.h>
#include <Core/Block.h>
#include <Core/Defines.h>
#include <DataTypes/DataTypeAggregateFunction.h>
+#include <IO/BufferWithOwnMemory.h>
+#include <IO/ReadBuffer.h>
namespace local_engine
{
@@ -34,14 +36,13 @@ public:
std::string name;
DB::SerializationPtr serializer = nullptr;
size_t avg_value_size_hint = 0;
-
+
// for aggregate data
size_t aggregate_state_size = 0;
size_t aggregate_state_align = 0;
DB::AggregateFunctionPtr aggregate_function = nullptr;
std::function<void(DB::ReadBuffer &, DB::ColumnPtr &, size_t,
ColumnParseUtil &)> parse;
-
};
NativeReader(
@@ -72,4 +73,21 @@ private:
bool appendNextBlock(DB::Block & result_block);
};
+class ReadBufferFromJavaInputStream final : public
DB::BufferWithOwnMemory<DB::ReadBuffer>
+{
+public:
+ static jclass input_stream_class;
+ static jmethodID input_stream_read;
+
+ explicit ReadBufferFromJavaInputStream(jobject input_stream_, jbyteArray
buffer_, size_t buffer_size_);
+ ~ReadBufferFromJavaInputStream() override;
+
+private:
+ jobject input_stream;
+ size_t buffer_size;
+ jbyteArray buffer;
+ int readFromJava() const;
+ bool nextImpl() override;
+};
+
}
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaWriter.cpp
b/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaWriter.cpp
index 926c3ab215..2997e990b5 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaWriter.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaWriter.cpp
@@ -91,10 +91,10 @@ void DeltaWriter::writeDeletionVector(const DB::Block &
block)
for (size_t row_idx = 0; row_idx < block.rows(); row_idx++)
{
const auto file_path = file_path_columns.column->getDataAt(row_idx);
- auto bitmap = bitmap_columns.column->getDataAt(row_idx);
+ auto bitmap = bitmap_columns.column->getDataAt(row_idx).toString();
auto cardinality = cardinality_src_columns.column->get64(row_idx); //
alias deletedRowIndexCount
- if (size_of_current_bin > 0 && bitmap.size + size_of_current_bin >
packing_target_size)
+ if (size_of_current_bin > 0 && bitmap.length() + size_of_current_bin >
packing_target_size)
{
write_buffer->finalize();
write_buffer = nullptr;
@@ -120,7 +120,7 @@ void DeltaWriter::writeDeletionVector(const DB::Block &
block)
{
DeltaDVRoaringBitmapArray existing_bitmap
=
deserializeExistingBitmap(existing_path_or_inline_dv, existing_offset,
existing_size_in_bytes, table_path);
- existing_bitmap.merge(bitmap.toString());
+ existing_bitmap.merge(bitmap);
bitmap = existing_bitmap.serialize();
cardinality = existing_bitmap.cardinality();
}
@@ -140,12 +140,13 @@ void DeltaWriter::writeDeletionVector(const DB::Block &
block)
if (!write_buffer)
initBinPackage();
- size_of_current_bin = size_of_current_bin + bitmap.size;
- Int32 bitmap_size = static_cast<Int32>(bitmap.size);
+ Int32 bitmap_size = static_cast<Int32>(bitmap.length());
+ size_of_current_bin = size_of_current_bin + bitmap.length();
+
DB::writeBinaryBigEndian(bitmap_size, *write_buffer);
- write_buffer->write(bitmap.data, bitmap.size);
- Int32 checksum_value = static_cast<Int32>(crc32_z(0L,
reinterpret_cast<const unsigned char *>(bitmap.data), bitmap_size));
+ write_buffer->write(bitmap.c_str(), bitmap_size);
+ Int32 checksum_value = static_cast<Int32>(crc32_z(0L,
reinterpret_cast<const unsigned char *>(bitmap.c_str()), bitmap_size));
DB::writeBinaryBigEndian(checksum_value, *write_buffer);
auto dv_descriptor_field
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaWriter.h
b/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaWriter.h
index f6d7372ff4..5a08a33169 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaWriter.h
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/Delta/DeltaWriter.h
@@ -64,7 +64,7 @@ private:
void initBinPackage();
- const DB::ContextPtr & context;
+ DB::ContextPtr context;
const String table_path;
const size_t prefix_length;
const size_t packing_target_size;
diff --git a/cpp-ch/local-engine/local_engine_jni.cpp
b/cpp-ch/local-engine/local_engine_jni.cpp
index e3ac674360..13b7cf2b54 100644
--- a/cpp-ch/local-engine/local_engine_jni.cpp
+++ b/cpp-ch/local-engine/local_engine_jni.cpp
@@ -121,7 +121,7 @@ JNIEXPORT jint JNI_OnLoad(JavaVM * vm, void * /*reserved*/)
block_stats_class = local_engine::CreateGlobalClassReference(env,
"Lorg/apache/gluten/vectorized/BlockStats;");
block_stats_constructor = local_engine::GetMethodID(env,
block_stats_class, "<init>", "(JZ)V");
- local_engine::ShuffleReader::input_stream_class
+ local_engine::ShuffleReader::shuffle_input_stream_class
= local_engine::CreateGlobalClassReference(env,
"Lorg/apache/gluten/vectorized/ShuffleInputStream;");
local_engine::NativeSplitter::iterator_class
= local_engine::CreateGlobalClassReference(env,
"Lorg/apache/gluten/vectorized/IteratorWrapper;");
@@ -134,8 +134,13 @@ JNIEXPORT jint JNI_OnLoad(JavaVM * vm, void * /*reserved*/)
local_engine::SourceFromJavaIter::serialized_record_batch_iterator_next
= local_engine::GetMethodID(env,
local_engine::SourceFromJavaIter::serialized_record_batch_iterator_class,
"next", "()[B");
- local_engine::ShuffleReader::input_stream_read
- = local_engine::GetMethodID(env,
local_engine::ShuffleReader::input_stream_class, "read", "(JJ)J");
+ local_engine::ReadBufferFromJavaInputStream::input_stream_class
+ = local_engine::CreateGlobalClassReference(env,
"Ljava/io/InputStream;");
+ local_engine::ReadBufferFromJavaInputStream::input_stream_read
+ = local_engine::GetMethodID(env,
local_engine::ReadBufferFromJavaInputStream::input_stream_class, "read",
"([B)I");
+
+ local_engine::ShuffleReader::shuffle_input_stream_read
+ = local_engine::GetMethodID(env,
local_engine::ShuffleReader::shuffle_input_stream_class, "read", "(JJ)J");
local_engine::NativeSplitter::iterator_has_next
= local_engine::GetMethodID(env,
local_engine::NativeSplitter::iterator_class, "hasNext", "()Z");
@@ -205,7 +210,7 @@ JNIEXPORT void
Java_org_apache_gluten_vectorized_ExpressionEvaluatorJniWrapper_n
env->DeleteGlobalRef(block_stripes_class);
env->DeleteGlobalRef(split_result_class);
env->DeleteGlobalRef(block_stats_class);
- env->DeleteGlobalRef(local_engine::ShuffleReader::input_stream_class);
+
env->DeleteGlobalRef(local_engine::ShuffleReader::shuffle_input_stream_class);
env->DeleteGlobalRef(local_engine::NativeSplitter::iterator_class);
env->DeleteGlobalRef(local_engine::WriteBufferFromJavaOutputStream::output_stream_class);
env->DeleteGlobalRef(local_engine::SourceFromJavaIter::serialized_record_batch_iterator_class);
@@ -536,12 +541,24 @@
Java_org_apache_gluten_vectorized_CHNativeBlock_nativeBlockStats(JNIEnv * env, j
LOCAL_ENGINE_JNI_METHOD_END(env, nullptr)
}
+JNIEXPORT jlong
+Java_org_apache_gluten_vectorized_CHNativeBlock_copyBlock(JNIEnv * env,
jobject obj, jlong block_address)
+{
+ LOCAL_ENGINE_JNI_METHOD_START
+ DB::Block * block = reinterpret_cast<DB::Block *>(block_address);
+
+ auto copied_block = block->cloneWithColumns(block->getColumns());
+    auto * copied_block_ptr = new DB::Block(copied_block); // return the heap-allocated copy's address to the Java caller
+    return reinterpret_cast<jlong>(copied_block_ptr);
+ LOCAL_ENGINE_JNI_METHOD_END(env, -1)
+}
+
JNIEXPORT jlong
Java_org_apache_gluten_vectorized_CHStreamReader_createNativeShuffleReader(
JNIEnv * env, jclass /*clazz*/, jobject input_stream, jboolean compressed,
jlong max_shuffle_read_rows, jlong max_shuffle_read_bytes)
{
LOCAL_ENGINE_JNI_METHOD_START
auto * input = env->NewGlobalRef(input_stream);
- auto read_buffer =
std::make_unique<local_engine::ReadBufferFromJavaInputStream>(input);
+ auto read_buffer =
std::make_unique<local_engine::ReadBufferFromJavaShuffleInputStream>(input);
auto * shuffle_reader
= new local_engine::ShuffleReader(std::move(read_buffer), compressed,
max_shuffle_read_rows, max_shuffle_read_bytes);
return reinterpret_cast<jlong>(shuffle_reader);
@@ -557,6 +574,19 @@ JNIEXPORT jlong
Java_org_apache_gluten_vectorized_CHStreamReader_nativeNext(JNIE
LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}
+JNIEXPORT jlong Java_org_apache_gluten_vectorized_CHStreamReader_directRead(
+ JNIEnv * env, jclass /*clazz*/, jobject input_stream, jbyteArray buffer,
jint buffer_size)
+{
+ LOCAL_ENGINE_JNI_METHOD_START
+ // auto * input = env->NewGlobalRef(input_stream);
+ auto rb =
std::make_unique<local_engine::ReadBufferFromJavaInputStream>(input_stream,
buffer, buffer_size);
+ auto reader = std::make_unique<local_engine::NativeReader>(*rb);
+ DB::Block block = reader->read();
+ DB::Block * res = new DB::Block(block);
+ return reinterpret_cast<jlong>(res);
+ LOCAL_ENGINE_JNI_METHOD_END(env, -1)
+}
+
JNIEXPORT void
Java_org_apache_gluten_vectorized_CHStreamReader_nativeClose(JNIEnv * env,
jobject /*obj*/, jlong shuffle_reader)
{
LOCAL_ENGINE_JNI_METHOD_START
@@ -1192,6 +1222,25 @@ JNIEXPORT jlong
Java_org_apache_gluten_vectorized_BlockOutputStream_nativeCreate
LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}
+JNIEXPORT jlong Java_org_apache_gluten_vectorized_BlockOutputStream_directWrite(
+ JNIEnv * env,
+ jclass,
+ jobject output_stream,
+ jbyteArray buffer,
+ jint customize_buffer_size,
+ jlong block_address)
+{
+ LOCAL_ENGINE_JNI_METHOD_START
+ DB::Block * block = reinterpret_cast<DB::Block *>(block_address);
+ auto wb =
std::make_shared<local_engine::WriteBufferFromJavaOutputStream>(output_stream,
buffer, customize_buffer_size);
+ auto native_writer = std::make_unique<local_engine::NativeWriter>(*wb,
block->cloneEmpty());
+ auto write_size = native_writer->write(*block);
+ native_writer->flush();
+ wb->finalize();
+ return write_size;
+ LOCAL_ENGINE_JNI_METHOD_END(env, -1)
+}
+
JNIEXPORT void
Java_org_apache_gluten_vectorized_BlockOutputStream_nativeClose(JNIEnv * env,
jobject, jlong instance)
{
LOCAL_ENGINE_JNI_METHOD_START
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/caller/CallerInfo.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/caller/CallerInfo.scala
index d23d172e46..1307351a43 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/caller/CallerInfo.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/caller/CallerInfo.scala
@@ -18,6 +18,7 @@ package org.apache.gluten.extension.caller
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.apache.spark.sql.execution.columnar.InMemoryRelation
+import org.apache.spark.sql.execution.streaming.StreamExecution
/**
* Helper API that stores information about the call site of the columnar
rule. Specific columnar
@@ -28,6 +29,7 @@ import
org.apache.spark.sql.execution.columnar.InMemoryRelation
trait CallerInfo {
def isAqe(): Boolean
def isCache(): Boolean
+ def isStreaming(): Boolean
}
object CallerInfo {
@@ -36,7 +38,11 @@ object CallerInfo {
override def initialValue(): Option[CallerInfo] = None
}
- private class Impl(override val isAqe: Boolean, override val isCache:
Boolean) extends CallerInfo
+ private class Impl(
+ override val isAqe: Boolean,
+ override val isCache: Boolean,
+ override val isStreaming: Boolean
+ ) extends CallerInfo
/*
* Find the information about the caller that initiated the rule call.
@@ -46,7 +52,10 @@ object CallerInfo {
return localStorage.get().get
}
val stack = Thread.currentThread.getStackTrace
- new Impl(isAqe = inAqeCall(stack), isCache = inCacheCall(stack))
+ new Impl(
+ isAqe = inAqeCall(stack),
+ isCache = inCacheCall(stack),
+ isStreaming = inStreamingCall(stack))
}
private def inAqeCall(stack: Seq[StackTraceElement]): Boolean = {
@@ -57,10 +66,15 @@ object CallerInfo {
stack.exists(_.getClassName.equals(InMemoryRelation.getClass.getName))
}
+ private def inStreamingCall(stack: Seq[StackTraceElement]): Boolean = {
+
stack.exists(_.getClassName.equals(StreamExecution.getClass.getName.split('$').head))
+ }
+
/** For testing only. */
- def withLocalValue[T](isAqe: Boolean, isCache: Boolean)(body: => T): T = {
+ def withLocalValue[T](isAqe: Boolean, isCache: Boolean, isStreaming: Boolean
= false)(
+ body: => T): T = {
val prevValue = localStorage.get()
- val newValue = new Impl(isAqe, isCache)
+ val newValue = new Impl(isAqe, isCache, isStreaming)
localStorage.set(Some(newValue))
try {
body
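
For reference, a minimal sketch of how the extended test hook above can be exercised; runStreamingMicroBatchPlan() is a hypothetical stand-in for whatever plan is under test, not an API from this patch:

    CallerInfo.withLocalValue(isAqe = false, isCache = false, isStreaming = true) {
      // Columnar rules evaluated inside this block read the thread-local Impl set above,
      // so isStreaming() reports true without StreamExecution being on the call stack.
      runStreamingMicroBatchPlan() // hypothetical helper standing in for the code under test
    }
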
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
index c6672cdbb2..1bb5a255f5 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
@@ -45,6 +45,7 @@ import org.apache.spark.sql.hive.HiveUDFTransformer
import org.apache.spark.sql.types.{DecimalType, LongType, NullType, StructType}
import org.apache.spark.sql.vectorized.ColumnarBatch
+import java.io.{ObjectInputStream, ObjectOutputStream}
import java.lang.{Long => JLong}
import java.util.{ArrayList => JArrayList, List => JList, Map => JMap}
@@ -727,4 +728,18 @@ trait SparkPlanExecApi {
children: Seq[ExpressionTransformer],
expr: Expression): ExpressionTransformer =
GenericExpressionTransformer(substraitName, children, expr)
+
+ def isSupportRDDScanExec(plan: RDDScanExec): Boolean = false
+
+ def getRDDScanTransform(plan: RDDScanExec): RDDScanTransformer =
+ throw new GlutenNotSupportException("RDDScanExec is not supported")
+
+ def copyColumnarBatch(batch: ColumnarBatch): ColumnarBatch =
+ throw new GlutenNotSupportException("Copying ColumnarBatch is not
supported")
+
+ def serializeColumnarBatch(output: ObjectOutputStream, batch:
ColumnarBatch): Unit =
+ throw new GlutenNotSupportException("Serialize ColumnarBatch is not
supported")
+
+ def deserializeColumnarBatch(input: ObjectInputStream): ColumnarBatch =
+ throw new GlutenNotSupportException("Deserialize ColumnarBatch is not
supported")
}
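
A schematic of how a backend could wire the two RDD-scan hooks added above; the real ClickHouse overrides live in CHSparkPlanExecApi.scala (shown only in the diffstat), and MyRDDScanTransformer is a hypothetical transformer, not part of this patch:

    // Inside a backend's SparkPlanExecApi implementation (fragment, not a full class):
    override def isSupportRDDScanExec(plan: RDDScanExec): Boolean =
      true // a real backend would gate this on the RDD's row layout and schema support

    override def getRDDScanTransform(plan: RDDScanExec): RDDScanTransformer =
      MyRDDScanTransformer(plan) // hypothetical: wraps the vanilla node into a columnar transformer
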
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
index 611b2a8ff7..750fc060cd 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.optimizer.{BuildLeft,
BuildRight, BuildSide
import org.apache.spark.sql.catalyst.plans.logical.Join
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.CollectLimitExec
+import org.apache.spark.sql.execution.RDDScanTransformer
import org.apache.spark.sql.execution.aggregate.{HashAggregateExec,
ObjectHashAggregateExec, SortAggregateExec}
import org.apache.spark.sql.execution.datasources.WriteFilesExec
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
@@ -348,6 +349,9 @@ object OffloadOthers {
plan.limit,
plan.child
)
+ case plan: RDDScanExec if
RDDScanTransformer.isSupportRDDScanExec(plan) =>
+ logDebug(s"Columnar Processing for ${plan.getClass} is currently
supported.")
+ RDDScanTransformer.getRDDScanTransform(plan)
case p if !p.isInstanceOf[GlutenPlan] =>
logDebug(s"Transformation for ${p.getClass} is currently not
supported.")
p
diff --git
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/RDDScanTransformer.scala
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/RDDScanTransformer.scala
new file mode 100644
index 0000000000..e3fc728477
--- /dev/null
+++
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/RDDScanTransformer.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.execution.ValidatablePlan
+import org.apache.gluten.extension.columnar.transition.Convention
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning,
UnknownPartitioning}
+
+abstract class RDDScanTransformer(
+ outputAttributes: Seq[Attribute],
+ override val outputPartitioning: Partitioning = UnknownPartitioning(0),
+ override val outputOrdering: Seq[SortOrder]
+) extends ValidatablePlan {
+
+ override def rowType0(): Convention.RowType = Convention.RowType.None
+ override def batchType(): Convention.BatchType =
BackendsApiManager.getSettings.primaryBatchType
+ override def output: Seq[Attribute] = {
+ outputAttributes
+ }
+
+ override protected def doExecute()
+ : org.apache.spark.rdd.RDD[org.apache.spark.sql.catalyst.InternalRow] = {
+ throw new UnsupportedOperationException(s"This operator doesn't support
doExecute().")
+ }
+
+ override def children: Seq[SparkPlan] = Seq.empty
+}
+
+object RDDScanTransformer {
+ def isSupportRDDScanExec(plan: RDDScanExec): Boolean =
+ BackendsApiManager.getSparkPlanExecApiInstance.isSupportRDDScanExec(plan)
+
+ def getRDDScanTransform(plan: RDDScanExec): RDDScanTransformer =
+ BackendsApiManager.getSparkPlanExecApiInstance.getRDDScanTransform(plan)
+}
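
A minimal concrete-subclass sketch of the abstract class above, assuming the backend converts the incoming row RDD to its batch format inside doExecuteColumnar(); the actual implementation is CHRDDScanTransformer (diffstat only), and convertRowsToBatches is a hypothetical row-to-batch converter:

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.vectorized.ColumnarBatch

    case class MyRDDScanTransformer(original: RDDScanExec)
      extends RDDScanTransformer(original.output, original.outputPartitioning, original.outputOrdering) {

      // Produce backend batches directly from the vanilla node's row RDD.
      override protected def doExecuteColumnar(): RDD[ColumnarBatch] =
        original.rdd.mapPartitions(rows => convertRowsToBatches(rows)) // hypothetical converter

      // The transformer is a leaf, so replacing children is a no-op.
      override protected def withNewChildrenInternal(newChildren: IndexedSeq[SparkPlan]): SparkPlan =
        copy()
    }
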
diff --git
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/FakeRowEnhancement.scala
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/FakeRowEnhancement.scala
new file mode 100644
index 0000000000..019100234d
--- /dev/null
+++
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/FakeRowEnhancement.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.gluten.backendsapi.BackendsApiManager
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
+
+class FakeRowEnhancement(@transient private val _batch: ColumnarBatch)
+ extends FakeRow(_batch)
+ with Serializable {
+
+ override def copy(): InternalRow = {
+ val copied =
BackendsApiManager.getSparkPlanExecApiInstance.copyColumnarBatch(batch)
+ new FakeRowEnhancement(copied)
+ }
+
+ @throws(classOf[IOException])
+ private def writeObject(output: ObjectOutputStream): Unit = {
+
BackendsApiManager.getSparkPlanExecApiInstance.serializeColumnarBatch(output,
batch)
+ }
+
+ @throws(classOf[IOException])
+ private def readObject(input: ObjectInputStream): Unit = {
+ batch =
BackendsApiManager.getSparkPlanExecApiInstance.deserializeColumnarBatch(input)
+ }
+}
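
The custom writeObject/readObject above exist because FakeRow wraps a ColumnarBatch that is not Java-serializable, so FakeRowEnhancement delegates the batch bytes to the backend hooks. A round-trip sketch, assuming `batch` is an in-scope ColumnarBatch produced by the active backend:

    val bos = new java.io.ByteArrayOutputStream()
    val oos = new java.io.ObjectOutputStream(bos)
    oos.writeObject(new FakeRowEnhancement(batch)) // triggers serializeColumnarBatch(...)
    oos.close()
    val ois = new java.io.ObjectInputStream(new java.io.ByteArrayInputStream(bos.toByteArray))
    val restored = ois.readObject().asInstanceOf[FakeRowEnhancement] // triggers deserializeColumnarBatch(...)
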
diff --git
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
index 54b5a34639..d58026b123 100644
---
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
+++
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
@@ -64,7 +64,7 @@ case class FakeRowAdaptor(child: SparkPlan)
override def rowType0(): Convention.RowType = Convention.RowType.VanillaRow
override protected def doExecute(): RDD[InternalRow] = {
- doExecuteColumnar().map(cb => new FakeRow(cb))
+ doExecuteColumnar().map(cb => new FakeRowEnhancement(cb))
}
override def outputOrdering: Seq[SortOrder] = child match {
diff --git
a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
index 5601c2d321..0e4666c604 100644
---
a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
+++
b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
@@ -483,6 +483,7 @@ class ClickHouseTestSettings extends BackendTestSettings {
.exclude("column stats collection for null columns")
.exclude("store and retrieve column stats in different time zones")
.excludeGlutenTest("store and retrieve column stats in different time
zones")
+ .excludeCH("statistics collection of a table with zero column")
enableSuite[GlutenStringFunctionsSuite]
.exclude("string regex_replace / regex_extract")
.exclude("string overlay function")
diff --git
a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
index 3436be6f85..af8c0ac201 100644
---
a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
+++
b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
@@ -1788,6 +1788,7 @@ class ClickHouseTestSettings extends BackendTestSettings {
.includeCH("SPARK-16371 Do not push down filters when inner name and outer
name are the same")
.exclude("filter pushdown - StringPredicate")
.excludeCH("filter pushdown - StringContains")
+ .excludeCH("SPARK-36866: filter pushdown - year-month interval")
// avoid Velox compile error
enableSuite(
"org.apache.gluten.execution.parquet.GlutenParquetV1FilterSuite2"
@@ -1809,6 +1810,7 @@ class ClickHouseTestSettings extends BackendTestSettings {
.includeCH("SPARK-16371 Do not push down filters when inner name and outer
name are the same")
.exclude("filter pushdown - StringPredicate")
.excludeCH("filter pushdown - StringContains")
+ .excludeCH("SPARK-36866: filter pushdown - year-month interval")
enableSuite[GlutenParquetV1PartitionDiscoverySuite]
.excludeCH("Various partition value types")
.excludeCH("Various inferred partition value types")
@@ -2049,6 +2051,7 @@ class ClickHouseTestSettings extends BackendTestSettings {
.excludeCH("store and retrieve column stats in different time zones")
.excludeCH("SPARK-42777: describe column stats (min, max) for
timestamp_ntz column")
.excludeCH("Gluten - store and retrieve column stats in different time
zones")
+ .excludeCH("statistics collection of a table with zero column")
enableSuite[GlutenStringExpressionsSuite]
.excludeCH("StringComparison")
.excludeCH("Substring")
diff --git
a/shims/common/src/main/scala/org/apache/spark/sql/execution/datasources/FakeRow.scala
b/shims/common/src/main/scala/org/apache/spark/sql/execution/datasources/FakeRow.scala
index 0ffd832f3b..679a69334a 100644
---
a/shims/common/src/main/scala/org/apache/spark/sql/execution/datasources/FakeRow.scala
+++
b/shims/common/src/main/scala/org/apache/spark/sql/execution/datasources/FakeRow.scala
@@ -24,7 +24,7 @@ import org.apache.spark.unsafe.types.{CalendarInterval,
UTF8String}
trait IFakeRowAdaptor
-class FakeRow(val batch: ColumnarBatch) extends InternalRow {
+class FakeRow(@transient var batch: ColumnarBatch) extends InternalRow {
override def numFields: Int = throw new UnsupportedOperationException()
override def setNullAt(i: Int): Unit = throw new
UnsupportedOperationException()