This is an automated email from the ASF dual-hosted git repository.
zhouyuan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git
The following commit(s) were added to refs/heads/main by this push:
new af47c07982 [GLUTEN-6887][VL] Daily Update Velox Version (2026_06_27) (#12385)
af47c07982 is described below
commit af47c079820aa9168429312268b08213ff5c3018
Author: Gluten Performance Bot <[email protected]>
AuthorDate: Tue Jun 30 21:46:08 2026 +0100
[GLUTEN-6887][VL] Daily Update Velox Version (2026_06_27) (#12385)
* [GLUTEN-6887][VL] Daily Update Velox Version (dft-2026_06_27)
Upstream Velox's New Commits:
14de1ea1a by Zac Wen, fix: Reject nullAllowed filters in index lookup extraction (#17928)
72b003f55 by Ping Liu, docs: Refactor hive connector config properties (#17872)
a49069bf8 by Hazmi, feat(iceberg): Support customize file name generator for IcebergInsertTableHandle (#17437)
8fc595046 by Masha Basmanova, refactor(test): Build DuckDbQueryRunner's database lazily (#17939)
9e63ccfe0 by Zac Wen, fix: Handle exclusive floating-point bounds in extractRangeBounds (#17916)
f259ed2de by sungwoo-XCENA, build(cxl): Add the VELOX_ENABLE_CXL build flag (#17935)
83a17d6fd by Jimmy Lu, build: Add BloomFilter.cpp to velox_common_base CMake sources (#17946)
f13ddcb8c by Hongze Zhang, feat(spark): Use BloomFilterView in BloomFilterMightContainFunction to avoid copy (#16609)
Signed-off-by: glutenperfbot <[email protected]>
* fix compile
Signed-off-by: Yuan Zhou <[email protected]>
* [VL] Refactor Gluten to use upstream Velox
* fix
Signed-off-by: Yuan <[email protected]>
* Temp branch
* Fix
* Fixed iceberg java support
* fix
Signed-off-by: Yuan <[email protected]>
* fix gpu build
Signed-off-by: Yuan <[email protected]>
* remove run_setupscripts and arrow build
Signed-off-by: Yuan <[email protected]>
* Fix codestyle
* fix
Signed-off-by: Yuan <[email protected]>
---------
Signed-off-by: glutenperfbot <[email protected]>
Signed-off-by: Yuan Zhou <[email protected]>
Signed-off-by: Yuan <[email protected]>
Co-authored-by: glutenperfbot <[email protected]>
Co-authored-by: Yuan Zhou <[email protected]>
Co-authored-by: Hazmi <[email protected]>
Co-authored-by: Yuan <[email protected]>
Co-authored-by: Rui Mo <[email protected]>
---
.github/workflows/velox_backend_x86.yml | 2 +-
.../execution/enhanced/VeloxIcebergSuite.scala | 3 +-
.../apache/gluten/utils/VeloxBloomFilterTest.java | 20 +++----
cpp/velox/CMakeLists.txt | 6 +-
cpp/velox/compute/VeloxBackend.cc | 8 +++
cpp/velox/compute/VeloxBackend.h | 4 ++
cpp/velox/compute/VeloxConnectorIds.h | 2 +
cpp/velox/compute/VeloxRuntime.cc | 15 ++++-
cpp/velox/compute/VeloxRuntime.h | 7 ---
cpp/velox/compute/WholeStageResultIterator.cc | 3 +-
cpp/velox/compute/iceberg/IcebergWriter.cc | 66 +++++++++++++---------
cpp/velox/compute/iceberg/IcebergWriter.h | 5 +-
cpp/velox/config/VeloxConfig.h | 2 +
cpp/velox/jni/VeloxJniWrapper.cc | 10 ----
.../operators/writer/VeloxColumnarBatchWriter.cc | 10 +++-
cpp/velox/substrait/SubstraitToVeloxPlan.cc | 13 +++--
.../substrait/SubstraitToVeloxPlanValidator.h | 6 +-
cpp/velox/utils/ConfigExtractor.cc | 17 ++++--
cpp/velox/utils/GpuBufferBatchResizer.cc | 2 +-
cpp/velox/utils/VeloxWriterUtils.cc | 28 +++++----
cpp/velox/utils/VeloxWriterUtils.h | 2 +-
dev/ci-velox-buildshared-centos-9.sh | 2 +-
ep/build-velox/src/get-velox.sh | 4 +-
.../iceberg/spark/source/IcebergWriteUtil.scala | 16 ++++--
24 files changed, 153 insertions(+), 100 deletions(-)
diff --git a/.github/workflows/velox_backend_x86.yml
b/.github/workflows/velox_backend_x86.yml
index ad3cf96ecf..3b97c86b3a 100644
--- a/.github/workflows/velox_backend_x86.yml
+++ b/.github/workflows/velox_backend_x86.yml
@@ -1178,7 +1178,7 @@ jobs:
mkdir -p /work/.ccache
cd /work
- bash dev/builddeps-veloxbe.sh --run_setup_script=ON
--build_arrow=OFF --build_tests=ON --build_benchmarks=ON --enable_gpu=ON
+ bash dev/builddeps-veloxbe.sh --run_setup_script=OFF
--build_arrow=OFF --build_tests=ON --build_benchmarks=ON --enable_gpu=ON
rm -rf ep/build-velox/build/velox_ep
build/mvn clean package -Pbackends-velox -Pspark-3.4 -DskipTests
ccache -s
diff --git
a/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
b/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
index ddd57dca46..0ec53f4bab 100644
---
a/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
+++
b/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
@@ -327,8 +327,7 @@ class VeloxIcebergSuite extends IcebergSuite {
val lastExecId = statusStore.executionsList().last.executionId
val executionMetrics = statusStore.executionMetrics(lastExecId)
- // TODO: fix https://github.com/apache/gluten/issues/11510
- assert(executionMetrics(metrics("numWrittenFiles").id).toLong == 0)
+ assert(executionMetrics(metrics("numWrittenFiles").id).toLong == 1)
}
}
diff --git
a/backends-velox/src/test/java/org/apache/gluten/utils/VeloxBloomFilterTest.java
b/backends-velox/src/test/java/org/apache/gluten/utils/VeloxBloomFilterTest.java
index 351b97ee11..f8cafc7908 100644
---
a/backends-velox/src/test/java/org/apache/gluten/utils/VeloxBloomFilterTest.java
+++
b/backends-velox/src/test/java/org/apache/gluten/utils/VeloxBloomFilterTest.java
@@ -47,16 +47,16 @@ public class VeloxBloomFilterTest extends
VeloxBackendTestBase {
buf.putInt(0); // size
TaskResources$.MODULE$.runUnsafe(
() -> {
- final BloomFilter filter = VeloxBloomFilter.readFrom(buf.array());
- Assert.assertThrows(
- "Bloom-filter is not initialized",
- RuntimeException.class,
- new ThrowingRunnable() {
- @Override
- public void run() throws Throwable {
- filter.mightContainLong(0);
- }
- });
+ RuntimeException exception =
+ Assert.assertThrows(
+ RuntimeException.class,
+ new ThrowingRunnable() {
+ @Override
+ public void run() throws Throwable {
+ VeloxBloomFilter.readFrom(buf.array());
+ }
+ });
+ Assert.assertTrue(exception.getMessage().contains("Invalid
BloomFilter size: 0"));
return null;
});
}
diff --git a/cpp/velox/CMakeLists.txt b/cpp/velox/CMakeLists.txt
index 4ec3f9f0f2..eea15ed301 100644
--- a/cpp/velox/CMakeLists.txt
+++ b/cpp/velox/CMakeLists.txt
@@ -230,10 +230,8 @@ if(ENABLE_GPU)
memory/GpuBufferColumnarBatch.cc)
endif()
-if(ENABLE_ENHANCED_FEATURES)
- list(APPEND VELOX_SRCS compute/iceberg/IcebergFormat.cc
- compute/iceberg/IcebergWriter.cc)
-endif()
+list(APPEND VELOX_SRCS compute/iceberg/IcebergFormat.cc
+ compute/iceberg/IcebergWriter.cc)
if(BUILD_TESTS OR BUILD_BENCHMARKS)
set(BUILD_TEST_UTILS ON)
diff --git a/cpp/velox/compute/VeloxBackend.cc
b/cpp/velox/compute/VeloxBackend.cc
index aa77d0ffe7..4ba7dd7a73 100644
--- a/cpp/velox/compute/VeloxBackend.cc
+++ b/cpp/velox/compute/VeloxBackend.cc
@@ -56,6 +56,7 @@
#include "velox/connectors/hive/BufferedInputBuilder.h"
#include "velox/connectors/hive/HiveConnector.h"
#include "velox/connectors/hive/HiveDataSource.h"
+#include "velox/connectors/hive/iceberg/IcebergConnector.h"
#include
"velox/connectors/hive/storage_adapters/abfs/RegisterAbfsFileSystem.h" //
@manual
#include "velox/connectors/hive/storage_adapters/gcs/RegisterGcsFileSystem.h"
// @manual
#include "velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h"
@@ -380,6 +381,13 @@ std::shared_ptr<facebook::velox::connector::Connector>
VeloxBackend::createDelta
return std::make_shared<delta::DeltaConnector>(connectorId,
hiveConnectorConfig_, ioExecutor);
}
+std::shared_ptr<facebook::velox::connector::Connector>
VeloxBackend::createIcebergConnector(
+ const std::string& connectorId,
+ folly::Executor* ioExecutor) const {
+ return std::make_shared<velox::connector::hive::iceberg::IcebergConnector>(
+ connectorId, hiveConnectorConfig_, ioExecutor);
+}
+
std::shared_ptr<facebook::velox::connector::Connector>
VeloxBackend::createValueStreamConnector(
const std::string& connectorId,
bool dynamicFilterEnabled) const {
diff --git a/cpp/velox/compute/VeloxBackend.h b/cpp/velox/compute/VeloxBackend.h
index 2796ce20c3..5597ca67e6 100644
--- a/cpp/velox/compute/VeloxBackend.h
+++ b/cpp/velox/compute/VeloxBackend.h
@@ -74,6 +74,10 @@ class VeloxBackend {
const std::string& connectorId,
folly::Executor* ioExecutor) const;
+ std::shared_ptr<facebook::velox::connector::Connector>
createIcebergConnector(
+ const std::string& connectorId,
+ folly::Executor* ioExecutor) const;
+
std::shared_ptr<facebook::velox::connector::Connector> createDeltaConnector(
const std::string& connectorId,
folly::Executor* ioExecutor) const;
diff --git a/cpp/velox/compute/VeloxConnectorIds.h
b/cpp/velox/compute/VeloxConnectorIds.h
index a0e37ba8b6..73b526229f 100644
--- a/cpp/velox/compute/VeloxConnectorIds.h
+++ b/cpp/velox/compute/VeloxConnectorIds.h
@@ -23,10 +23,12 @@ namespace gluten {
struct VeloxConnectorIds {
std::string hive;
+ std::string iceberg;
std::string delta;
std::string iterator;
std::string cudfHive;
bool hiveRegistered{false};
+ bool icebergRegistered{false};
bool deltaRegistered{false};
bool iteratorRegistered{false};
bool cudfHiveRegistered{false};
diff --git a/cpp/velox/compute/VeloxRuntime.cc
b/cpp/velox/compute/VeloxRuntime.cc
index 237048553a..031b13ab5a 100644
--- a/cpp/velox/compute/VeloxRuntime.cc
+++ b/cpp/velox/compute/VeloxRuntime.cc
@@ -227,6 +227,7 @@ std::string makeScopedConnectorId(const std::string& base,
uint64_t runtimeId) {
VeloxConnectorIds makeScopedConnectorIds(uint64_t runtimeId) {
return VeloxConnectorIds{
.hive = makeScopedConnectorId(kHiveConnectorId, runtimeId),
+ .iceberg = makeScopedConnectorId(kIcebergConnectorId, runtimeId),
.delta =
makeScopedConnectorId(delta::DeltaConnectorFactory::kDeltaConnectorName,
runtimeId),
.iterator = makeScopedConnectorId(kIteratorConnectorId, runtimeId),
.cudfHive = makeScopedConnectorId(kCudfHiveConnectorId, runtimeId)};
@@ -290,6 +291,14 @@ void VeloxRuntime::registerConnectors() {
velox::connector::hasConnector(connectorIds_.hive),
"Scoped hive connector not found after registration: " +
connectorIds_.hive);
+ connectorIds_.icebergRegistered =
+
velox::connector::registerConnector(backend->createIcebergConnector(connectorIds_.iceberg,
ioExecutor_.get()));
+ GLUTEN_CHECK(
+ connectorIds_.icebergRegistered, "Failed to register scoped Iceberg
connector: " + connectorIds_.iceberg);
+ GLUTEN_CHECK(
+ velox::connector::hasConnector(connectorIds_.iceberg),
+ "Scoped Iceberg connector not found after registration: " +
connectorIds_.iceberg);
+
connectorIds_.deltaRegistered =
velox::connector::registerConnector(backend->createDeltaConnector(connectorIds_.delta,
ioExecutor_.get()));
GLUTEN_CHECK(connectorIds_.deltaRegistered, "Failed to register scoped delta
connector: " + connectorIds_.delta);
@@ -340,6 +349,10 @@ void VeloxRuntime::unregisterConnectors() {
velox::connector::unregisterConnector(connectorIds_.hive);
connectorIds_.hiveRegistered = false;
}
+ if (connectorIds_.icebergRegistered) {
+ velox::connector::unregisterConnector(connectorIds_.iceberg);
+ connectorIds_.icebergRegistered = false;
+ }
}
void VeloxRuntime::parsePlan(const uint8_t* data, int32_t size) {
@@ -518,7 +531,6 @@ std::shared_ptr<RowToColumnarConverter>
VeloxRuntime::createRow2ColumnarConverte
return std::make_shared<VeloxRowToColumnarConverter>(cSchema, veloxPool);
}
-#ifdef GLUTEN_ENABLE_ENHANCED_FEATURES
std::shared_ptr<IcebergWriter> VeloxRuntime::createIcebergWriter(
RowTypePtr rowType,
int32_t format,
@@ -546,7 +558,6 @@ std::shared_ptr<IcebergWriter>
VeloxRuntime::createIcebergWriter(
veloxPool,
connectorPool);
}
-#endif
std::shared_ptr<ShuffleWriter> VeloxRuntime::createShuffleWriter(
int32_t numPartitions,
diff --git a/cpp/velox/compute/VeloxRuntime.h b/cpp/velox/compute/VeloxRuntime.h
index c6ee1c462c..3f53653e5b 100644
--- a/cpp/velox/compute/VeloxRuntime.h
+++ b/cpp/velox/compute/VeloxRuntime.h
@@ -20,9 +20,7 @@
#include "WholeStageResultIterator.h"
#include "compute/Runtime.h"
#include "compute/VeloxConnectorIds.h"
-#ifdef GLUTEN_ENABLE_ENHANCED_FEATURES
#include "iceberg/IcebergWriter.h"
-#endif
#include <folly/Executor.h>
#include "memory/VeloxMemoryManager.h"
#include "operators/serializer/VeloxColumnarBatchSerializer.h"
@@ -30,10 +28,7 @@
#include "operators/writer/VeloxParquetDataSource.h"
#include "shuffle/ShuffleReader.h"
#include "shuffle/ShuffleWriter.h"
-
-#ifdef GLUTEN_ENABLE_ENHANCED_FEATURES
#include "IcebergNestedField.pb.h"
-#endif
namespace gluten {
@@ -77,7 +72,6 @@ class VeloxRuntime final : public Runtime {
std::shared_ptr<RowToColumnarConverter> createRow2ColumnarConverter(struct
ArrowSchema* cSchema) override;
-#ifdef GLUTEN_ENABLE_ENHANCED_FEATURES
std::shared_ptr<IcebergWriter> createIcebergWriter(
RowTypePtr rowType,
int32_t format,
@@ -89,7 +83,6 @@ class VeloxRuntime final : public Runtime {
std::shared_ptr<const
facebook::velox::connector::hive::iceberg::IcebergPartitionSpec> spec,
const gluten::IcebergNestedField& protoField,
const std::unordered_map<std::string, std::string>& sparkConfs);
-#endif
std::shared_ptr<ShuffleWriter> createShuffleWriter(
int numPartitions,
diff --git a/cpp/velox/compute/WholeStageResultIterator.cc
b/cpp/velox/compute/WholeStageResultIterator.cc
index b3d9f480c1..29213c2e0e 100644
--- a/cpp/velox/compute/WholeStageResultIterator.cc
+++ b/cpp/velox/compute/WholeStageResultIterator.cc
@@ -202,7 +202,7 @@ WholeStageResultIterator::WholeStageResultIterator(
std::unordered_map<std::string, std::string>
customSplitInfo{{"table_format", "hive-iceberg"}};
auto deleteFiles = icebergSplitInfo->deleteFilesVec[idx];
split =
std::make_shared<velox::connector::hive::iceberg::HiveIcebergSplit>(
- connectorIds_.hive,
+ connectorIds_.iceberg,
paths[idx],
format,
starts[idx],
@@ -284,6 +284,7 @@ std::shared_ptr<velox::core::QueryCtx>
WholeStageResultIterator::createNewVeloxQ
std::unordered_map<std::string, std::shared_ptr<velox::config::ConfigBase>>
connectorConfigs;
auto hiveSessionConfig = createHiveConnectorSessionConfig(veloxCfg_);
connectorConfigs[connectorIds_.hive] = hiveSessionConfig;
+ connectorConfigs[connectorIds_.iceberg] = hiveSessionConfig;
connectorConfigs[connectorIds_.delta] = hiveSessionConfig;
connectorConfigs[connectorIds_.iterator] = hiveSessionConfig;
#ifdef GLUTEN_ENABLE_GPU
diff --git a/cpp/velox/compute/iceberg/IcebergWriter.cc
b/cpp/velox/compute/iceberg/IcebergWriter.cc
index e248fe4d27..ea838ec9d1 100644
--- a/cpp/velox/compute/iceberg/IcebergWriter.cc
+++ b/cpp/velox/compute/iceberg/IcebergWriter.cc
@@ -17,8 +17,10 @@
#include "IcebergWriter.h"
+#include "IcebergNestedField.pb.h"
#include "IcebergPartitionSpec.pb.h"
#include "compute/ProtobufUtils.h"
+#include "compute/VeloxBackend.h"
#include "compute/iceberg/IcebergFormat.h"
#include "config/VeloxConfig.h"
#include "utils/ConfigExtractor.h"
@@ -44,6 +46,7 @@ class GlutenIcebergFileNameGenerator : public
connector::hive::FileNameGenerator
std::optional<uint32_t> bucketId,
const std::shared_ptr<const connector::hive::HiveInsertTableHandle>
insertTableHandle,
const connector::ConnectorQueryCtx& connectorQueryCtx,
+ uint32_t maxNumBuckets,
bool commitRequired) const override {
auto targetFileName =
insertTableHandle->locationHandle()->targetFileName();
if (targetFileName.empty()) {
@@ -99,9 +102,9 @@ class GlutenIcebergFileNameGenerator : public
connector::hive::FileNameGenerator
mutable int32_t fileCount_;
};
-iceberg::IcebergNestedField convertToIcebergNestedField(const
gluten::IcebergNestedField& protoField) {
- IcebergNestedField result;
- result.id = protoField.id();
+parquet::ParquetFieldId convertToIcebergNestedField(const
gluten::IcebergNestedField& protoField) {
+ parquet::ParquetFieldId result;
+ result.fieldId = protoField.id();
// Recursively convert children
result.children.reserve(protoField.children_size());
@@ -121,7 +124,7 @@ std::shared_ptr<IcebergInsertTableHandle>
createIcebergInsertTableHandle(
int64_t taskId,
const std::string& operationId,
std::shared_ptr<const IcebergPartitionSpec> spec,
- const iceberg::IcebergNestedField& nestedField,
+ const parquet::ParquetFieldId& nestedField,
facebook::velox::memory::MemoryPool* pool) {
std::vector<std::shared_ptr<const iceberg::IcebergColumnHandle>>
columnHandles;
@@ -139,14 +142,12 @@ std::shared_ptr<IcebergInsertTableHandle>
createIcebergInsertTableHandle(
columnNames.at(i),
connector::hive::HiveColumnHandle::ColumnType::kPartitionKey,
columnTypes.at(i),
- columnTypes.at(i),
nestedField.children[i]));
} else {
columnHandles.push_back(std::make_shared<iceberg::IcebergColumnHandle>(
columnNames.at(i),
connector::hive::HiveColumnHandle::ColumnType::kRegular,
columnTypes.at(i),
- columnTypes.at(i),
nestedField.children[i]));
}
}
@@ -157,18 +158,10 @@ std::shared_ptr<IcebergInsertTableHandle>
createIcebergInsertTableHandle(
std::shared_ptr<const connector::hive::LocationHandle> locationHandle =
std::make_shared<connector::hive::LocationHandle>(
outputDirectoryPath, outputDirectoryPath,
connector::hive::LocationHandle::TableType::kExisting);
- const std::vector<IcebergSortingColumn> sortedBy;
const std::unordered_map<std::string, std::string> serdeParameters;
+ auto writeKind =
connector::hive::iceberg::IcebergInsertTableHandle::WriteKind::kData;
return std::make_shared<connector::hive::iceberg::IcebergInsertTableHandle>(
- columnHandles,
- locationHandle,
- spec,
- pool,
- fileFormat,
- sortedBy,
- compressionKind,
- serdeParameters,
- fileNameGenerator);
+ columnHandles, locationHandle, fileFormat, spec, compressionKind,
serdeParameters, writeKind, fileNameGenerator);
}
} // namespace
@@ -200,20 +193,36 @@ IcebergWriter::IcebergWriter(
connectorSessionProperties_ = createHiveConnectorSessionConfig(veloxCfg);
connectorConfig_ =
std::make_shared<facebook::velox::connector::hive::HiveConfig>(createHiveConnectorConfig(veloxCfg));
+ std::unordered_map<std::string,
std::shared_ptr<facebook::velox::config::ConfigBase>> connectorConfigs;
+ connectorConfigs[kHiveConnectorId] = connectorSessionProperties_;
+ auto queryConfigBase =
+
std::make_shared<facebook::velox::config::ConfigBase>(std::unordered_map<std::string,
std::string>(sparkConfs));
+ queryCtx_ = facebook::velox::core::QueryCtx::create(
+ nullptr,
+
facebook::velox::core::QueryConfig{facebook::velox::core::QueryConfig::ConfigTag{},
queryConfigBase},
+ connectorConfigs,
+ nullptr, // cache
+ pool_,
+ nullptr, // spillExecutor
+ "IcebergWriter");
+
+ auto expressionEvaluator =
+
std::make_unique<facebook::velox::exec::SimpleExpressionEvaluator>(queryCtx_.get(),
pool_.get());
+
connectorQueryCtx_ = std::make_unique<connector::ConnectorQueryCtx>(
pool_.get(),
connectorPool_.get(),
connectorSessionProperties_.get(),
nullptr,
common::PrefixSortConfig(),
- nullptr,
+ std::move(expressionEvaluator),
nullptr,
"query.IcebergDataSink",
"task.IcebergDataSink",
"planNodeId.IcebergDataSink",
0,
"");
-
+ auto icebergConfig =
std::make_shared<facebook::velox::connector::hive::iceberg::IcebergConfig>(veloxCfg);
dataSink_ = std::make_unique<IcebergDataSink>(
rowType_,
createIcebergInsertTableHandle(
@@ -229,24 +238,29 @@ IcebergWriter::IcebergWriter(
pool_.get()),
connectorQueryCtx_.get(),
facebook::velox::connector::CommitStrategy::kNoCommit,
- connectorConfig_);
+ connectorConfig_,
+ icebergConfig);
}
void IcebergWriter::write(const VeloxColumnarBatch& batch) {
auto inputRowVector = batch.getRowVector();
auto inputRowType = asRowType(inputRowVector->type());
- if (inputRowType->size() != rowType_->size()) {
- const auto& children = inputRowVector->children();
- std::vector<VectorPtr> dataColumns(children.begin() + 1, children.begin()
+ 1 + rowType_->size());
+ const auto& children = inputRowVector->children();
- auto filteredRowVector = std::make_shared<RowVector>(
- pool_.get(), rowType_, inputRowVector->nulls(),
inputRowVector->size(), std::move(dataColumns));
+ std::vector<VectorPtr> dataColumns;
+ dataColumns.reserve(rowType_->size());
- dataSink_->appendData(filteredRowVector);
+ if (inputRowType->size() != rowType_->size()) {
+ dataColumns.insert(dataColumns.end(), children.begin() + 1,
children.begin() + 1 + rowType_->size());
} else {
- dataSink_->appendData(inputRowVector);
+ dataColumns.insert(dataColumns.end(), children.begin(), children.end());
}
+
+ auto rowVector = std::make_shared<RowVector>(
+ pool_.get(), rowType_, inputRowVector->nulls(), inputRowVector->size(),
std::move(dataColumns));
+
+ dataSink_->appendData(rowVector);
}
std::vector<std::string> IcebergWriter::commit() {
diff --git a/cpp/velox/compute/iceberg/IcebergWriter.h
b/cpp/velox/compute/iceberg/IcebergWriter.h
index 2fa13dcd69..0ab3a80360 100644
--- a/cpp/velox/compute/iceberg/IcebergWriter.h
+++ b/cpp/velox/compute/iceberg/IcebergWriter.h
@@ -47,7 +47,7 @@ class IcebergWriter {
int64_t taskId,
const std::string& operationId,
std::shared_ptr<const
facebook::velox::connector::hive::iceberg::IcebergPartitionSpec> spec,
- const gluten::IcebergNestedField& field,
+ const IcebergNestedField& field,
const std::unordered_map<std::string, std::string>& sparkConfs,
std::shared_ptr<facebook::velox::memory::MemoryPool> memoryPool,
std::shared_ptr<facebook::velox::memory::MemoryPool> connectorPool);
@@ -60,7 +60,7 @@ class IcebergWriter {
private:
facebook::velox::RowTypePtr rowType_;
- const facebook::velox::connector::hive::iceberg::IcebergNestedField field_;
+ const facebook::velox::parquet::ParquetFieldId field_;
int32_t partitionId_;
int64_t taskId_;
std::string operationId_;
@@ -69,6 +69,7 @@ class IcebergWriter {
std::shared_ptr<facebook::velox::connector::hive::HiveConfig>
connectorConfig_;
std::shared_ptr<facebook::velox::config::ConfigBase>
connectorSessionProperties_;
+ std::shared_ptr<facebook::velox::core::QueryCtx> queryCtx_;
std::unique_ptr<facebook::velox::connector::ConnectorQueryCtx>
connectorQueryCtx_;
std::unique_ptr<facebook::velox::connector::hive::iceberg::IcebergDataSink>
dataSink_;
diff --git a/cpp/velox/config/VeloxConfig.h b/cpp/velox/config/VeloxConfig.h
index ad51066f40..f16f48c40d 100644
--- a/cpp/velox/config/VeloxConfig.h
+++ b/cpp/velox/config/VeloxConfig.h
@@ -121,6 +121,8 @@ const std::string kVeloxMemReclaimMaxWaitMs =
"spark.gluten.sql.columnar.backend
const uint64_t kVeloxMemReclaimMaxWaitMsDefault = 3600000; // 60min
const std::string kHiveConnectorId = "test-hive";
+const std::string kIcebergConnectorId = "test-iceberg";
+
const std::string kVeloxCacheEnabled =
"spark.gluten.sql.columnar.backend.velox.cacheEnabled";
const std::string kExprMaxCompiledRegexes =
"spark.gluten.sql.columnar.backend.velox.maxCompiledRegexes";
diff --git a/cpp/velox/jni/VeloxJniWrapper.cc b/cpp/velox/jni/VeloxJniWrapper.cc
index a06c948c0f..369d6716ed 100644
--- a/cpp/velox/jni/VeloxJniWrapper.cc
+++ b/cpp/velox/jni/VeloxJniWrapper.cc
@@ -52,10 +52,6 @@
#include "utils/GpuBufferBatchResizer.h"
#endif
-#ifdef GLUTEN_ENABLE_ENHANCED_FEATURES
-#include "IcebergNestedField.pb.h"
-#endif
-
using namespace gluten;
using namespace facebook;
@@ -844,11 +840,7 @@
Java_org_apache_gluten_vectorized_UnifflePartitionWriterJniWrapper_createPartiti
JNIEXPORT jboolean JNICALL
Java_org_apache_gluten_config_ConfigJniWrapper_isEnhancedFeaturesEnabled( //
NOLINT
JNIEnv* env,
jclass) {
-#ifdef GLUTEN_ENABLE_ENHANCED_FEATURES
return true;
-#else
- return false;
-#endif
}
#ifdef GLUTEN_ENABLE_GPU
@@ -867,7 +859,6 @@ JNIEXPORT jboolean JNICALL
Java_org_apache_gluten_cudf_VeloxCudfPlanValidatorJni
}
#endif
-#ifdef GLUTEN_ENABLE_ENHANCED_FEATURES
JNIEXPORT jlong JNICALL
Java_org_apache_gluten_execution_IcebergWriteJniWrapper_init( // NOLINT
JNIEnv* env,
jobject wrapper,
@@ -955,7 +946,6 @@ JNIEXPORT jobject JNICALL
Java_org_apache_gluten_execution_IcebergWriteJniWrappe
JNI_METHOD_END(nullptr)
}
-#endif
JNIEXPORT jlong JNICALL
Java_org_apache_gluten_vectorized_HashJoinBuilder_nativeBuild( // NOLINT
JNIEnv* env,
diff --git a/cpp/velox/operators/writer/VeloxColumnarBatchWriter.cc
b/cpp/velox/operators/writer/VeloxColumnarBatchWriter.cc
index b12c0472ac..598199f19b 100644
--- a/cpp/velox/operators/writer/VeloxColumnarBatchWriter.cc
+++ b/cpp/velox/operators/writer/VeloxColumnarBatchWriter.cc
@@ -33,11 +33,15 @@ arrow::Status VeloxColumnarBatchWriter::initWriter(const
facebook::velox::RowTyp
auto localWriteFile =
std::make_unique<facebook::velox::LocalWriteFile>(path_, false, true);
auto sink =
std::make_unique<facebook::velox::dwio::common::WriteFileSink>(std::move(localWriteFile),
path_);
- facebook::velox::parquet::WriterOptions writerOptions;
+ facebook::velox::dwio::common::WriterOptions writerOptions;
+ facebook::velox::parquet::ParquetWriterOptions parquetWriterOptions;
+
writerOptions.memoryPool = pool_.get();
writerOptions.compressionKind =
facebook::velox::common::CompressionKind::CompressionKind_SNAPPY;
- writerOptions.batchSize = batchSize_;
-
+ // set parquet specific options
+ parquetWriterOptions.batchSize = batchSize_;
+ writerOptions.formatSpecificOptions =
+
std::make_shared<facebook::velox::parquet::ParquetWriterOptions>(parquetWriterOptions);
writer_ =
std::make_unique<facebook::velox::parquet::Writer>(std::move(sink),
writerOptions, rowType);
return arrow::Status::OK();
}
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc
b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
index 2d7e65734b..5bd499582b 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
@@ -21,6 +21,7 @@
#include "VariantToVectorConverter.h"
#include "compute/delta/DeltaConnector.h"
#include "compute/delta/DeltaSplitInfo.h"
+#include "compute/iceberg/IcebergPlanConverter.h"
#include "jni/JniHashTable.h"
#include "operators/hashjoin/HashTableBuilder.h"
#include "operators/plannodes/RowVectorStream.h"
@@ -779,9 +780,7 @@ std::shared_ptr<CudfHiveInsertTableHandle>
makeCudfHiveInsertTableHandle(
for (int i = 0; i < tableColumnNames.size(); ++i) {
columnHandles.push_back(std::make_shared<CudfHiveColumnHandle>(
- tableColumnNames.at(i),
- tableColumnTypes.at(i),
-
cudf::data_type{cudf_velox::veloxToCudfTypeId(tableColumnTypes.at(i))}));
+ tableColumnNames.at(i), tableColumnTypes.at(i),
cudf_velox::veloxToCudfDataType(tableColumnTypes.at(i))));
}
return std::make_shared<CudfHiveInsertTableHandle>(
@@ -878,7 +877,8 @@ core::PlanNodePtr
SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
GLUTEN_CHECK(formatShortName == "parquet", "Unsupported file write format: "
+ formatShortName);
dwio::common::FileFormat fileFormat = dwio::common::FileFormat::PARQUET;
- const std::shared_ptr<facebook::velox::parquet::WriterOptions> writerOptions
= makeParquetWriteOption(writeConfs);
+ const std::shared_ptr<facebook::velox::dwio::common::WriterOptions>
writerOptions =
+ makeParquetWriteOption(writeConfs);
// Spark's default compression code is snappy.
const auto& compressionKind =
writerOptions->compressionKind.value_or(common::CompressionKind::CompressionKind_SNAPPY);
@@ -1584,7 +1584,10 @@ core::PlanNodePtr
SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
connector::ConnectorTableHandlePtr tableHandle;
auto remainingFilter = readRel.has_filter() ?
exprConverter_->toVeloxExpr(readRel.filter(), baseSchema) : nullptr;
auto connectorId = isDeltaSplitInfo(splitInfo) ? connectorIds_.delta :
connectorIds_.hive;
- if (connectorId == connectorIds_.hive && useCudfTableHandle(splitInfos_) &&
+ if (std::dynamic_pointer_cast<IcebergSplitInfo>(splitInfo)) {
+ connectorId = connectorIds_.iceberg;
+ }
+ if ((connectorId == connectorIds_.hive || connectorId ==
connectorIds_.iceberg) && useCudfTableHandle(splitInfos_) &&
veloxCfg_->get<bool>(kCudfEnableTableScan, kCudfEnableTableScanDefault)
&&
veloxCfg_->get<bool>(kCudfEnabled, kCudfEnabledDefault)) {
#ifdef GLUTEN_ENABLE_GPU
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.h
b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.h
index ca9f6c4012..e9c4f9879e 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.h
+++ b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.h
@@ -41,7 +41,11 @@ class SubstraitToVeloxPlanValidator {
pool,
veloxCfg_.get(),
std::vector<std::shared_ptr<ResultIterator>>{},
- VeloxConnectorIds{.hive = kHiveConnectorId, .iterator =
kIteratorConnectorId, .cudfHive = kCudfHiveConnectorId},
+ VeloxConnectorIds{
+ .hive = kHiveConnectorId,
+ .iceberg = kIcebergConnectorId,
+ .iterator = kIteratorConnectorId,
+ .cudfHive = kCudfHiveConnectorId},
std::nullopt,
std::nullopt,
true);
diff --git a/cpp/velox/utils/ConfigExtractor.cc
b/cpp/velox/utils/ConfigExtractor.cc
index bc226541a8..f398c2588d 100644
--- a/cpp/velox/utils/ConfigExtractor.cc
+++ b/cpp/velox/utils/ConfigExtractor.cc
@@ -25,7 +25,8 @@
#include "utils/Macros.h"
#include "velox/connectors/hive/HiveConfig.h"
#include "velox/connectors/hive/storage_adapters/s3fs/S3Config.h"
-#include "velox/dwio/parquet/writer/WriterConfig.h"
+#include "velox/dwio/common/Options.h"
+#include "velox/dwio/parquet/common/ParquetConfig.h"
namespace gluten {
@@ -219,6 +220,11 @@ void getAbfsHiveConfig(
#endif
}
+std::string parquetSessionProperty(std::string_view key) {
+ return
facebook::velox::dwio::common::formatConfigPrefix(facebook::velox::dwio::common::FileFormat::PARQUET,
"_") +
+ std::string(key);
+}
+
} // namespace
std::shared_ptr<facebook::velox::config::ConfigBase>
createHiveConnectorSessionConfig(
@@ -229,7 +235,8 @@ std::shared_ptr<facebook::velox::config::ConfigBase>
createHiveConnectorSessionC
configs[facebook::velox::connector::hive::HiveConfig::kFileColumnNamesReadAsLowerCaseSession]
=
!conf->get<bool>(kCaseSensitive, false) ? "true" : "false";
configs[facebook::velox::connector::hive::HiveConfig::kPartitionPathAsLowerCaseSession]
= "false";
-
configs[facebook::velox::parquet::WriterConfig::kParquetSessionWriteTimestampUnit]
= std::string("6");
+
configs[parquetSessionProperty(facebook::velox::parquet::ParquetConfig::kWriterTimestampUnitSession)]
=
+ std::string("6");
configs[facebook::velox::connector::hive::HiveConfig::kReadTimestampUnitSession]
= std::string("6");
configs[facebook::velox::connector::hive::HiveConfig::kMaxPartitionsPerWritersSession]
=
conf->get<std::string>(kMaxPartitions, "10000");
@@ -243,8 +250,10 @@ std::shared_ptr<facebook::velox::config::ConfigBase>
createHiveConnectorSessionC
conf->get<bool>(kAllowInt32Narrowing, true) ? "true" : "false";
configs[facebook::velox::connector::hive::HiveConfig::kOrcUseColumnNamesSession]
=
conf->get<bool>(kOrcUseColumnNames, true) ? "true" : "false";
-
configs[facebook::velox::parquet::WriterConfig::kParquetSessionWritePageSize] =
+
configs[parquetSessionProperty(facebook::velox::parquet::ParquetConfig::kWriterPageSizeSession)]
=
conf->get<std::string>(kWriteParquetPageSizeBytes, "1MB");
+
configs[parquetSessionProperty(facebook::velox::parquet::ParquetConfig::kNullStructIfAllFieldsMissingSession)]
=
+ "true";
overwriteVeloxConf(conf.get(), configs, kDynamicBackendConfPrefix);
return
std::make_shared<facebook::velox::config::ConfigBase>(std::move(configs));
@@ -307,8 +316,6 @@ std::shared_ptr<facebook::velox::config::ConfigBase>
createHiveConnectorConfig(
// read as UTC
hiveConfMap[facebook::velox::connector::hive::HiveConfig::kReadTimestampPartitionValueAsLocalTime]
= "false";
-
hiveConfMap[facebook::velox::connector::hive::HiveConfig::kParquetNullStructForMissingFields]
= "true";
-
overwriteVeloxConf(conf.get(), hiveConfMap, kStaticBackendConfPrefix);
return
std::make_shared<facebook::velox::config::ConfigBase>(std::move(hiveConfMap));
}
diff --git a/cpp/velox/utils/GpuBufferBatchResizer.cc
b/cpp/velox/utils/GpuBufferBatchResizer.cc
index e896442dde..fc4ed60cba 100644
--- a/cpp/velox/utils/GpuBufferBatchResizer.cc
+++ b/cpp/velox/utils/GpuBufferBatchResizer.cc
@@ -152,7 +152,7 @@ std::shared_ptr<VeloxColumnarBatch> makeCudfTable(
DispatchColumn dispatch{stream, cudf::get_current_device_resource_ref(),
buffers, numRows};
for (const auto& colType : type->children()) {
auto res = VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH(
- dispatch.readFlatColumn, colType->kind(),
cudf_velox::veloxToCudfTypeId(colType));
+ dispatch.readFlatColumn, colType->kind(),
cudf_velox::veloxToCudfDataType(colType).id());
cudfColumns.emplace_back(std::move(res));
}
auto cudfTable = std::make_unique<cudf::table>(std::move(cudfColumns));
diff --git a/cpp/velox/utils/VeloxWriterUtils.cc
b/cpp/velox/utils/VeloxWriterUtils.cc
index 5616cd9481..0196aa5757 100644
--- a/cpp/velox/utils/VeloxWriterUtils.cc
+++ b/cpp/velox/utils/VeloxWriterUtils.cc
@@ -38,7 +38,8 @@ const int32_t kGzipWindowBits4k = 12;
const int32_t kZSTDDefaultCompressionLevel = 3;
} // namespace
-std::unique_ptr<WriterOptions> makeParquetWriteOption(const
std::unordered_map<std::string, std::string>& sparkConfs) {
+std::shared_ptr<facebook::velox::dwio::common::WriterOptions>
makeParquetWriteOption(
+ const std::unordered_map<std::string, std::string>& sparkConfs) {
int64_t maxRowGroupBytes = 134217728; // 128MB
int64_t maxRowGroupRows = 100000000; // 100M
if (auto it = sparkConfs.find(kParquetBlockSize); it != sparkConfs.end()) {
@@ -47,8 +48,10 @@ std::unique_ptr<WriterOptions> makeParquetWriteOption(const
std::unordered_map<s
if (auto it = sparkConfs.find(kParquetBlockRows); it != sparkConfs.end()) {
maxRowGroupRows = std::stoll(it->second);
}
- auto writeOption = std::make_unique<WriterOptions>();
- writeOption->parquetWriteTimestampUnit = TimestampPrecision::kMicroseconds
/*micro*/;
+
+ auto writeOption =
std::make_shared<facebook::velox::dwio::common::WriterOptions>();
+ auto parquetOptions = std::make_shared<ParquetWriterOptions>();
+ parquetOptions->parquetWriteTimestampUnit =
TimestampPrecision::kMicroseconds /*micro*/;
bool writeLegacyParquetFormat = false;
if (auto it = sparkConfs.find(kParquetStoreDecimalAsInteger); it !=
sparkConfs.end()) {
writeLegacyParquetFormat = boost::iequals(it->second, "true");
@@ -62,7 +65,7 @@ std::unique_ptr<WriterOptions> makeParquetWriteOption(const
std::unordered_map<s
// TODO: Only DECIMAL is handled here. Spark's writeLegacyFormat also
changes ARRAY (bag/array
// vs list/element) and MAP (map vs key_value) physical layouts, which are
not yet supported
// in Gluten native Parquet write.
- writeOption->enableStoreDecimalAsInteger = !writeLegacyParquetFormat;
+ parquetOptions->enableStoreDecimalAsInteger = !writeLegacyParquetFormat;
auto compressionCodec = CompressionKind::CompressionKind_SNAPPY;
if (auto it = sparkConfs.find(kParquetCompressionCodec); it !=
sparkConfs.end()) {
auto compressionCodecStr = it->second;
@@ -76,7 +79,7 @@ std::unique_ptr<WriterOptions> makeParquetWriteOption(const
std::unordered_map<s
if (parquetGzipWindowSizeStr == kGzipWindowSize4k) {
auto codecOptions =
std::make_shared<parquet::arrow::util::GZipCodecOptions>();
codecOptions->windowBits = kGzipWindowBits4k;
- writeOption->codecOptions = std::move(codecOptions);
+ parquetOptions->codecOptions = std::move(codecOptions);
}
}
} else if (boost::iequals(compressionCodecStr, "lzo")) {
@@ -92,7 +95,7 @@ std::unique_ptr<WriterOptions> makeParquetWriteOption(const
std::unordered_map<s
auto it = sparkConfs.find(kParquetZSTDCompressionLevel);
auto compressionLevel = it != sparkConfs.end() ? std::stoi(it->second) :
kZSTDDefaultCompressionLevel;
codecOptions->compressionLevel = compressionLevel;
- writeOption->codecOptions = std::move(codecOptions);
+ parquetOptions->codecOptions = std::move(codecOptions);
} else if (boost::iequals(compressionCodecStr, "uncompressed")) {
compressionCodec = CompressionKind::CompressionKind_NONE;
} else if (boost::iequals(compressionCodecStr, "none")) {
@@ -104,28 +107,29 @@ std::unique_ptr<WriterOptions>
makeParquetWriteOption(const std::unordered_map<s
return std::make_unique<LambdaFlushPolicy>(maxRowGroupRows,
maxRowGroupBytes, [&]() { return false; });
};
if (auto it = sparkConfs.find(kSessionTimezone); it != sparkConfs.end()) {
- writeOption->parquetWriteTimestampTimeZone =
normalizeSessionTimezone(it->second);
+ parquetOptions->parquetWriteTimestampTimeZone =
normalizeSessionTimezone(it->second);
}
- writeOption->arrowMemoryPool =
+ parquetOptions->arrowMemoryPool =
getDefaultMemoryManager()->getOrCreateArrowMemoryPool("VeloxParquetWrite.ArrowMemoryPool");
if (auto it = sparkConfs.find(kParquetDataPageSize); it != sparkConfs.end())
{
auto dataPageSize = std::stoll(it->second);
- writeOption->dataPageSize = dataPageSize;
+ parquetOptions->dataPageSize = dataPageSize;
}
if (auto it = sparkConfs.find(kParquetWriterVersion); it !=
sparkConfs.end()) {
auto parquetVersion = it->second;
if (boost::iequals(parquetVersion, "v2")) {
- writeOption->useParquetDataPageV2 = true;
+ parquetOptions->useParquetDataPageV2 = true;
}
}
if (auto it = sparkConfs.find(kParquetEnableDictionary); it !=
sparkConfs.end()) {
auto enableDictionary = it->second;
if (boost::iequals(enableDictionary, "true")) {
- writeOption->enableDictionary = true;
+ parquetOptions->enableDictionary = true;
} else {
- writeOption->enableDictionary = false;
+ parquetOptions->enableDictionary = false;
}
}
+ writeOption->formatSpecificOptions = std::move(parquetOptions);
return writeOption;
}
diff --git a/cpp/velox/utils/VeloxWriterUtils.h
b/cpp/velox/utils/VeloxWriterUtils.h
index 65fe2b2c62..56bd5b2e27 100644
--- a/cpp/velox/utils/VeloxWriterUtils.h
+++ b/cpp/velox/utils/VeloxWriterUtils.h
@@ -23,7 +23,7 @@
namespace gluten {
-std::unique_ptr<facebook::velox::parquet::WriterOptions>
makeParquetWriteOption(
+std::shared_ptr<facebook::velox::dwio::common::WriterOptions>
makeParquetWriteOption(
const std::unordered_map<std::string, std::string>& sparkConfs);
} // namespace gluten
diff --git a/dev/ci-velox-buildshared-centos-9.sh
b/dev/ci-velox-buildshared-centos-9.sh
index 846765df7e..2d2fa8c9bc 100755
--- a/dev/ci-velox-buildshared-centos-9.sh
+++ b/dev/ci-velox-buildshared-centos-9.sh
@@ -27,5 +27,5 @@ fi
export VELOX_BUILD_SHARED=ON
-./dev/builddeps-veloxbe.sh --run_setup_script=ON --build_arrow=ON
--build_tests=ON \
+./dev/builddeps-veloxbe.sh --run_setup_script=OFF --build_arrow=OFF
--build_tests=ON \
--build_examples=ON --build_benchmarks=ON
diff --git a/ep/build-velox/src/get-velox.sh b/ep/build-velox/src/get-velox.sh
index 637860db37..81112e8fe2 100755
--- a/ep/build-velox/src/get-velox.sh
+++ b/ep/build-velox/src/get-velox.sh
@@ -18,8 +18,8 @@ set -exu
CURRENT_DIR=$(cd "$(dirname "$BASH_SOURCE")"; pwd)
VELOX_REPO=https://github.com/IBM/velox.git
-VELOX_BRANCH=dft-2026_06_12
-VELOX_ENHANCED_BRANCH=ibm-2026_06_12
+VELOX_BRANCH=dft-2026_06_27
+VELOX_ENHANCED_BRANCH=ibm-2026_06_27
VELOX_HOME=""
RUN_SETUP_SCRIPT=ON
ENABLE_ENHANCED_FEATURES=OFF
diff --git
a/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergWriteUtil.scala
b/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergWriteUtil.scala
index 3f7ab278d0..83ecb07037 100644
---
a/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergWriteUtil.scala
+++
b/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergWriteUtil.scala
@@ -34,9 +34,7 @@ object IcebergWriteUtil {
}
private lazy val writePropertiesField = {
- val field = classOf[SparkWrite].getDeclaredField("writeProperties")
- field.setAccessible(true)
- field
+ optionalField(classOf[SparkWrite], "writeProperties")
}
private lazy val writeConfField = {
@@ -89,7 +87,9 @@ object IcebergWriteUtil {
}
def getWriteProperty(write: Write): java.util.Map[String, String] = {
- writePropertiesField.get(write).asInstanceOf[java.util.Map[String, String]]
+ writePropertiesField
+ .map(_.get(write).asInstanceOf[java.util.Map[String, String]])
+ .getOrElse(java.util.Collections.emptyMap[String, String]())
}
def getWriteConf(write: Write): SparkWriteConf = {
@@ -128,4 +128,12 @@ object IcebergWriteUtil {
commit
}
+ private def optionalField(cls: Class[_], name: String):
Option[java.lang.reflect.Field] =
+ try {
+ val f = cls.getDeclaredField(name)
+ f.setAccessible(true)
+ Some(f)
+ } catch {
+ case _: NoSuchFieldException => None
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]