This is an automated email from the ASF dual-hosted git repository.
kejia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 9fe8d9299f [GLUTEN-9801] Only delete the files written by the failed
task when calling the abortTask() method (#9844)
9fe8d9299f is described below
commit 9fe8d9299fa685f18ca7d03e8ee1dcfc14990992
Author: JiaKe <[email protected]>
AuthorDate: Wed Jul 23 09:37:28 2025 +0800
[GLUTEN-9801] Only delete the files written by the failed task when calling
the abortTask() method (#9844)
---
.../backendsapi/velox/VeloxIteratorApi.scala | 2 +-
.../execution/SparkWriteFilesCommitProtocol.scala | 26 +++++++++++++--
.../execution/VeloxColumnarWriteFilesExec.scala | 7 ++--
.../sql/execution/VeloxParquetWriteSuite.scala | 33 ++++++++++++------
cpp/core/compute/Runtime.cc | 7 ++++
cpp/core/compute/Runtime.h | 1 +
cpp/core/jni/JniWrapper.cc | 8 ++++-
cpp/velox/compute/VeloxPlanConverter.cc | 3 +-
cpp/velox/compute/VeloxPlanConverter.h | 1 +
cpp/velox/compute/VeloxRuntime.cc | 9 +++--
cpp/velox/cudf/CudfPlanValidator.cc | 3 +-
cpp/velox/substrait/SubstraitToVeloxPlan.cc | 39 ++++++----------------
cpp/velox/substrait/SubstraitToVeloxPlan.h | 8 ++++-
.../substrait/SubstraitToVeloxPlanValidator.h | 3 +-
.../Substrait2VeloxValuesNodeConversionTest.cc | 2 +-
cpp/velox/tests/VeloxSubstraitRoundTripTest.cc | 5 +--
.../gluten/vectorized/NativePlanEvaluator.java | 5 +--
.../gluten/vectorized/PlanEvaluatorJniWrapper.java | 2 +-
18 files changed, 107 insertions(+), 57 deletions(-)
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
index c8aa07728f..fa713549fd 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
@@ -217,7 +217,7 @@ class VeloxIteratorApi extends IteratorApi with Logging {
}
override def injectWriteFilesTempPath(path: String, fileName: String): Unit
= {
- NativePlanEvaluator.injectWriteFilesTempPath(path)
+ NativePlanEvaluator.injectWriteFilesTempPath(path, fileName)
}
/** Generate Iterator[ColumnarBatch] for first stage. */
diff --git
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/SparkWriteFilesCommitProtocol.scala
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/SparkWriteFilesCommitProtocol.scala
index 197b57f592..13a9b987f3 100644
---
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/SparkWriteFilesCommitProtocol.scala
+++
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/SparkWriteFilesCommitProtocol.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.execution
import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
-import org.apache.spark.internal.io.{FileCommitProtocol,
HadoopMapReduceCommitProtocol}
+import org.apache.spark.internal.io.{FileCommitProtocol, FileNameSpec,
HadoopMapReduceCommitProtocol}
import org.apache.spark.sql.execution.datasources.WriteJobDescription
import org.apache.spark.util.Utils
@@ -28,6 +28,9 @@ import
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import java.lang.reflect.Field
+import java.util.UUID
+
+import scala.collection.mutable
/**
* A wrapper for [[HadoopMapReduceCommitProtocol]]. This class only affects
the task side commit
@@ -49,6 +52,8 @@ class SparkWriteFilesCommitProtocol(
private val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
private val taskAttemptId = new TaskAttemptID(taskId, sparkAttemptNumber)
+ private var fileNames: mutable.Set[String] = null
+
// Set up the attempt context required to use in the output committer.
val taskAttemptContext: TaskAttemptContext = {
// Set up the configuration object
@@ -70,10 +75,22 @@ class SparkWriteFilesCommitProtocol(
def setupTask(): Unit = {
committer.setupTask(taskAttemptContext)
+ fileNames = mutable.Set[String]()
}
def getJobId: String = jobId.toString
+ // Copied from `HadoopMapReduceCommitProtocol.getFilename`.
+ def getFilename(spec: FileNameSpec): String = {
+ // The file name looks like
part-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb.parquet (prefix, "part-", split number, random UUID, suffix)
+ // Note that %05d does not truncate the split number, so if we have more
than 100000 tasks,
+ // the file name is fine and won't overflow.
+ val split = taskAttemptContext.getTaskAttemptID.getTaskID.getId
+ val fileName =
f"${spec.prefix}part-$split%05d-${UUID.randomUUID().toString()}${spec.suffix}"
+ fileNames += fileName
+ fileName
+ }
+
def newTaskAttemptTempPath(): String = {
assert(internalCommitter != null)
val stagingDir: Path = internalCommitter match {
@@ -100,8 +117,11 @@ class SparkWriteFilesCommitProtocol(
def abortTask(writePath: String): Unit = {
committer.abortTask(taskAttemptContext)
- val tmpPath = new Path(writePath)
- tmpPath.getFileSystem(taskAttemptContext.getConfiguration).delete(tmpPath,
true)
+ // Deletes the files written by current task.
+ for (fileName <- fileNames) {
+ val filePath = new Path(writePath, fileName)
+
filePath.getFileSystem(taskAttemptContext.getConfiguration).delete(filePath,
false)
+ }
}
// Copied from `SparkHadoopWriterUtils.createJobID` to be compatible with
multi-version
diff --git
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
index 1641351e6b..84f6134aba 100644
---
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
+++
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
@@ -23,7 +23,7 @@ import org.apache.gluten.execution.WriteFilesExecTransformer
import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
import org.apache.spark.{Partition, SparkException, TaskContext,
TaskOutputFileAlreadyExistException}
-import org.apache.spark.internal.io.{FileCommitProtocol,
SparkHadoopWriterUtils}
+import org.apache.spark.internal.io.{FileCommitProtocol, FileNameSpec,
SparkHadoopWriterUtils}
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
import org.apache.spark.rdd.RDD
import org.apache.spark.shuffle.FetchFailedException
@@ -195,11 +195,14 @@ class VeloxColumnarWriteFilesRDD(
commitProtocol.setupTask()
val writePath = commitProtocol.newTaskAttemptTempPath()
+ val suffix =
description.outputWriterFactory.getFileExtension(commitProtocol.taskAttemptContext)
+ val fileNameSpec = FileNameSpec("", suffix)
+ val fileName = commitProtocol.getFilename(fileNameSpec)
logDebug(s"Velox staging write path: $writePath")
var writeTaskResult: WriteTaskResult = null
try {
Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
-
BackendsApiManager.getIteratorApiInstance.injectWriteFilesTempPath(writePath,
"")
+
BackendsApiManager.getIteratorApiInstance.injectWriteFilesTempPath(writePath,
fileName)
// Initialize the native plan
val iter = firstParent[ColumnarBatch].iterator(split, context)
diff --git
a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala
b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala
index 3b3129090b..81c9870a52 100644
---
a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala
+++
b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala
@@ -33,16 +33,29 @@ class VeloxParquetWriteSuite extends
VeloxWholeStageTransformerSuite {
override protected val fileFormat: String = "parquet"
// The parquet compression codec extensions
- private val parquetCompressionCodecExtensions = Map(
- "none" -> "",
- "uncompressed" -> "",
- "snappy" -> ".snappy",
- "gzip" -> ".gz",
- "lzo" -> ".lzo",
- "lz4" -> ".lz4",
- "brotli" -> ".br",
- "zstd" -> ".zstd"
- )
+ private val parquetCompressionCodecExtensions = if (isSparkVersionGE("3.5"))
{
+ Map(
+ "none" -> "",
+ "uncompressed" -> "",
+ "snappy" -> ".snappy",
+ "gzip" -> ".gz",
+ "lzo" -> ".lzo",
+ "lz4" -> ".lz4hadoop", // Specific extension for version 3.5
+ "brotli" -> ".br",
+ "zstd" -> ".zstd"
+ )
+ } else {
+ Map(
+ "none" -> "",
+ "uncompressed" -> "",
+ "snappy" -> ".snappy",
+ "gzip" -> ".gz",
+ "lzo" -> ".lzo",
+ "lz4" -> ".lz4",
+ "brotli" -> ".br",
+ "zstd" -> ".zstd"
+ )
+ }
private def getParquetFileExtension(codec: String): String = {
s"${parquetCompressionCodecExtensions(codec)}.parquet"
diff --git a/cpp/core/compute/Runtime.cc b/cpp/core/compute/Runtime.cc
index 30b5668000..f5b4ed0f33 100644
--- a/cpp/core/compute/Runtime.cc
+++ b/cpp/core/compute/Runtime.cc
@@ -58,4 +58,11 @@ std::optional<std::string>*
Runtime::localWriteFilesTempPath() {
return &path;
}
+std::optional<std::string>* Runtime::localWriteFileName() {
+ // This is thread-local to conform to Java side ColumnarWriteFilesExec's
design.
+ // FIXME: Pass the file name through relevant member functions.
+ static thread_local std::optional<std::string> fileName;
+ return &fileName;
+}
+
} // namespace gluten
diff --git a/cpp/core/compute/Runtime.h b/cpp/core/compute/Runtime.h
index 00498a9e54..4eda64129a 100644
--- a/cpp/core/compute/Runtime.h
+++ b/cpp/core/compute/Runtime.h
@@ -69,6 +69,7 @@ class Runtime : public std::enable_shared_from_this<Runtime> {
const std::unordered_map<std::string, std::string>& sessionConf = {});
static void release(Runtime*);
static std::optional<std::string>* localWriteFilesTempPath();
+ static std::optional<std::string>* localWriteFileName();
Runtime(
const std::string& kind,
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index 0419436ef1..5cca5dd2ac 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -380,12 +380,18 @@ JNIEXPORT jstring JNICALL
Java_org_apache_gluten_vectorized_PlanEvaluatorJniWrap
JNIEXPORT void JNICALL
Java_org_apache_gluten_vectorized_PlanEvaluatorJniWrapper_injectWriteFilesTempPath(
// NOLINT
JNIEnv* env,
jclass,
- jbyteArray path) {
+ jbyteArray path,
+ jbyteArray fileName) {
JNI_METHOD_START
auto len = env->GetArrayLength(path);
auto safeArray = getByteArrayElementsSafe(env, path);
std::string pathStr(reinterpret_cast<char*>(safeArray.elems()), len);
*Runtime::localWriteFilesTempPath() = pathStr;
+
+ len = env->GetArrayLength(fileName);
+ auto fileNameArray = getByteArrayElementsSafe(env, fileName);
+ std::string fileNameStr(reinterpret_cast<char*>(fileNameArray.elems()), len);
+ *Runtime::localWriteFileName() = fileNameStr;
JNI_METHOD_END()
}
diff --git a/cpp/velox/compute/VeloxPlanConverter.cc
b/cpp/velox/compute/VeloxPlanConverter.cc
index a3ae60cd0b..5529ffb90a 100644
--- a/cpp/velox/compute/VeloxPlanConverter.cc
+++ b/cpp/velox/compute/VeloxPlanConverter.cc
@@ -33,9 +33,10 @@ VeloxPlanConverter::VeloxPlanConverter(
velox::memory::MemoryPool* veloxPool,
const facebook::velox::config::ConfigBase* veloxCfg,
const std::optional<std::string> writeFilesTempPath,
+ const std::optional<std::string> writeFileName,
bool validationMode)
: validationMode_(validationMode),
- substraitVeloxPlanConverter_(veloxPool, veloxCfg, writeFilesTempPath,
validationMode) {
+ substraitVeloxPlanConverter_(veloxPool, veloxCfg, writeFilesTempPath,
writeFileName, validationMode) {
substraitVeloxPlanConverter_.setInputIters(std::move(inputIters));
}
diff --git a/cpp/velox/compute/VeloxPlanConverter.h
b/cpp/velox/compute/VeloxPlanConverter.h
index 2528a46914..7a14693cb7 100644
--- a/cpp/velox/compute/VeloxPlanConverter.h
+++ b/cpp/velox/compute/VeloxPlanConverter.h
@@ -34,6 +34,7 @@ class VeloxPlanConverter {
facebook::velox::memory::MemoryPool* veloxPool,
const facebook::velox::config::ConfigBase* veloxCfg,
const std::optional<std::string> writeFilesTempPath = std::nullopt,
+ const std::optional<std::string> writeFileName = std::nullopt,
bool validationMode = false);
std::shared_ptr<const facebook::velox::core::PlanNode> toVeloxPlan(
diff --git a/cpp/velox/compute/VeloxRuntime.cc
b/cpp/velox/compute/VeloxRuntime.cc
index a45151168f..c80306e64b 100644
--- a/cpp/velox/compute/VeloxRuntime.cc
+++ b/cpp/velox/compute/VeloxRuntime.cc
@@ -138,7 +138,8 @@ void VeloxRuntime::getInfoAndIds(
std::string VeloxRuntime::planString(bool details, const
std::unordered_map<std::string, std::string>& sessionConf) {
std::vector<std::shared_ptr<ResultIterator>> inputs;
auto veloxMemoryPool = gluten::defaultLeafVeloxMemoryPool();
- VeloxPlanConverter veloxPlanConverter(inputs, veloxMemoryPool.get(),
veloxCfg_.get(), std::nullopt, true);
+ VeloxPlanConverter veloxPlanConverter(
+ inputs, veloxMemoryPool.get(), veloxCfg_.get(), std::nullopt,
std::nullopt, true);
auto veloxPlan = veloxPlanConverter.toVeloxPlan(substraitPlan_, localFiles_);
return veloxPlan->toString(details, true);
}
@@ -156,7 +157,11 @@ std::shared_ptr<ResultIterator>
VeloxRuntime::createResultIterator(
LOG_IF(INFO, debugModeEnabled_) << "VeloxRuntime session config:" <<
printConfig(confMap_);
VeloxPlanConverter veloxPlanConverter(
- inputs, memoryManager()->getLeafMemoryPool().get(), veloxCfg_.get(),
*localWriteFilesTempPath());
+ inputs,
+ memoryManager()->getLeafMemoryPool().get(),
+ veloxCfg_.get(),
+ *localWriteFilesTempPath(),
+ *localWriteFileName());
veloxPlan_ = veloxPlanConverter.toVeloxPlan(substraitPlan_,
std::move(localFiles_));
LOG_IF(INFO, debugModeEnabled_ && taskInfo_.has_value())
<< "############### Velox plan for task " << taskInfo_.value() << "
###############" << std::endl
diff --git a/cpp/velox/cudf/CudfPlanValidator.cc
b/cpp/velox/cudf/CudfPlanValidator.cc
index 177613e8cc..f1a58577f6 100644
--- a/cpp/velox/cudf/CudfPlanValidator.cc
+++ b/cpp/velox/cudf/CudfPlanValidator.cc
@@ -35,7 +35,8 @@ bool CudfPlanValidator::validate(const ::substrait::Plan&
substraitPlan) {
std::vector<std::shared_ptr<ResultIterator>> inputs;
std::shared_ptr<facebook::velox::config::ConfigBase> veloxCfg =
std::make_shared<facebook::velox::config::ConfigBase>(std::unordered_map<std::string,
std::string>());
- VeloxPlanConverter veloxPlanConverter(inputs, veloxMemoryPool.get(),
veloxCfg.get(), std::nullopt, true);
+ VeloxPlanConverter veloxPlanConverter(
+ inputs, veloxMemoryPool.get(), veloxCfg.get(), std::nullopt,
std::nullopt, true);
auto planNode = veloxPlanConverter.toVeloxPlan(substraitPlan, localFiles);
std::unordered_set<velox::core::PlanNodeId> emptySet;
velox::core::PlanFragment planFragment{planNode,
velox::core::ExecutionStrategy::kUngrouped, 1, emptySet};
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc
b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
index 72cc7d88e9..8db52ab6eb 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
@@ -17,8 +17,6 @@
#include "SubstraitToVeloxPlan.h"
-#include "utils/StringUtil.h"
-
#include "TypeUtils.h"
#include "VariantToVectorConverter.h"
#include "operators/plannodes/RowVectorStream.h"
@@ -506,32 +504,9 @@ core::PlanNodePtr
SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
}
}
-std::string makeUuid() {
- return generateUuid();
-}
-
-std::string compressionFileNameSuffix(common::CompressionKind kind) {
- switch (static_cast<int32_t>(kind)) {
- case common::CompressionKind_ZLIB:
- return ".zlib";
- case common::CompressionKind_SNAPPY:
- return ".snappy";
- case common::CompressionKind_LZO:
- return ".lzo";
- case common::CompressionKind_ZSTD:
- return ".zstd";
- case common::CompressionKind_LZ4:
- return ".lz4";
- case common::CompressionKind_GZIP:
- return ".gz";
- case common::CompressionKind_NONE:
- default:
- return "";
- }
-}
-
std::shared_ptr<connector::hive::LocationHandle> makeLocationHandle(
const std::string& targetDirectory,
+ const std::string& fileName,
dwio::common::FileFormat fileFormat,
common::CompressionKind compression,
const bool& isBucketed,
@@ -540,7 +515,7 @@ std::shared_ptr<connector::hive::LocationHandle>
makeLocationHandle(
connector::hive::LocationHandle::TableType::kExisting) {
std::string targetFileName = "";
if (fileFormat == dwio::common::FileFormat::PARQUET && !isBucketed) {
- targetFileName = fmt::format("gluten-part-{}{}{}", makeUuid(),
compressionFileNameSuffix(compression), ".parquet");
+ targetFileName = fileName;
}
return std::make_shared<connector::hive::LocationHandle>(
targetDirectory, writeDirectory.value_or(targetDirectory), tableType,
targetFileName);
@@ -672,6 +647,14 @@ core::PlanNodePtr
SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
writePath = "";
}
+ std::string fileName;
+ if (writeFileName_.has_value()) {
+ fileName = writeFileName_.value();
+ } else {
+ VELOX_CHECK(validationMode_, "WriteRel should have the write path before
initializing the plan.");
+ fileName = "";
+ }
+
GLUTEN_CHECK(writeRel.named_table().has_advanced_extension(), "Advanced
extension not found in WriteRel");
const auto& ext = writeRel.named_table().advanced_extension();
GLUTEN_CHECK(ext.has_optimization(), "Extension optimization not found in
WriteRel");
@@ -706,7 +689,7 @@ core::PlanNodePtr
SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
inputType->children(),
partitionedKey,
bucketProperty,
- makeLocationHandle(writePath, fileFormat, compressionKind,
bucketProperty != nullptr),
+ makeLocationHandle(writePath, fileName, fileFormat,
compressionKind, bucketProperty != nullptr),
writerOptions,
fileFormat,
compressionKind)),
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.h
b/cpp/velox/substrait/SubstraitToVeloxPlan.h
index 3d90b6ebe3..1d816da612 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlan.h
+++ b/cpp/velox/substrait/SubstraitToVeloxPlan.h
@@ -66,8 +66,13 @@ class SubstraitToVeloxPlanConverter {
memory::MemoryPool* pool,
const facebook::velox::config::ConfigBase* veloxCfg,
const std::optional<std::string> writeFilesTempPath = std::nullopt,
+ const std::optional<std::string> writeFileName = std::nullopt,
bool validationMode = false)
- : pool_(pool), veloxCfg_(veloxCfg),
writeFilesTempPath_(writeFilesTempPath), validationMode_(validationMode) {
+ : pool_(pool),
+ veloxCfg_(veloxCfg),
+ writeFilesTempPath_(writeFilesTempPath),
+ writeFileName_(writeFileName),
+ validationMode_(validationMode) {
VELOX_USER_CHECK_NOT_NULL(veloxCfg_);
}
@@ -284,6 +289,7 @@ class SubstraitToVeloxPlanConverter {
/// The temporary path used to write files.
std::optional<std::string> writeFilesTempPath_;
+ std::optional<std::string> writeFileName_;
/// A flag used to specify validation.
bool validationMode_ = false;
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.h
b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.h
index 370b9501df..122a6b7d4a 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.h
+++ b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.h
@@ -32,7 +32,8 @@ class SubstraitToVeloxPlanValidator {
std::unordered_map<std::string, std::string> configs{
{velox::core::QueryConfig::kSparkPartitionId, "0"},
{velox::core::QueryConfig::kSessionTimezone, "GMT"}};
veloxCfg_ =
std::make_shared<facebook::velox::config::ConfigBase>(std::move(configs));
- planConverter_ = std::make_unique<SubstraitToVeloxPlanConverter>(pool,
veloxCfg_.get(), std::nullopt, true);
+ planConverter_ =
+ std::make_unique<SubstraitToVeloxPlanConverter>(pool, veloxCfg_.get(),
std::nullopt, std::nullopt, true);
queryCtx_ = velox::core::QueryCtx::create(nullptr,
velox::core::QueryConfig(veloxCfg_->rawConfigs()));
// An execution context used for function validation.
execCtx_ = std::make_unique<velox::core::ExecCtx>(pool, queryCtx_.get());
diff --git a/cpp/velox/tests/Substrait2VeloxValuesNodeConversionTest.cc
b/cpp/velox/tests/Substrait2VeloxValuesNodeConversionTest.cc
index ef8dd8cd03..0a2b409526 100644
--- a/cpp/velox/tests/Substrait2VeloxValuesNodeConversionTest.cc
+++ b/cpp/velox/tests/Substrait2VeloxValuesNodeConversionTest.cc
@@ -43,7 +43,7 @@ TEST_F(Substrait2VeloxValuesNodeConversionTest, valuesNode) {
JsonToProtoConverter::readFromFile(planPath, substraitPlan);
auto veloxCfg =
std::make_shared<facebook::velox::config::ConfigBase>(std::unordered_map<std::string,
std::string>());
std::shared_ptr<SubstraitToVeloxPlanConverter> planConverter_ =
- std::make_shared<SubstraitToVeloxPlanConverter>(pool_.get(),
veloxCfg.get(), std::nullopt, true);
+ std::make_shared<SubstraitToVeloxPlanConverter>(pool_.get(),
veloxCfg.get(), std::nullopt, std::nullopt, true);
auto veloxPlan = planConverter_->toVeloxPlan(substraitPlan);
RowVectorPtr expectedData = makeRowVector(
diff --git a/cpp/velox/tests/VeloxSubstraitRoundTripTest.cc
b/cpp/velox/tests/VeloxSubstraitRoundTripTest.cc
index 60cafd1c6f..8eeb2818ce 100644
--- a/cpp/velox/tests/VeloxSubstraitRoundTripTest.cc
+++ b/cpp/velox/tests/VeloxSubstraitRoundTripTest.cc
@@ -71,7 +71,7 @@ class VeloxSubstraitRoundTripTest : public OperatorTestBase {
auto veloxCfg =
std::make_shared<facebook::velox::config::ConfigBase>(std::unordered_map<std::string,
std::string>());
std::shared_ptr<SubstraitToVeloxPlanConverter> substraitConverter_ =
- std::make_shared<SubstraitToVeloxPlanConverter>(pool_.get(),
veloxCfg.get(), std::nullopt, true);
+ std::make_shared<SubstraitToVeloxPlanConverter>(pool_.get(),
veloxCfg.get(), std::nullopt, std::nullopt, true);
// Convert Substrait Plan to the same Velox Plan.
auto samePlan = substraitConverter_->toVeloxPlan(substraitPlan);
@@ -92,7 +92,8 @@ class VeloxSubstraitRoundTripTest : public OperatorTestBase {
auto veloxCfg =
std::make_shared<facebook::velox::config::ConfigBase>(std::unordered_map<std::string,
std::string>());
std::shared_ptr<SubstraitToVeloxPlanConverter> substraitConverter_ =
- std::make_shared<SubstraitToVeloxPlanConverter>(pool_.get(),
veloxCfg.get(), std::nullopt, true);
+ std::make_shared<SubstraitToVeloxPlanConverter>(
+ pool_.get(), veloxCfg.get(), std::nullopt, std::nullopt, true);
// Convert Substrait Plan to the same Velox Plan.
auto samePlan = substraitConverter_->toVeloxPlan(substraitPlan);
diff --git
a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java
b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java
index 44d4107c56..b3889fb231 100644
---
a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java
+++
b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java
@@ -54,8 +54,9 @@ public class NativePlanEvaluator {
return jniWrapper.nativeValidateWithFailureReason(subPlan);
}
- public static void injectWriteFilesTempPath(String path) {
-
PlanEvaluatorJniWrapper.injectWriteFilesTempPath(path.getBytes(StandardCharsets.UTF_8));
+ public static void injectWriteFilesTempPath(String path, String fileName) {
+ PlanEvaluatorJniWrapper.injectWriteFilesTempPath(
+ path.getBytes(StandardCharsets.UTF_8),
fileName.getBytes(StandardCharsets.UTF_8));
}
// Used by WholeStageTransform to create the native computing pipeline and
diff --git
a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java
b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java
index 39a543422d..502bfdbcaf 100644
---
a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java
+++
b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java
@@ -41,7 +41,7 @@ public class PlanEvaluatorJniWrapper implements RuntimeAware {
return runtime.getHandle();
}
- public static native void injectWriteFilesTempPath(byte[] path);
+ public static native void injectWriteFilesTempPath(byte[] path, byte[]
fileName);
/**
* Validate the Substrait plan in native compute engine.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]